Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes, and the transactions that contain them.
It is capable of handling decoding even across changes to the schema
of the affected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Geoghegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao,
Michael Paquier, Simon Riggs, Craig Ringer, and Steve Singer.
2014-03-04 01:32:18 +04:00
/*-------------------------------------------------------------------------
 *
 * test_decoding.c
 *		  example logical decoding output plugin
 *
 * Copyright (c) 2012-2022, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  contrib/test_decoding/test_decoding.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "catalog/pg_type.h"

#include "replication/logical.h"
Introduce replication progress tracking infrastructure.
When implementing a replication solution on top of logical decoding, two
related problems exist:
* How to safely keep track of replication progress
* How to change replication behavior based on the origin of a row,
  e.g. to avoid loops in bi-directional replication setups
The solution to these problems, as implemented here, consists of
three parts:
1) 'replication origins', which identify nodes in a replication setup.
2) 'replication progress tracking', which remembers, for each
   replication origin, how far replay has progressed, in an efficient and
   crash-safe manner.
3) The ability to filter out changes performed at the behest of a
   replication origin during logical decoding; this allows complex
   replication topologies, e.g. by filtering out all replayed changes.
Most of this could also be implemented in "userspace", e.g. by inserting
additional rows containing origin information, but that ends up being much
less efficient and more complicated. We don't want to require various
replication solutions to reimplement logic for this independently. The
infrastructure is intended to be generic enough to be reusable.
This infrastructure also replaces the 'nodeid' infrastructure of commit
timestamps. It is intended to provide all the former capabilities,
except that there are only 2^16 different origins; but now they integrate
with logical decoding. Additionally, more functionality is accessible via
SQL. Since the commit timestamp infrastructure was also introduced
in 9.5 (commit 73c986add), changing the API is not a problem.
For now, the number of origins for which replication progress can be
tracked simultaneously is determined by the max_replication_slots
GUC. That GUC is not a perfect match to configure this, but there
doesn't seem to be sufficient reason to introduce a separate new one.
Bumps both catversion and WAL page magic.
Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer
Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer
Discussion: 20150216002155.GI15326@awork2.anarazel.de,
    20140923182422.GA15776@alap3.anarazel.de,
    20131114172632.GE7522@alap2.anarazel.de
2015-04-29 20:30:53 +03:00
#include "replication/origin.h"

#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"

PG_MODULE_MAGIC;

typedef struct
{
	MemoryContext context;
	bool		include_xids;
	bool		include_timestamp;
	bool		skip_empty_xacts;
	bool		only_local;
} TestDecodingData;

/*
 * Maintain the per-transaction-level variables to track whether the
 * transaction and/or streams have written any changes. In streaming mode,
 * the transaction can be decoded in streams, so along with tracking whether
 * the transaction has written any changes, we also need to track whether
 * the current stream has written any changes. This is required so that if
 * the user has requested to skip empty transactions, we can skip empty
 * streams even though the transaction as a whole has written some changes.
 */
typedef struct
{
	bool		xact_wrote_changes;
	bool		stream_wrote_changes;
} TestDecodingTxnData;

static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
							  bool is_init);
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
								ReorderBufferTXN *txn);
static void pg_output_begin(LogicalDecodingContext *ctx,
							TestDecodingData *data,
							ReorderBufferTXN *txn,
							bool last_write);
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
								 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pg_decode_change(LogicalDecodingContext *ctx,
							 ReorderBufferTXN *txn, Relation rel,
							 ReorderBufferChange *change);
static void pg_decode_truncate(LogicalDecodingContext *ctx,
							   ReorderBufferTXN *txn,
							   int nrelations, Relation relations[],
							   ReorderBufferChange *change);
static bool pg_decode_filter(LogicalDecodingContext *ctx,
							 RepOriginId origin_id);
static void pg_decode_message(LogicalDecodingContext *ctx,
							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
							  bool transactional, const char *prefix,
							  Size sz, const char *message);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
									 TransactionId xid,
									 const char *gid);
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
										ReorderBufferTXN *txn);
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
								  ReorderBufferTXN *txn,
								  XLogRecPtr prepare_lsn);
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
										  ReorderBufferTXN *txn,
										  XLogRecPtr commit_lsn);
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
											ReorderBufferTXN *txn,
											XLogRecPtr prepare_end_lsn,
											TimestampTz prepare_time);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
								   ReorderBufferTXN *txn);
static void pg_output_stream_start(LogicalDecodingContext *ctx,
								   TestDecodingData *data,
								   ReorderBufferTXN *txn,
								   bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
								  ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
								   ReorderBufferTXN *txn,
								   XLogRecPtr abort_lsn);
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
									 ReorderBufferTXN *txn,
									 XLogRecPtr prepare_lsn);
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
									ReorderBufferTXN *txn,
									XLogRecPtr commit_lsn);
static void pg_decode_stream_change(LogicalDecodingContext *ctx,
									ReorderBufferTXN *txn,
									Relation relation,
									ReorderBufferChange *change);
static void pg_decode_stream_message(LogicalDecodingContext *ctx,
									 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
									 bool transactional, const char *prefix,
									 Size sz, const char *message);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
									  ReorderBufferTXN *txn,
									  int nrelations, Relation relations[],
									  ReorderBufferChange *change);

void
_PG_init(void)
{
	/* other plugins can perform things here */
}

/* specify output plugin callbacks */
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
	AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);

	cb->startup_cb = pg_decode_startup;
	cb->begin_cb = pg_decode_begin_txn;
	cb->change_cb = pg_decode_change;
	cb->truncate_cb = pg_decode_truncate;
	cb->commit_cb = pg_decode_commit_txn;
	cb->filter_by_origin_cb = pg_decode_filter;
	cb->shutdown_cb = pg_decode_shutdown;
	cb->message_cb = pg_decode_message;
	cb->filter_prepare_cb = pg_decode_filter_prepare;
	cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
	cb->prepare_cb = pg_decode_prepare_txn;
	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
	cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
	cb->stream_start_cb = pg_decode_stream_start;
	cb->stream_stop_cb = pg_decode_stream_stop;
	cb->stream_abort_cb = pg_decode_stream_abort;
	cb->stream_prepare_cb = pg_decode_stream_prepare;
	cb->stream_commit_cb = pg_decode_stream_commit;
	cb->stream_change_cb = pg_decode_stream_change;
	cb->stream_message_cb = pg_decode_stream_message;
	cb->stream_truncate_cb = pg_decode_stream_truncate;
}


/* initialize this plugin */
static void
pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
				  bool is_init)
{
	ListCell   *option;
	TestDecodingData *data;
Implement streaming mode in ReorderBuffer.
Instead of serializing the transaction to disk after reaching the
logical_decoding_work_mem limit in memory, we consume the changes we have
in memory and invoke the stream API methods added by commit 45fdc9738b.
However, sometimes, if we have an incomplete toast or speculative insert,
we spill to disk because we can't generate the complete tuple to stream.
And, as soon as we get the complete tuple, we stream the transaction
including the serialized changes.
We can do this incremental processing thanks to having assignments
(associating subxacts with toplevel xacts) in WAL right away, and
thanks to logging the invalidation messages at each command end. These
features were added by commits 0bead9af48 and c55040ccd0 respectively.
Now that we can stream in-progress transactions, concurrent aborts
may cause failures when the output plugin consults catalogs (both system
and user-defined).
We handle such failures by returning the ERRCODE_TRANSACTION_ROLLBACK
sqlerrcode from system table scan APIs to the backend or WALSender
decoding a specific uncommitted transaction. The decoding logic, on
receipt of such a sqlerrcode, aborts the decoding of the current
transaction and continues with the decoding of other transactions.
We have a ReorderBufferTXN pointer in each ReorderBufferChange by which we
know which xact it belongs to. The output plugin can use this to decide
which changes to discard in case of stream_abort_cb (e.g. when a subxact
gets discarded).
We also provide a new option via the SQL APIs to fetch the changes being
streamed.
Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke
Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
2020-08-08 05:04:39 +03:00
	bool		enable_streaming = false;

	data = palloc0(sizeof(TestDecodingData));
	data->context = AllocSetContextCreate(ctx->context,
										  "text conversion context",
Add macros to make AllocSetContextCreate() calls simpler and safer.
I found that half a dozen (nearly 5%) of our AllocSetContextCreate calls
had typos in the context-sizing parameters. While none of these led to
especially significant problems, they did create minor inefficiencies,
and it's now clear that expecting people to copy-and-paste those calls
accurately is not a great idea. Let's reduce the risk of future errors
by introducing single macros that encapsulate the common use-cases.
Three such macros are enough to cover all but two special-purpose contexts;
those two calls can be left as-is, I think.
While this patch doesn't in itself improve matters for third-party
extensions, it doesn't break anything for them either, and they can
gradually adopt the simplified notation over time.
In passing, change TopMemoryContext to use the default allocation
parameters. Formerly it could only be extended 8K at a time. That was
probably reasonable when this code was written; but nowadays we create
many more contexts than we did then, so that it's not unusual to have a
couple hundred K in TopMemoryContext, even without considering various
dubious code that sticks other things there. There seems no good reason
not to let it use growing blocks like most other contexts.
Back-patch to 9.6, mostly because that's still close enough to HEAD that
it's easy to do so, and keeping the branches in sync can be expected to
avoid some future back-patching pain. The bugs fixed by these changes
don't seem to be significant enough to justify fixing them further back.
Discussion: <21072.1472321324@sss.pgh.pa.us>
2016-08-28 00:50:38 +03:00
										  ALLOCSET_DEFAULT_SIZES);
	data->include_xids = true;
	data->include_timestamp = false;
	data->skip_empty_xacts = false;
Introduce replication progress tracking infrastructure.
When implementing a replication solution on top of logical decoding, two
related problems exist:
* How to safely keep track of replication progress
* How to change replication behavior based on the origin of a row,
  e.g. to avoid loops in bi-directional replication setups
The solution to these problems, as implemented here, consists of
three parts:
1) 'replication origins', which identify nodes in a replication setup.
2) 'replication progress tracking', which remembers, for each
replication origin, how far replay has progressed in an efficient and
crash-safe manner.
3) The ability to filter out changes performed at the behest of a
replication origin during logical decoding; this allows complex
replication topologies, e.g. by filtering all replayed changes out.
Most of this could also be implemented in "userspace", e.g. by inserting
additional rows containing origin information, but that ends up being much
less efficient and more complicated. We don't want to require various
replication solutions to reimplement this logic independently. The
infrastructure is intended to be generic enough to be reusable.
This infrastructure also replaces the 'nodeid' infrastructure of commit
timestamps. It is intended to provide all the former capabilities,
except that there are only 2^16 different origins; but now they integrate
with logical decoding. Additionally, more functionality is accessible via
SQL. Since the commit timestamp infrastructure was also introduced
in 9.5 (commit 73c986add), changing the API is not a problem.
For now the number of origins for which replication progress can be
tracked simultaneously is determined by the max_replication_slots
GUC. That GUC is not a perfect match for configuring this, but there
doesn't seem to be sufficient reason to introduce a separate new one.
Bumps both catversion and wal page magic.
Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer
Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer
Discussion: 20150216002155.GI15326@awork2.anarazel.de,
    20140923182422.GA15776@alap3.anarazel.de,
    20131114172632.GE7522@alap2.anarazel.de
2015-04-29 20:30:53 +03:00
	data->only_local = false;
	ctx->output_plugin_private = data;

	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
	opt->receive_rewrites = false;
	foreach(option, ctx->output_plugin_options)
	{
		DefElem    *elem = lfirst(option);

		Assert(elem->arg == NULL || IsA(elem->arg, String));

		if (strcmp(elem->defname, "include-xids") == 0)
		{
			/* if option does not provide a value, it means its value is true */
			if (elem->arg == NULL)
				data->include_xids = true;
			else if (!parse_bool(strVal(elem->arg), &data->include_xids))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
								strVal(elem->arg), elem->defname)));
		}
		else if (strcmp(elem->defname, "include-timestamp") == 0)
		{
			if (elem->arg == NULL)
				data->include_timestamp = true;
			else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
								strVal(elem->arg), elem->defname)));
		}
		else if (strcmp(elem->defname, "force-binary") == 0)
		{
			bool		force_binary;

			if (elem->arg == NULL)
				continue;
			else if (!parse_bool(strVal(elem->arg), &force_binary))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
								strVal(elem->arg), elem->defname)));

			if (force_binary)
				opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
		}
		else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
		{
			if (elem->arg == NULL)
				data->skip_empty_xacts = true;
			else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
								strVal(elem->arg), elem->defname)));
		}
		else if (strcmp(elem->defname, "only-local") == 0)
		{
			if (elem->arg == NULL)
				data->only_local = true;
			else if (!parse_bool(strVal(elem->arg), &data->only_local))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
								strVal(elem->arg), elem->defname)));
		}
		else if (strcmp(elem->defname, "include-rewrites") == 0)
		{
			if (elem->arg == NULL)
				continue;
			else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
								strVal(elem->arg), elem->defname)));
		}
Implement streaming mode in ReorderBuffer.
Instead of serializing the transaction to disk after reaching the
logical_decoding_work_mem limit in memory, we consume the changes we have
in memory and invoke the stream API methods added by commit 45fdc9738b.
However, if we have an incomplete toast or speculative insert, we sometimes
spill to disk because we can't generate the complete tuple and stream it.
As soon as we get the complete tuple, we stream the transaction
including the serialized changes.
We can do this incremental processing thanks to having assignments
(associating subxacts with toplevel xacts) in WAL right away, and
thanks to logging the invalidation messages at each command end. These
features were added by commits 0bead9af48 and c55040ccd0 respectively.
Now that we can stream in-progress transactions, concurrent aborts
may cause failures when the output plugin consults catalogs (both system
and user-defined).
We handle such failures by returning the ERRCODE_TRANSACTION_ROLLBACK
sqlerrcode from system table scan APIs to the backend or walsender
decoding a specific uncommitted transaction. On receipt of such a
sqlerrcode, the decoding logic aborts the decoding of the current
transaction and continues with the decoding of other transactions.
We have a ReorderBufferTXN pointer in each ReorderBufferChange by which we
know which xact it belongs to. The output plugin can use this to decide
which changes to discard in case of stream_abort_cb (e.g. when a subxact
gets discarded).
We also provide a new option via the SQL APIs to fetch the changes being
streamed.
Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke
Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
2020-08-08 05:04:39 +03:00
		else if (strcmp(elem->defname, "stream-changes") == 0)
		{
			if (elem->arg == NULL)
				continue;
			else if (!parse_bool(strVal(elem->arg), &enable_streaming))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
								strVal(elem->arg), elem->defname)));
		}
		else
		{
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("option \"%s\" = \"%s\" is unknown",
							elem->defname,
							elem->arg ? strVal(elem->arg) : "(null)")));
		}
	}
ctx->streaming &= enable_streaming;
|
}


/* cleanup this plugin's resources */
static void
pg_decode_shutdown(LogicalDecodingContext *ctx)
{
	TestDecodingData *data = ctx->output_plugin_private;

	/* cleanup our own resources via memory context reset */
	MemoryContextDelete(data->context);
}


/* BEGIN callback */
static void
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
	TestDecodingData *data = ctx->output_plugin_private;
	TestDecodingTxnData *txndata =
		MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));

	txndata->xact_wrote_changes = false;
	txn->output_plugin_private = txndata;
	/*
	 * If asked to skip empty transactions, we'll emit BEGIN at the point
	 * where the first operation is received for this transaction.
	 */
	if (data->skip_empty_xacts)
		return;

	pg_output_begin(ctx, data, txn, true);
}


static void
pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data,
				ReorderBufferTXN *txn, bool last_write)
{
	OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids)
|
|
|
|
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
|
|
|
|
else
|
|
|
|
appendStringInfoString(ctx->out, "BEGIN");
|
2014-09-01 15:42:43 +04:00
|
|
|
OutputPluginWrite(ctx, last_write);
|
}


/* COMMIT callback */
static void
pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
					 XLogRecPtr commit_lsn)
{
	TestDecodingData *data = ctx->output_plugin_private;
	TestDecodingTxnData *txndata = txn->output_plugin_private;
	bool		xact_wrote_changes = txndata->xact_wrote_changes;

	pfree(txndata);
	txn->output_plugin_private = NULL;
	if (data->skip_empty_xacts && !xact_wrote_changes)
		return;
	OutputPluginPrepareWrite(ctx, true);
	if (data->include_xids)
		appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
	else
		appendStringInfoString(ctx->out, "COMMIT");

	if (data->include_timestamp)
		appendStringInfo(ctx->out, " (at %s)",
Add support for prepared transactions to built-in logical replication.
To add support for streaming transactions at prepare time into the
built-in logical replication, we need to do the following things:
* Modify the output plugin (pgoutput) to implement the new two-phase API
callbacks, by leveraging the extended replication protocol.
* Modify the replication apply worker to properly handle two-phase
transactions by replaying them on prepare.
* Add a new SUBSCRIPTION option "two_phase" to allow users to enable
two-phase transactions. We enable two_phase once the initial data sync
is over.
However, we must explicitly disable replication of two-phase transactions
during replication slot creation, even if the plugin supports it. We
don't need to replicate the changes accumulated during this phase,
and moreover, we don't have a replication connection open, so we don't
know where to send the data anyway.
The streaming option is not allowed with this new two_phase option. This
can be done as a separate patch.
We don't allow toggling the two_phase option of a subscription because it
can lead to an inconsistent replica. For the same reason, we don't allow
refreshing the publication once two_phase is enabled for a subscription
unless the copy_data option is false.
Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
2021-07-14 05:03:50 +03:00
						 timestamptz_to_str(txn->xact_time.commit_time));
	OutputPluginWrite(ctx, true);
}
/* BEGIN PREPARE callback */
static void
pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
	TestDecodingData *data = ctx->output_plugin_private;
	TestDecodingTxnData *txndata =
		MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));

	txndata->xact_wrote_changes = false;
	txn->output_plugin_private = txndata;
	/*
	 * If asked to skip empty transactions, we'll emit BEGIN at the point
	 * where the first operation is received for this transaction.
	 */
	if (data->skip_empty_xacts)
		return;

	pg_output_begin(ctx, data, txn, true);
}


/* PREPARE callback */
static void
pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
					  XLogRecPtr prepare_lsn)
{
	TestDecodingData *data = ctx->output_plugin_private;
	TestDecodingTxnData *txndata = txn->output_plugin_private;

	/*
	 * If asked to skip empty transactions, we'll emit PREPARE at the point
	 * where the first operation is received for this transaction.
	 */
	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
		return;

	OutputPluginPrepareWrite(ctx, true);

	appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
					 quote_literal_cstr(txn->gid));

	if (data->include_xids)
		appendStringInfo(ctx->out, ", txid %u", txn->xid);

	if (data->include_timestamp)
		appendStringInfo(ctx->out, " (at %s)",
timestamptz_to_str(txn->xact_time.prepare_time));
|
2020-12-30 13:47:26 +03:00
|
|
|
|
|
|
|
OutputPluginWrite(ctx, true);
|
|
|
|
}

/* COMMIT PREPARED callback */
static void
pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
							  XLogRecPtr commit_lsn)
{
	TestDecodingData *data = ctx->output_plugin_private;

	OutputPluginPrepareWrite(ctx, true);

	appendStringInfo(ctx->out, "COMMIT PREPARED %s",
					 quote_literal_cstr(txn->gid));

	if (data->include_xids)
		appendStringInfo(ctx->out, ", txid %u", txn->xid);

	if (data->include_timestamp)
		appendStringInfo(ctx->out, " (at %s)",
						 timestamptz_to_str(txn->xact_time.commit_time));

	OutputPluginWrite(ctx, true);
}

/* ROLLBACK PREPARED callback */
static void
pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
								ReorderBufferTXN *txn,
								XLogRecPtr prepare_end_lsn,
								TimestampTz prepare_time)
{
	TestDecodingData *data = ctx->output_plugin_private;

	OutputPluginPrepareWrite(ctx, true);

	appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
					 quote_literal_cstr(txn->gid));

	if (data->include_xids)
		appendStringInfo(ctx->out, ", txid %u", txn->xid);

	if (data->include_timestamp)
		appendStringInfo(ctx->out, " (at %s)",
						 timestamptz_to_str(txn->xact_time.commit_time));

	OutputPluginWrite(ctx, true);
}

/*
 * Filter out two-phase transactions.
 *
 * Each plugin can implement its own filtering logic. Here we demonstrate a
 * simple logic by checking the GID. If the GID contains the "_nodecode"
 * substring, then we filter it out.
 */
static bool
pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
						 const char *gid)
{
	if (strstr(gid, "_nodecode") != NULL)
		return true;

	return false;
}
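
/*
 * Illustrative example, not part of the original file: because of the GID
 * substring check in pg_decode_filter_prepare() above, a transaction
 * prepared as
 *
 *		PREPARE TRANSACTION 'test_prepared_nodecode';
 *
 * is not decoded at PREPARE time; its changes are emitted only once the
 * corresponding COMMIT PREPARED record is decoded.
 */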

static bool
pg_decode_filter(LogicalDecodingContext *ctx,
				 RepOriginId origin_id)
{
	TestDecodingData *data = ctx->output_plugin_private;

	if (data->only_local && origin_id != InvalidRepOriginId)
		return true;
	return false;
}

/*
 * Print literal `outputstr' already represented as string of type `typid'
 * into stringbuf `s'.
 *
 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
 * if standard_conforming_strings were enabled.
 */
static void
print_literal(StringInfo s, Oid typid, char *outputstr)
{
	const char *valptr;

	switch (typid)
	{
		case INT2OID:
		case INT4OID:
		case INT8OID:
		case OIDOID:
		case FLOAT4OID:
		case FLOAT8OID:
		case NUMERICOID:
			/* NB: We don't care about Inf, NaN et al. */
			appendStringInfoString(s, outputstr);
			break;

		case BITOID:
		case VARBITOID:
			appendStringInfo(s, "B'%s'", outputstr);
			break;

		case BOOLOID:
			if (strcmp(outputstr, "t") == 0)
				appendStringInfoString(s, "true");
			else
				appendStringInfoString(s, "false");
			break;

		default:
			appendStringInfoChar(s, '\'');
			for (valptr = outputstr; *valptr; valptr++)
			{
				char		ch = *valptr;

				if (SQL_STR_DOUBLE(ch, false))
					appendStringInfoChar(s, ch);
				appendStringInfoChar(s, ch);
			}
			appendStringInfoChar(s, '\'');
			break;
	}
}

/* print the tuple 'tuple' into the StringInfo s */
static void
tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
{
	int			natt;

	/* print all columns individually */
	for (natt = 0; natt < tupdesc->natts; natt++)
	{
		Form_pg_attribute attr; /* the attribute itself */
		Oid			typid;		/* type of current attribute */
		Oid			typoutput;	/* output function */
		bool		typisvarlena;
		Datum		origval;	/* possibly toasted Datum */
		bool		isnull;		/* column is null? */

		attr = TupleDescAttr(tupdesc, natt);

		/*
		 * don't print dropped columns, we can't be sure everything is
		 * available for them
		 */
		if (attr->attisdropped)
			continue;

		/*
		 * Don't print system columns, oid will already have been printed if
		 * present.
		 */
		if (attr->attnum < 0)
			continue;

		typid = attr->atttypid;

		/* get Datum from tuple */
		origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);

		if (isnull && skip_nulls)
			continue;

		/* print attribute name */
		appendStringInfoChar(s, ' ');
		appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));

		/* print attribute type */
		appendStringInfoChar(s, '[');
		appendStringInfoString(s, format_type_be(typid));
		appendStringInfoChar(s, ']');

		/* query output function */
		getTypeOutputInfo(typid,
						  &typoutput, &typisvarlena);

		/* print separator */
		appendStringInfoChar(s, ':');

		/* print data */
		if (isnull)
			appendStringInfoString(s, "null");
		else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
			appendStringInfoString(s, "unchanged-toast-datum");
		else if (!typisvarlena)
			print_literal(s, typid,
						  OidOutputFunctionCall(typoutput, origval));
		else
		{
			Datum		val;	/* definitely detoasted Datum */

			val = PointerGetDatum(PG_DETOAST_DATUM(origval));
			print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
		}
	}
}

/*
 * callback for individual changed tuples
 */
static void
pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				 Relation relation, ReorderBufferChange *change)
{
	TestDecodingData *data;
	TestDecodingTxnData *txndata;
	Form_pg_class class_form;
	TupleDesc	tupdesc;
	MemoryContext old;

	data = ctx->output_plugin_private;
	txndata = txn->output_plugin_private;

	/* output BEGIN if we haven't yet */
	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
	{
		pg_output_begin(ctx, data, txn, false);
	}
	txndata->xact_wrote_changes = true;

	class_form = RelationGetForm(relation);
	tupdesc = RelationGetDescr(relation);

	/* Avoid leaking memory by using and resetting our own context */
	old = MemoryContextSwitchTo(data->context);

	OutputPluginPrepareWrite(ctx, true);

	appendStringInfoString(ctx->out, "table ");
	appendStringInfoString(ctx->out,
						   quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
													  class_form->relrewrite ?
													  get_rel_name(class_form->relrewrite) :
													  NameStr(class_form->relname)));
	appendStringInfoChar(ctx->out, ':');

	switch (change->action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			appendStringInfoString(ctx->out, " INSERT:");
			if (change->data.tp.newtuple == NULL)
				appendStringInfoString(ctx->out, " (no-tuple-data)");
			else
				tuple_to_stringinfo(ctx->out, tupdesc,
									&change->data.tp.newtuple->tuple,
									false);
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			appendStringInfoString(ctx->out, " UPDATE:");
			if (change->data.tp.oldtuple != NULL)
			{
				appendStringInfoString(ctx->out, " old-key:");
				tuple_to_stringinfo(ctx->out, tupdesc,
									&change->data.tp.oldtuple->tuple,
									true);
				appendStringInfoString(ctx->out, " new-tuple:");
			}

			if (change->data.tp.newtuple == NULL)
				appendStringInfoString(ctx->out, " (no-tuple-data)");
			else
				tuple_to_stringinfo(ctx->out, tupdesc,
									&change->data.tp.newtuple->tuple,
									false);
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			appendStringInfoString(ctx->out, " DELETE:");

			/* if there was no PK, we only know that a delete happened */
			if (change->data.tp.oldtuple == NULL)
				appendStringInfoString(ctx->out, " (no-tuple-data)");
			/* In DELETE, only the replica identity is present; display that */
			else
				tuple_to_stringinfo(ctx->out, tupdesc,
									&change->data.tp.oldtuple->tuple,
									true);
			break;
		default:
			Assert(false);
	}

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);

	OutputPluginWrite(ctx, true);
}

static void
pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				   int nrelations, Relation relations[], ReorderBufferChange *change)
{
	TestDecodingData *data;
	TestDecodingTxnData *txndata;
	MemoryContext old;
	int			i;

	data = ctx->output_plugin_private;
	txndata = txn->output_plugin_private;

	/* output BEGIN if we haven't yet */
	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
	{
		pg_output_begin(ctx, data, txn, false);
	}
	txndata->xact_wrote_changes = true;

	/* Avoid leaking memory by using and resetting our own context */
	old = MemoryContextSwitchTo(data->context);

	OutputPluginPrepareWrite(ctx, true);

	appendStringInfoString(ctx->out, "table ");

	for (i = 0; i < nrelations; i++)
	{
		if (i > 0)
			appendStringInfoString(ctx->out, ", ");

		appendStringInfoString(ctx->out,
							   quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
														  NameStr(relations[i]->rd_rel->relname)));
	}

	appendStringInfoString(ctx->out, ": TRUNCATE:");

	if (change->data.truncate.restart_seqs
		|| change->data.truncate.cascade)
	{
		if (change->data.truncate.restart_seqs)
			appendStringInfoString(ctx->out, " restart_seqs");
		if (change->data.truncate.cascade)
			appendStringInfoString(ctx->out, " cascade");
	}
	else
		appendStringInfoString(ctx->out, " (no-flags)");

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);

	OutputPluginWrite(ctx, true);
}

static void
pg_decode_message(LogicalDecodingContext *ctx,
				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
				  const char *prefix, Size sz, const char *message)
{
	OutputPluginPrepareWrite(ctx, true);
	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
					 transactional, prefix, sz);
	appendBinaryStringInfo(ctx->out, message, sz);
	OutputPluginWrite(ctx, true);
}

static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
					   ReorderBufferTXN *txn)
{
	TestDecodingData *data = ctx->output_plugin_private;
	TestDecodingTxnData *txndata = txn->output_plugin_private;

	/*
	 * Allocate the txn plugin data for the first stream in the transaction.
	 */
	if (txndata == NULL)
	{
		txndata =
			MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
		txndata->xact_wrote_changes = false;
		txn->output_plugin_private = txndata;
	}

	txndata->stream_wrote_changes = false;
	if (data->skip_empty_xacts)
		return;
	pg_output_stream_start(ctx, data, txn, true);
}

static void
pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
{
	OutputPluginPrepareWrite(ctx, last_write);
	if (data->include_xids)
		appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
	else
		appendStringInfoString(ctx->out, "opening a streamed block for transaction");
	OutputPluginWrite(ctx, last_write);
}

static void
pg_decode_stream_stop(LogicalDecodingContext *ctx,
					  ReorderBufferTXN *txn)
{
	TestDecodingData *data = ctx->output_plugin_private;
	TestDecodingTxnData *txndata = txn->output_plugin_private;

	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
		return;

	OutputPluginPrepareWrite(ctx, true);
	if (data->include_xids)
		appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
	else
		appendStringInfoString(ctx->out, "closing a streamed block for transaction");
	OutputPluginWrite(ctx, true);
}

static void
pg_decode_stream_abort(LogicalDecodingContext *ctx,
					   ReorderBufferTXN *txn,
					   XLogRecPtr abort_lsn)
{
	TestDecodingData *data = ctx->output_plugin_private;

	/*
	 * Stream abort can be sent for an individual subtransaction, but we
	 * maintain the output_plugin_private only under the toptxn, so if this is
	 * not the toptxn then fetch the toptxn.
	 */
	ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
	TestDecodingTxnData *txndata = toptxn->output_plugin_private;
	bool		xact_wrote_changes = txndata->xact_wrote_changes;

	if (txn->toptxn == NULL)
	{
		Assert(txn->output_plugin_private != NULL);
		pfree(txndata);
		txn->output_plugin_private = NULL;
	}

	if (data->skip_empty_xacts && !xact_wrote_changes)
		return;

	OutputPluginPrepareWrite(ctx, true);
	if (data->include_xids)
		appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
	else
		appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
	OutputPluginWrite(ctx, true);
}

Add support for prepared transactions to built-in logical replication.
To add support for streaming transactions at prepare time into the
built-in logical replication, we need to do the following things:
* Modify the output plugin (pgoutput) to implement the new two-phase API
callbacks, by leveraging the extended replication protocol.
* Modify the replication apply worker, to properly handle two-phase
transactions by replaying them on prepare.
* Add a new SUBSCRIPTION option "two_phase" to allow users to enable
two-phase transactions. We enable the two_phase once the initial data sync
is over.
We however must explicitly disable replication of two-phase transactions
during replication slot creation, even if the plugin supports it. We
don't need to replicate the changes accumulated during this phase,
and moreover, we don't have a replication connection open so we don't know
where to send the data anyway.
The streaming option is not allowed with this new two_phase option. This
can be done as a separate patch.
We don't allow to toggle two_phase option of a subscription because it can
lead to an inconsistent replica. For the same reason, we don't allow to
refresh the publication once the two_phase is enabled for a subscription
unless copy_data option is false.
Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
2021-07-14 05:03:50 +03:00

static void
pg_decode_stream_prepare(LogicalDecodingContext *ctx,
						 ReorderBufferTXN *txn,
						 XLogRecPtr prepare_lsn)
{
	TestDecodingData *data = ctx->output_plugin_private;
	TestDecodingTxnData *txndata = txn->output_plugin_private;

	if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
		return;

	OutputPluginPrepareWrite(ctx, true);

	if (data->include_xids)
		appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
						 quote_literal_cstr(txn->gid), txn->xid);
	else
		appendStringInfo(ctx->out, "preparing streamed transaction %s",
						 quote_literal_cstr(txn->gid));

	if (data->include_timestamp)
		appendStringInfo(ctx->out, " (at %s)",
						 timestamptz_to_str(txn->xact_time.prepare_time));

	OutputPluginWrite(ctx, true);
}

static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
						ReorderBufferTXN *txn,
						XLogRecPtr commit_lsn)
{
	TestDecodingData *data = ctx->output_plugin_private;
	TestDecodingTxnData *txndata = txn->output_plugin_private;
	bool		xact_wrote_changes = txndata->xact_wrote_changes;

	pfree(txndata);
	txn->output_plugin_private = NULL;

	if (data->skip_empty_xacts && !xact_wrote_changes)
		return;

	OutputPluginPrepareWrite(ctx, true);

	if (data->include_xids)
		appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
	else
		appendStringInfoString(ctx->out, "committing streamed transaction");

	if (data->include_timestamp)
		appendStringInfo(ctx->out, " (at %s)",
						 timestamptz_to_str(txn->xact_time.commit_time));

	OutputPluginWrite(ctx, true);
}

/*
 * In streaming mode, we don't display the changes as the transaction can abort
 * at a later point in time. We don't want users to see the changes until the
 * transaction is committed.
 */
static void
pg_decode_stream_change(LogicalDecodingContext *ctx,
						ReorderBufferTXN *txn,
						Relation relation,
						ReorderBufferChange *change)
{
	TestDecodingData *data = ctx->output_plugin_private;
	TestDecodingTxnData *txndata = txn->output_plugin_private;

	/* output stream start if we haven't yet */
	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
	{
		pg_output_stream_start(ctx, data, txn, false);
	}
	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;

	OutputPluginPrepareWrite(ctx, true);
	if (data->include_xids)
		appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
	else
		appendStringInfoString(ctx->out, "streaming change for transaction");
	OutputPluginWrite(ctx, true);
}

/*
 * In streaming mode, we don't display the contents for transactional messages
 * as the transaction can abort at a later point in time. We don't want users to
 * see the message contents until the transaction is committed.
 */
static void
pg_decode_stream_message(LogicalDecodingContext *ctx,
						 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
						 const char *prefix, Size sz, const char *message)
{
	OutputPluginPrepareWrite(ctx, true);

	if (transactional)
	{
		appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
						 transactional, prefix, sz);
	}
	else
	{
		appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
						 transactional, prefix, sz);
		appendBinaryStringInfo(ctx->out, message, sz);
	}

	OutputPluginWrite(ctx, true);
}

/*
 * In streaming mode, we don't display the detailed information of Truncate.
 * See pg_decode_stream_change.
 */
static void
pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
						  int nrelations, Relation relations[],
						  ReorderBufferChange *change)
{
	TestDecodingData *data = ctx->output_plugin_private;
	TestDecodingTxnData *txndata = txn->output_plugin_private;

	if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
	{
		pg_output_stream_start(ctx, data, txn, false);
	}
	txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;

	OutputPluginPrepareWrite(ctx, true);
	if (data->include_xids)
		appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
	else
		appendStringInfoString(ctx->out, "streaming truncate for transaction");
	OutputPluginWrite(ctx, true);
}