postgres/contrib/test_decoding/test_decoding.c

751 lines
21 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* test_decoding.c
* example logical decoding output plugin
*
* Copyright (c) 2012-2020, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/test_decoding/test_decoding.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_type.h"
#include "replication/logical.h"
Introduce replication progress tracking infrastructure. When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
2015-04-29 20:30:53 +03:00
#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
PG_MODULE_MAGIC;
/* These must be available to dlsym() */
extern void _PG_init(void);
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
typedef struct
{
MemoryContext context;
bool include_xids;
bool include_timestamp;
bool skip_empty_xacts;
bool xact_wrote_changes;
Introduce replication progress tracking infrastructure. When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
2015-04-29 20:30:53 +03:00
bool only_local;
} TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init);
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_output_begin(LogicalDecodingContext *ctx,
TestDecodingData *data,
ReorderBufferTXN *txn,
bool last_write);
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pg_decode_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
static void pg_decode_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change);
Introduce replication progress tracking infrastructure. When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
2015-04-29 20:30:53 +03:00
static bool pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_output_stream_start(LogicalDecodingContext *ctx,
TestDecodingData *data,
ReorderBufferTXN *txn,
bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void pg_decode_stream_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
static void pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change);
void
_PG_init(void)
{
/* other plugins can perform things here */
}
/* specify output plugin callbacks */
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
cb->startup_cb = pg_decode_startup;
cb->begin_cb = pg_decode_begin_txn;
cb->change_cb = pg_decode_change;
cb->truncate_cb = pg_decode_truncate;
cb->commit_cb = pg_decode_commit_txn;
Introduce replication progress tracking infrastructure. When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
2015-04-29 20:30:53 +03:00
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
cb->stream_start_cb = pg_decode_stream_start;
cb->stream_stop_cb = pg_decode_stream_stop;
cb->stream_abort_cb = pg_decode_stream_abort;
cb->stream_commit_cb = pg_decode_stream_commit;
cb->stream_change_cb = pg_decode_stream_change;
cb->stream_message_cb = pg_decode_stream_message;
cb->stream_truncate_cb = pg_decode_stream_truncate;
}
/* initialize this plugin */
static void
pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init)
{
ListCell *option;
TestDecodingData *data;
Implement streaming mode in ReorderBuffer. Instead of serializing the transaction to disk after reaching the logical_decoding_work_mem limit in memory, we consume the changes we have in memory and invoke stream API methods added by commit 45fdc9738b. However, sometimes if we have incomplete toast or speculative insert we spill to the disk because we can't generate the complete tuple and stream. And, as soon as we get the complete tuple we stream the transaction including the serialized changes. We can do this incremental processing thanks to having assignments (associating subxact with toplevel xacts) in WAL right away, and thanks to logging the invalidation messages at each command end. These features are added by commits 0bead9af48 and c55040ccd0 respectively. Now that we can stream in-progress transactions, the concurrent aborts may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from system table scan APIs to the backend or WALSender decoding a specific uncommitted transaction. The decoding logic on the receipt of such a sqlerrcode aborts the decoding of the current transaction and continue with the decoding of other transactions. We have ReorderBufferTXN pointer in each ReorderBufferChange by which we know which xact it belongs to. The output plugin can use this to decide which changes to discard in case of stream_abort_cb (e.g. when a subxact gets discarded). We also provide a new option via SQL APIs to fetch the changes being streamed. Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
2020-08-08 05:04:39 +03:00
bool enable_streaming = false;
data = palloc0(sizeof(TestDecodingData));
data->context = AllocSetContextCreate(ctx->context,
"text conversion context",
Add macros to make AllocSetContextCreate() calls simpler and safer. I found that half a dozen (nearly 5%) of our AllocSetContextCreate calls had typos in the context-sizing parameters. While none of these led to especially significant problems, they did create minor inefficiencies, and it's now clear that expecting people to copy-and-paste those calls accurately is not a great idea. Let's reduce the risk of future errors by introducing single macros that encapsulate the common use-cases. Three such macros are enough to cover all but two special-purpose contexts; those two calls can be left as-is, I think. While this patch doesn't in itself improve matters for third-party extensions, it doesn't break anything for them either, and they can gradually adopt the simplified notation over time. In passing, change TopMemoryContext to use the default allocation parameters. Formerly it could only be extended 8K at a time. That was probably reasonable when this code was written; but nowadays we create many more contexts than we did then, so that it's not unusual to have a couple hundred K in TopMemoryContext, even without considering various dubious code that sticks other things there. There seems no good reason not to let it use growing blocks like most other contexts. Back-patch to 9.6, mostly because that's still close enough to HEAD that it's easy to do so, and keeping the branches in sync can be expected to avoid some future back-patching pain. The bugs fixed by these changes don't seem to be significant enough to justify fixing them further back. Discussion: <21072.1472321324@sss.pgh.pa.us>
2016-08-28 00:50:38 +03:00
ALLOCSET_DEFAULT_SIZES);
data->include_xids = true;
data->include_timestamp = false;
data->skip_empty_xacts = false;
Introduce replication progress tracking infrastructure. When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
2015-04-29 20:30:53 +03:00
data->only_local = false;
ctx->output_plugin_private = data;
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
opt->receive_rewrites = false;
foreach(option, ctx->output_plugin_options)
{
DefElem *elem = lfirst(option);
Assert(elem->arg == NULL || IsA(elem->arg, String));
if (strcmp(elem->defname, "include-xids") == 0)
{
/* if option does not provide a value, it means its value is true */
if (elem->arg == NULL)
data->include_xids = true;
else if (!parse_bool(strVal(elem->arg), &data->include_xids))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "include-timestamp") == 0)
{
if (elem->arg == NULL)
data->include_timestamp = true;
else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "force-binary") == 0)
{
bool force_binary;
if (elem->arg == NULL)
continue;
else if (!parse_bool(strVal(elem->arg), &force_binary))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
if (force_binary)
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
}
else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
{
if (elem->arg == NULL)
data->skip_empty_xacts = true;
else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
Introduce replication progress tracking infrastructure. When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
2015-04-29 20:30:53 +03:00
else if (strcmp(elem->defname, "only-local") == 0)
{
if (elem->arg == NULL)
data->only_local = true;
else if (!parse_bool(strVal(elem->arg), &data->only_local))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
Introduce replication progress tracking infrastructure. When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
2015-04-29 20:30:53 +03:00
}
else if (strcmp(elem->defname, "include-rewrites") == 0)
{
if (elem->arg == NULL)
continue;
else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
Implement streaming mode in ReorderBuffer. Instead of serializing the transaction to disk after reaching the logical_decoding_work_mem limit in memory, we consume the changes we have in memory and invoke stream API methods added by commit 45fdc9738b. However, sometimes if we have incomplete toast or speculative insert we spill to the disk because we can't generate the complete tuple and stream. And, as soon as we get the complete tuple we stream the transaction including the serialized changes. We can do this incremental processing thanks to having assignments (associating subxact with toplevel xacts) in WAL right away, and thanks to logging the invalidation messages at each command end. These features are added by commits 0bead9af48 and c55040ccd0 respectively. Now that we can stream in-progress transactions, the concurrent aborts may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from system table scan APIs to the backend or WALSender decoding a specific uncommitted transaction. The decoding logic on the receipt of such a sqlerrcode aborts the decoding of the current transaction and continue with the decoding of other transactions. We have ReorderBufferTXN pointer in each ReorderBufferChange by which we know which xact it belongs to. The output plugin can use this to decide which changes to discard in case of stream_abort_cb (e.g. when a subxact gets discarded). We also provide a new option via SQL APIs to fetch the changes being streamed. Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
2020-08-08 05:04:39 +03:00
else if (strcmp(elem->defname, "stream-changes") == 0)
{
if (elem->arg == NULL)
continue;
else if (!parse_bool(strVal(elem->arg), &enable_streaming))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("option \"%s\" = \"%s\" is unknown",
elem->defname,
elem->arg ? strVal(elem->arg) : "(null)")));
}
}
Implement streaming mode in ReorderBuffer. Instead of serializing the transaction to disk after reaching the logical_decoding_work_mem limit in memory, we consume the changes we have in memory and invoke stream API methods added by commit 45fdc9738b. However, sometimes if we have incomplete toast or speculative insert we spill to the disk because we can't generate the complete tuple and stream. And, as soon as we get the complete tuple we stream the transaction including the serialized changes. We can do this incremental processing thanks to having assignments (associating subxact with toplevel xacts) in WAL right away, and thanks to logging the invalidation messages at each command end. These features are added by commits 0bead9af48 and c55040ccd0 respectively. Now that we can stream in-progress transactions, the concurrent aborts may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from system table scan APIs to the backend or WALSender decoding a specific uncommitted transaction. The decoding logic on the receipt of such a sqlerrcode aborts the decoding of the current transaction and continue with the decoding of other transactions. We have ReorderBufferTXN pointer in each ReorderBufferChange by which we know which xact it belongs to. The output plugin can use this to decide which changes to discard in case of stream_abort_cb (e.g. when a subxact gets discarded). We also provide a new option via SQL APIs to fetch the changes being streamed. Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
2020-08-08 05:04:39 +03:00
ctx->streaming &= enable_streaming;
}
/* cleanup this plugin's resources */
static void
pg_decode_shutdown(LogicalDecodingContext *ctx)
{
TestDecodingData *data = ctx->output_plugin_private;
/* cleanup our own resources via memory context reset */
MemoryContextDelete(data->context);
}
/* BEGIN callback */
static void
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
data->xact_wrote_changes = false;
if (data->skip_empty_xacts)
return;
pg_output_begin(ctx, data, txn, true);
}
static void
pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
{
OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids)
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
else
appendStringInfoString(ctx->out, "BEGIN");
OutputPluginWrite(ctx, last_write);
}
/* COMMIT callback */
static void
pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
else
appendStringInfoString(ctx->out, "COMMIT");
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
timestamptz_to_str(txn->commit_time));
OutputPluginWrite(ctx, true);
}
Introduce replication progress tracking infrastructure. When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
2015-04-29 20:30:53 +03:00
static bool
pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
TestDecodingData *data = ctx->output_plugin_private;
if (data->only_local && origin_id != InvalidRepOriginId)
return true;
return false;
}
/*
* Print literal `outputstr' already represented as string of type `typid'
* into stringbuf `s'.
*
* Some builtin types aren't quoted, the rest is quoted. Escaping is done as
* if standard_conforming_strings were enabled.
*/
static void
print_literal(StringInfo s, Oid typid, char *outputstr)
{
const char *valptr;
switch (typid)
{
case INT2OID:
case INT4OID:
case INT8OID:
case OIDOID:
case FLOAT4OID:
case FLOAT8OID:
case NUMERICOID:
/* NB: We don't care about Inf, NaN et al. */
appendStringInfoString(s, outputstr);
break;
case BITOID:
case VARBITOID:
appendStringInfo(s, "B'%s'", outputstr);
break;
case BOOLOID:
if (strcmp(outputstr, "t") == 0)
appendStringInfoString(s, "true");
else
appendStringInfoString(s, "false");
break;
default:
appendStringInfoChar(s, '\'');
for (valptr = outputstr; *valptr; valptr++)
{
char ch = *valptr;
if (SQL_STR_DOUBLE(ch, false))
appendStringInfoChar(s, ch);
appendStringInfoChar(s, ch);
}
appendStringInfoChar(s, '\'');
break;
}
}
/* print the tuple 'tuple' into the StringInfo s */
static void
tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
{
int natt;
/* print all columns individually */
for (natt = 0; natt < tupdesc->natts; natt++)
{
Form_pg_attribute attr; /* the attribute itself */
Oid typid; /* type of current attribute */
Oid typoutput; /* output function */
bool typisvarlena;
Datum origval; /* possibly toasted Datum */
bool isnull; /* column is null? */
attr = TupleDescAttr(tupdesc, natt);
/*
* don't print dropped columns, we can't be sure everything is
* available for them
*/
if (attr->attisdropped)
continue;
/*
* Don't print system columns, oid will already have been printed if
* present.
*/
if (attr->attnum < 0)
continue;
typid = attr->atttypid;
/* get Datum from tuple */
origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
if (isnull && skip_nulls)
continue;
/* print attribute name */
appendStringInfoChar(s, ' ');
appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
/* print attribute type */
appendStringInfoChar(s, '[');
appendStringInfoString(s, format_type_be(typid));
appendStringInfoChar(s, ']');
/* query output function */
getTypeOutputInfo(typid,
&typoutput, &typisvarlena);
/* print separator */
appendStringInfoChar(s, ':');
/* print data */
if (isnull)
appendStringInfoString(s, "null");
else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
appendStringInfoString(s, "unchanged-toast-datum");
else if (!typisvarlena)
print_literal(s, typid,
OidOutputFunctionCall(typoutput, origval));
else
{
Datum val; /* definitely detoasted Datum */
val = PointerGetDatum(PG_DETOAST_DATUM(origval));
print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
}
}
}
/*
* callback for individual changed tuples
*/
static void
pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
TestDecodingData *data;
Form_pg_class class_form;
TupleDesc tupdesc;
MemoryContext old;
data = ctx->output_plugin_private;
/* output BEGIN if we haven't yet */
if (data->skip_empty_xacts && !data->xact_wrote_changes)
{
pg_output_begin(ctx, data, txn, false);
}
data->xact_wrote_changes = true;
class_form = RelationGetForm(relation);
tupdesc = RelationGetDescr(relation);
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
OutputPluginPrepareWrite(ctx, true);
appendStringInfoString(ctx->out, "table ");
appendStringInfoString(ctx->out,
quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
class_form->relrewrite ?
get_rel_name(class_form->relrewrite) :
NameStr(class_form->relname)));
appendStringInfoChar(ctx->out, ':');
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
appendStringInfoString(ctx->out, " INSERT:");
if (change->data.tp.newtuple == NULL)
appendStringInfoString(ctx->out, " (no-tuple-data)");
else
tuple_to_stringinfo(ctx->out, tupdesc,
&change->data.tp.newtuple->tuple,
false);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
appendStringInfoString(ctx->out, " UPDATE:");
if (change->data.tp.oldtuple != NULL)
{
appendStringInfoString(ctx->out, " old-key:");
tuple_to_stringinfo(ctx->out, tupdesc,
&change->data.tp.oldtuple->tuple,
true);
appendStringInfoString(ctx->out, " new-tuple:");
}
if (change->data.tp.newtuple == NULL)
appendStringInfoString(ctx->out, " (no-tuple-data)");
else
tuple_to_stringinfo(ctx->out, tupdesc,
&change->data.tp.newtuple->tuple,
false);
break;
case REORDER_BUFFER_CHANGE_DELETE:
appendStringInfoString(ctx->out, " DELETE:");
/* if there was no PK, we only know that a delete happened */
if (change->data.tp.oldtuple == NULL)
appendStringInfoString(ctx->out, " (no-tuple-data)");
/* In DELETE, only the replica identity is present; display that */
else
tuple_to_stringinfo(ctx->out, tupdesc,
&change->data.tp.oldtuple->tuple,
true);
break;
default:
Assert(false);
}
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
OutputPluginWrite(ctx, true);
}
static void
pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
TestDecodingData *data;
MemoryContext old;
int i;
data = ctx->output_plugin_private;
/* output BEGIN if we haven't yet */
if (data->skip_empty_xacts && !data->xact_wrote_changes)
{
pg_output_begin(ctx, data, txn, false);
}
data->xact_wrote_changes = true;
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
OutputPluginPrepareWrite(ctx, true);
appendStringInfoString(ctx->out, "table ");
for (i = 0; i < nrelations; i++)
{
if (i > 0)
appendStringInfoString(ctx->out, ", ");
appendStringInfoString(ctx->out,
quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
NameStr(relations[i]->rd_rel->relname)));
}
appendStringInfoString(ctx->out, ": TRUNCATE:");
if (change->data.truncate.restart_seqs
|| change->data.truncate.cascade)
{
if (change->data.truncate.restart_seqs)
appendStringInfoString(ctx->out, " restart_seqs");
if (change->data.truncate.cascade)
appendStringInfoString(ctx->out, " cascade");
}
else
appendStringInfoString(ctx->out, " (no-flags)");
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
OutputPluginWrite(ctx, true);
}
static void
pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
const char *prefix, Size sz, const char *message)
{
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
transactional, prefix, sz);
appendBinaryStringInfo(ctx->out, message, sz);
OutputPluginWrite(ctx, true);
}
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
data->xact_wrote_changes = false;
if (data->skip_empty_xacts)
return;
pg_output_stream_start(ctx, data, txn, true);
}
static void
pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
{
OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
else
appendStringInfoString(ctx->out, "opening a streamed block for transaction");
OutputPluginWrite(ctx, last_write);
}
static void
pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
else
appendStringInfoString(ctx->out, "closing a streamed block for transaction");
OutputPluginWrite(ctx, true);
}
static void
pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
else
appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
OutputPluginWrite(ctx, true);
}
static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
else
appendStringInfoString(ctx->out, "committing streamed transaction");
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
timestamptz_to_str(txn->commit_time));
OutputPluginWrite(ctx, true);
}
/*
* In streaming mode, we don't display the changes as the transaction can abort
* at a later point in time. We don't want users to see the changes until the
* transaction is committed.
*/
static void
pg_decode_stream_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change)
{
TestDecodingData *data = ctx->output_plugin_private;
/* output stream start if we haven't yet */
if (data->skip_empty_xacts && !data->xact_wrote_changes)
{
pg_output_stream_start(ctx, data, txn, false);
}
data->xact_wrote_changes = true;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
else
appendStringInfoString(ctx->out, "streaming change for transaction");
OutputPluginWrite(ctx, true);
}
/*
* In streaming mode, we don't display the contents for transactional messages
* as the transaction can abort at a later point in time. We don't want users to
* see the message contents until the transaction is committed.
*/
static void
pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
const char *prefix, Size sz, const char *message)
{
OutputPluginPrepareWrite(ctx, true);
if (transactional)
{
appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
transactional, prefix, sz);
}
else
{
appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
transactional, prefix, sz);
appendBinaryStringInfo(ctx->out, message, sz);
}
OutputPluginWrite(ctx, true);
}
/*
* In streaming mode, we don't display the detailed information of Truncate.
* See pg_decode_stream_change.
*/
static void
pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change)
{
TestDecodingData *data = ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
{
pg_output_stream_start(ctx, data, txn, false);
}
data->xact_wrote_changes = true;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
else
appendStringInfoString(ctx->out, "streaming truncate for transaction");
OutputPluginWrite(ctx, true);
}