diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 4dc5b34d21..2a8cd02664 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7797,6 +7797,16 @@ SCRAM-SHA-256$<iteration count>:&l
+
+
+ subskiplsnpg_lsn
+
+
+ Finish LSN of the transaction whose changes are to be skipped, if a valid
+ LSN; otherwise 0/0.
+
+
+
subconninfotext
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 6431d4796d..555fbd749c 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -362,19 +362,24 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
The LSN of the transaction that contains the change violating the constraint and
the replication origin name can be found from the server log (LSN 0/14C0378 and
- replication origin pg_16395 in the above case). To skip the
- transaction, the subscription needs to be disabled temporarily by
- ALTER SUBSCRIPTION ... DISABLE first or alternatively, the
+ replication origin pg_16395 in the above case). The
+ transaction that produces conflict can be skipped by using
+ ALTER SUBSCRIPTION ... SKIP with the finish LSN
+ (i.e., LSN 0/14C0378). The finish LSN could be an LSN at which the transaction
+ is committed or prepared on the publisher. Alternatively, the transaction can
+ also be skipped by calling the
+ pg_replication_origin_advance() function
+ transaction. Before using this function, the subscription needs to be disabled
+ temporarily either by ALTER SUBSCRIPTION ... DISABLE or, the
subscription can be used with the disable_on_error option.
- Then, the transaction can be skipped by calling the
-
- pg_replication_origin_advance() function with
- the node_name (i.e., pg_16395) and the
- next LSN of the transaction's LSN (i.e., LSN 0/14C0379). After that the replication
- can be resumed by ALTER SUBSCRIPTION ... ENABLE. The current
- position of origins can be seen in the
-
+ Then, you can use pg_replication_origin_advance() function
+ with the node_name (i.e., pg_16395)
+ and the next LSN of the finish LSN (i.e., 0/14C0379). The current position of
+ origins can be seen in the
pg_replication_origin_status system view.
+ Please note that skipping the whole transaction include skipping changes that
+ might not violate any constraint. This can easily make the subscriber
+ inconsistent.
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 58b78a94ea..ac2db249cb 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -29,6 +29,7 @@ ALTER SUBSCRIPTION name REFRESH PUB
ALTER SUBSCRIPTION name ENABLE
ALTER SUBSCRIPTION name DISABLE
ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] )
+ALTER SUBSCRIPTION name SKIP ( skip_option = value )
ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER SUBSCRIPTION name RENAME TO new_name
@@ -210,6 +211,47 @@ ALTER SUBSCRIPTION name RENAME TO <
+
+ SKIP ( skip_option = value )
+
+
+ Skips applying all changes of the remote transaction. If incoming data
+ violates any constraints, logical replication will stop until it is
+ resolved. By using ALTER SUBSCRIPTION ... SKIP command,
+ the logical replication worker skips all data modification changes within
+ the transaction. This option has no effect on the transactions that are
+ already prepared by enabling two_phase on
+ subscriber.
+ After logical replication worker successfully skips the transaction or
+ finishes a transaction, LSN (stored in
+ pg_subscription.subskiplsn)
+ is cleared. See for
+ the details of logical replication conflicts. Using this command requires
+ superuser privilege.
+
+
+
+ skip_option specifies options for this operation.
+ The supported option is:
+
+
+
+ lsn (pg_lsn)
+
+
+ Specifies the finish LSN of the remote transaction whose changes
+ are to be skipped by the logical replication worker. The finish LSN
+ is the LSN at which the transaction is either committed or prepared.
+ Skipping individual subtransaction is not supported. Setting
+ NONE resets the LSN.
+
+
+
+
+
+
+
+
new_owner
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a6304f5f81..0ff0982f7b 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->stream = subform->substream;
sub->twophasestate = subform->subtwophasestate;
sub->disableonerr = subform->subdisableonerr;
+ sub->skiplsn = subform->subskiplsn;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index bb1ac30cd1..bd48ee7bd2 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,7 +1261,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
-- All columns of pg_subscription except subconninfo are publicly readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
- substream, subtwophasestate, subdisableonerr, subslotname,
+ substream, subtwophasestate, subdisableonerr, subskiplsn, subslotname,
subsynccommit, subpublications)
ON pg_subscription TO public;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3922658bbc..e16f04626d 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -45,6 +45,7 @@
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
#include "utils/syscache.h"
/*
@@ -62,6 +63,7 @@
#define SUBOPT_STREAMING 0x00000100
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
#define SUBOPT_DISABLE_ON_ERR 0x00000400
+#define SUBOPT_LSN 0x00000800
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -84,6 +86,7 @@ typedef struct SubOpts
bool streaming;
bool twophase;
bool disableonerr;
+ XLogRecPtr lsn;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -262,6 +265,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
opts->disableonerr = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_LSN) &&
+ strcmp(defel->defname, "lsn") == 0)
+ {
+ char *lsn_str = defGetString(defel);
+ XLogRecPtr lsn;
+
+ if (IsSet(opts->specified_opts, SUBOPT_LSN))
+ errorConflictingDefElem(defel, pstate);
+
+ /* Setting lsn = NONE is treated as resetting LSN */
+ if (strcmp(lsn_str, "none") == 0)
+ lsn = InvalidXLogRecPtr;
+ else
+ {
+ /* Parse the argument as LSN */
+ lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(lsn_str)));
+
+ if (XLogRecPtrIsInvalid(lsn))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid WAL location (LSN): %s", lsn_str)));
+ }
+
+ opts->specified_opts |= SUBOPT_LSN;
+ opts->lsn = lsn;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -479,6 +509,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
LOGICALREP_TWOPHASE_STATE_PENDING :
LOGICALREP_TWOPHASE_STATE_DISABLED);
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
+ values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -1106,6 +1137,48 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break;
}
+ case ALTER_SUBSCRIPTION_SKIP:
+ {
+ parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
+
+ /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
+ Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
+
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to skip transaction")));
+
+ /*
+ * If the user sets subskiplsn, we do a sanity check to make
+ * sure that the specified LSN is a probable value.
+ */
+ if (!XLogRecPtrIsInvalid(opts.lsn))
+ {
+ RepOriginId originid;
+ char originname[NAMEDATALEN];
+ XLogRecPtr remote_lsn;
+
+ snprintf(originname, sizeof(originname), "pg_%u", subid);
+ originid = replorigin_by_name(originname, false);
+ remote_lsn = replorigin_get_progress(originid, false);
+
+ /* Check the given LSN is at least a future LSN */
+ if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
+ LSN_FORMAT_ARGS(opts.lsn),
+ LSN_FORMAT_ARGS(remote_lsn))));
+ }
+
+ values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
+ replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+ update_tuple = true;
+ break;
+ }
+
default:
elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a03b33b53b..0036c2f9e2 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9983,6 +9983,15 @@ AlterSubscriptionStmt:
(Node *)makeBoolean(false), @1));
$$ = (Node *)n;
}
+ | ALTER SUBSCRIPTION name SKIP definition
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_SKIP;
+ n->subname = $3;
+ n->options = $5;
+ $$ = (Node *)n;
+ }
;
/*****************************************************************************
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 03e069c7cd..82dcffc2db 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -136,6 +136,7 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
+#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
@@ -189,6 +190,7 @@
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/syscache.h"
@@ -259,6 +261,21 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
+/*
+ * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
+ * the subscription if the remote transaction's finish LSN matches the subskiplsn.
+ * Once we start skipping changes, we don't stop it until we skip all changes of
+ * the transaction even if pg_subscription is updated and MySubscription->skiplsn
+ * gets changed or reset during that. Also, in streaming transaction cases, we
+ * don't skip receiving and spooling the changes since we decide whether or not
+ * to skip applying the changes when starting to apply changes. The subskiplsn is
+ * cleared after successfully skipping the transaction or applying non-empty
+ * transaction. The latter prevents the mistakenly specified subskiplsn from
+ * being left.
+ */
+static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
+#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
+
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -336,6 +353,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int
/* Common streaming function to apply all the spooled messages */
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
+/* Functions for skipping changes */
+static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
+static void stop_skipping_changes(void);
+static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
+
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
@@ -795,6 +817,8 @@ apply_handle_begin(StringInfo s)
remote_final_lsn = begin_data.final_lsn;
+ maybe_start_skipping_changes(begin_data.final_lsn);
+
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
@@ -847,6 +871,8 @@ apply_handle_begin_prepare(StringInfo s)
remote_final_lsn = begin_data.prepare_lsn;
+ maybe_start_skipping_changes(begin_data.prepare_lsn);
+
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
@@ -905,9 +931,9 @@ apply_handle_prepare(StringInfo s)
/*
* Unlike commit, here, we always prepare the transaction even though no
- * change has happened in this transaction. It is done this way because at
- * commit prepared time, we won't know whether we have skipped preparing a
- * transaction because of no change.
+ * change has happened in this transaction or all changes are skipped. It
+ * is done this way because at commit prepared time, we won't know whether
+ * we have skipped preparing a transaction because of those reasons.
*
* XXX, We can optimize such that at commit prepared time, we first check
* whether we have prepared the transaction or not but that doesn't seem
@@ -928,6 +954,15 @@ apply_handle_prepare(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ /*
+ * Since we have already prepared the transaction, in a case where the
+ * server crashes before clearing the subskiplsn, it will be left but the
+ * transaction won't be resent. But that's okay because it's a rare case
+ * and the subskiplsn will be cleared when finishing the next transaction.
+ */
+ stop_skipping_changes();
+ clear_subscription_skip_lsn(prepare_data.prepare_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
}
@@ -969,6 +1004,8 @@ apply_handle_commit_prepared(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ clear_subscription_skip_lsn(prepare_data.end_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
}
@@ -1010,6 +1047,8 @@ apply_handle_rollback_prepared(StringInfo s)
FinishPreparedTransaction(gid, false);
end_replication_step();
CommitTransactionCommand();
+
+ clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
}
pgstat_report_stat(false);
@@ -1072,6 +1111,13 @@ apply_handle_stream_prepare(StringInfo s)
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
+ /*
+ * Similar to prepare case, the subskiplsn could be left in a case of
+ * server crash but it's okay. See the comments in apply_handle_prepare().
+ */
+ stop_skipping_changes();
+ clear_subscription_skip_lsn(prepare_data.prepare_lsn);
+
pgstat_report_activity(STATE_IDLE, NULL);
reset_apply_error_context_info();
@@ -1311,6 +1357,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
MemoryContext oldcxt;
BufFile *fd;
+ maybe_start_skipping_changes(lsn);
+
/* Make sure we have an open transaction */
begin_replication_step();
@@ -1455,8 +1503,26 @@ apply_handle_stream_commit(StringInfo s)
static void
apply_handle_commit_internal(LogicalRepCommitData *commit_data)
{
+ if (is_skipping_changes())
+ {
+ stop_skipping_changes();
+
+ /*
+ * Start a new transaction to clear the subskiplsn, if not started
+ * yet.
+ */
+ if (!IsTransactionState())
+ StartTransactionCommand();
+ }
+
if (IsTransactionState())
{
+ /*
+ * The transaction is either non-empty or skipped, so we clear the
+ * subskiplsn.
+ */
+ clear_subscription_skip_lsn(commit_data->commit_lsn);
+
/*
* Update origin state so we can restart streaming from correct
* position in case of crash.
@@ -1583,7 +1649,12 @@ apply_handle_insert(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
return;
begin_replication_step();
@@ -1710,7 +1781,12 @@ apply_handle_update(StringInfo s)
RangeTblEntry *target_rte;
MemoryContext oldctx;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
return;
begin_replication_step();
@@ -1874,7 +1950,12 @@ apply_handle_delete(StringInfo s)
TupleTableSlot *remoteslot;
MemoryContext oldctx;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
return;
begin_replication_step();
@@ -2261,7 +2342,12 @@ apply_handle_truncate(StringInfo s)
ListCell *lc;
LOCKMODE lockmode = AccessExclusiveLock;
- if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
+ /*
+ * Quick return if we are skipping data modification changes or handling
+ * streamed transactions.
+ */
+ if (is_skipping_changes() ||
+ handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
return;
begin_replication_step();
@@ -3738,6 +3824,139 @@ IsLogicalWorker(void)
return MyLogicalRepWorker != NULL;
}
+/*
+ * Start skipping changes of the transaction if the given LSN matches the
+ * LSN specified by subscription's skiplsn.
+ */
+static void
+maybe_start_skipping_changes(XLogRecPtr finish_lsn)
+{
+ Assert(!is_skipping_changes());
+ Assert(!in_remote_transaction);
+ Assert(!in_streamed_transaction);
+
+ /*
+ * Quick return if it's not requested to skip this transaction. This
+ * function is called for every remote transaction and we assume that
+ * skipping the transaction is not used often.
+ */
+ if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
+ MySubscription->skiplsn != finish_lsn))
+ return;
+
+ /* Start skipping all changes of this transaction */
+ skip_xact_finish_lsn = finish_lsn;
+
+ ereport(LOG,
+ errmsg("start skipping logical replication transaction finished at %X/%X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
+}
+
+/*
+ * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
+ */
+static void
+stop_skipping_changes(void)
+{
+ if (!is_skipping_changes())
+ return;
+
+ ereport(LOG,
+ (errmsg("done skipping logical replication transaction finished at %X/%X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
+
+ /* Stop skipping changes */
+ skip_xact_finish_lsn = InvalidXLogRecPtr;
+}
+
+/*
+ * Clear subskiplsn of pg_subscription catalog.
+ *
+ * finish_lsn is the transaction's finish LSN that is used to check if the
+ * subskiplsn matches it. If not matched, we raise a warning when clearing the
+ * subskiplsn in order to inform users for cases e.g., where the user mistakenly
+ * specified the wrong subskiplsn.
+ */
+static void
+clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
+{
+ Relation rel;
+ Form_pg_subscription subform;
+ HeapTuple tup;
+ XLogRecPtr myskiplsn = MySubscription->skiplsn;
+ bool started_tx = false;
+
+ if (likely(XLogRecPtrIsInvalid(myskiplsn)))
+ return;
+
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ /*
+ * Protect subskiplsn of pg_subscription from being concurrently updated
+ * while clearing it.
+ */
+ LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+ AccessShareLock);
+
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+ /* Fetch the existing tuple. */
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+ ObjectIdGetDatum(MySubscription->oid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+ subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+ /*
+ * Clear the subskiplsn. If the user has already changed subskiplsn before
+ * clearing it we don't update the catalog and the replication origin
+ * state won't get advanced. So in the worst case, if the server crashes
+ * before sending an acknowledgment of the flush position the transaction
+ * will be sent again and the user needs to set subskiplsn again. We can
+ * reduce the possibility by logging a replication origin WAL record to
+ * advance the origin LSN instead but there is no way to advance the
+ * origin timestamp and it doesn't seem to be worth doing anything about
+ * it since it's a very rare case.
+ */
+ if (subform->subskiplsn == myskiplsn)
+ {
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ /* reset subskiplsn */
+ values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
+ replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ if (myskiplsn != finish_lsn)
+ ereport(WARNING,
+ errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name),
+ errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X",
+ LSN_FORMAT_ARGS(finish_lsn),
+ LSN_FORMAT_ARGS(myskiplsn)));
+ }
+
+ heap_freetuple(tup);
+ table_close(rel, NoLock);
+
+ if (started_tx)
+ CommitTransactionCommand();
+}
+
/* Error callback to give more context info about the change being applied */
static void
apply_error_callback(void *arg)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 725cd2e4eb..e5816c4cce 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4385,6 +4385,10 @@ getSubscriptions(Archive *fout)
ntups = PQntuples(res);
+ /*
+ * Get subscription fields. We don't include subskiplsn in the dump as
+ * after restoring the dump this value may no longer be relevant.
+ */
i_tableoid = PQfnumber(res, "tableoid");
i_oid = PQfnumber(res, "oid");
i_subname = PQfnumber(res, "subname");
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 991bfc1546..714097cad1 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6105,7 +6105,7 @@ describeSubscriptions(const char *pattern, bool verbose)
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
- false, false, false, false, false, false};
+ false, false, false, false, false, false, false};
if (pset.sversion < 100000)
{
@@ -6152,6 +6152,12 @@ describeSubscriptions(const char *pattern, bool verbose)
", subconninfo AS \"%s\"\n",
gettext_noop("Synchronous commit"),
gettext_noop("Conninfo"));
+
+ /* Skip LSN is only supported in v15 and higher */
+ if (pset.sversion >= 150000)
+ appendPQExpBuffer(&buf,
+ ", subskiplsn AS \"%s\"\n",
+ gettext_noop("Skip LSN"));
}
/* Only display subscriptions in current database. */
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 183abcc275..5c064595a9 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1852,7 +1852,7 @@ psql_completion(const char *text, int start, int end)
/* ALTER SUBSCRIPTION */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
- "RENAME TO", "REFRESH PUBLICATION", "SET",
+ "RENAME TO", "REFRESH PUBLICATION", "SET", "SKIP (",
"ADD PUBLICATION", "DROP PUBLICATION");
/* ALTER SUBSCRIPTION REFRESH PUBLICATION */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
@@ -1868,6 +1868,9 @@ psql_completion(const char *text, int start, int end)
/* ALTER SUBSCRIPTION SET ( */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
+ /* ALTER SUBSCRIPTION SKIP ( */
+ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
+ COMPLETE_WITH("lsn");
/* ALTER SUBSCRIPTION SET PUBLICATION */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
{
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 1383761c1f..db9963db72 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202203211
+#define CATALOG_VERSION_NO 202203221
#endif
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index e2befaf351..69969a0617 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -70,6 +70,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subdisableonerr; /* True if a worker error should cause the
* subscription to be disabled */
+ XLogRecPtr subskiplsn; /* All changes finished at this LSN are
+ * skipped */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
@@ -109,6 +112,8 @@ typedef struct Subscription
bool disableonerr; /* Indicates if the subscription should be
* automatically disabled if a worker error
* occurs */
+ XLogRecPtr skiplsn; /* All changes finished at this LSN are
+ * skipped */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 1617702d9d..6f83a79a96 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3726,7 +3726,8 @@ typedef enum AlterSubscriptionType
ALTER_SUBSCRIPTION_ADD_PUBLICATION,
ALTER_SUBSCRIPTION_DROP_PUBLICATION,
ALTER_SUBSCRIPTION_REFRESH,
- ALTER_SUBSCRIPTION_ENABLED
+ ALTER_SUBSCRIPTION_ENABLED,
+ ALTER_SUBSCRIPTION_SKIP
} AlterSubscriptionType;
typedef struct AlterSubscriptionStmt
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index ad8003fae1..7fcfad1591 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -93,11 +93,25 @@ ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2
ERROR: subscription "regress_doesnotexist" does not exist
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
+-- ok
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345
+(1 row)
+
+-- ok - with lsn = NONE
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
+ERROR: invalid WAL location (LSN): 0/0
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@@ -129,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@@ -165,19 +179,19 @@ ERROR: binary requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +202,19 @@ ERROR: streaming requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication already exists
@@ -215,10 +229,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication used more then once
@@ -233,10 +247,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +284,10 @@ ERROR: two_phase requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
--fail - alter of two_phase option not supported.
@@ -282,10 +296,10 @@ ERROR: unrecognized subscription parameter: "two_phase"
-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +308,10 @@ DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -309,18 +323,18 @@ ERROR: disable_on_error requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index a7c15b1daf..74c38ead5d 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -72,6 +72,17 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = '');
ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2';
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
+-- ok
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
+
+\dRs+
+
+-- ok - with lsn = NONE
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
+
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
+
\dRs+
BEGIN;
diff --git a/src/test/subscription/t/029_disable_on_error.pl b/src/test/subscription/t/029_disable_on_error.pl
deleted file mode 100644
index 5eca804446..0000000000
--- a/src/test/subscription/t/029_disable_on_error.pl
+++ /dev/null
@@ -1,94 +0,0 @@
-
-# Copyright (c) 2021-2022, PostgreSQL Global Development Group
-
-# Test of logical replication subscription self-disabling feature.
-use strict;
-use warnings;
-use PostgreSQL::Test::Cluster;
-use PostgreSQL::Test::Utils;
-use Test::More;
-
-# create publisher node
-my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
-$node_publisher->init(allows_streaming => 'logical');
-$node_publisher->start;
-
-# create subscriber node
-my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
-$node_subscriber->init;
-$node_subscriber->start;
-
-# Create identical table on both nodes.
-$node_publisher->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
-$node_subscriber->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
-
-# Insert duplicate values on the publisher.
-$node_publisher->safe_psql('postgres',
- "INSERT INTO tbl (i) VALUES (1), (1), (1)");
-
-# Create an additional unique index on the subscriber.
-$node_subscriber->safe_psql('postgres',
- "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
-
-# Create a pub/sub to set up logical replication. This tests that the
-# uniqueness violation will cause the subscription to fail during initial
-# synchronization and make it disabled.
-my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
-$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION pub FOR TABLE tbl");
-$node_subscriber->safe_psql('postgres',
- "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true)"
-);
-
-# Initial synchronization failure causes the subscription to be disabled.
-$node_subscriber->poll_query_until('postgres',
- "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
-) or die "Timed out while waiting for subscriber to be disabled";
-
-# Drop the unique index on the subscriber which caused the subscription to be
-# disabled.
-$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
-
-# Re-enable the subscription "sub".
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-
-# Wait for the data to replicate.
-$node_publisher->wait_for_catchup('sub');
-$node_subscriber->poll_query_until('postgres',
- "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
-);
-
-# Confirm that we have finished the table sync.
-my $result =
- $node_subscriber->safe_psql('postgres', "SELECT MAX(i), COUNT(*) FROM tbl");
-is($result, qq(1|3), "subscription sub replicated data");
-
-# Delete the data from the subscriber and recreate the unique index.
-$node_subscriber->safe_psql('postgres', "DELETE FROM tbl");
-$node_subscriber->safe_psql('postgres',
- "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
-
-# Add more non-unique data to the publisher.
-$node_publisher->safe_psql('postgres',
- "INSERT INTO tbl (i) VALUES (3), (3), (3)");
-
-# Apply failure causes the subscription to be disabled.
-$node_subscriber->poll_query_until('postgres',
- "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
-) or die "Timed out while waiting for subscription sub to be disabled";
-
-# Drop the unique index on the subscriber and re-enabled the subscription. Then
-# confirm that the previously failing insert was applied OK.
-$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-
-$node_publisher->wait_for_catchup('sub');
-
-$result = $node_subscriber->safe_psql('postgres',
- "SELECT COUNT(*) FROM tbl WHERE i = 3");
-is($result, qq(3), 'check the result of apply');
-
-$node_subscriber->stop;
-$node_publisher->stop;
-
-done_testing();
diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl
new file mode 100644
index 0000000000..e8b904b745
--- /dev/null
+++ b/src/test/subscription/t/029_on_error.pl
@@ -0,0 +1,183 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Tests for disable_on_error and SKIP transaction features.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $offset = 0;
+
+# Test skipping the transaction. This function must be called after the caller
+# has inserted data that conflicts with the subscriber. The finish LSN of the
+# error transaction that is used to specify to ALTER SUBSCRIPTION ... SKIP is
+# fetched from the server logs. After executing ALTER SUBSCRITPION ... SKIP, we
+# check if logical replication can continue working by inserting $nonconflict_data
+# on the publisher.
+sub test_skip_lsn
+{
+ my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg)
+ = @_;
+
+ # Wait until a conflict occurs on the subscriber.
+ $node_subscriber->poll_query_until('postgres',
+ "SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'"
+ );
+
+ # Get the finish LSN of the error transaction.
+ my $contents = slurp_file($node_subscriber->logfile, $offset);
+ $contents =~
+ qr/processing remote data for replication origin \"pg_\d+\" during "INSERT" for replication target relation "public.tbl" in transaction \d+ finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/
+ or die "could not get error-LSN";
+ my $lsn = $1;
+
+ # Set skip lsn.
+ $node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub SKIP (lsn = '$lsn')");
+
+ # Re-enable the subscription.
+ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+ # Wait for the failed transaction to be skipped
+ $node_subscriber->poll_query_until('postgres',
+ "SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = 'sub'"
+ );
+
+ # Check the log to ensure that the transaction is skipped, and advance the
+ # offset of the log file for the next test.
+ $offset = $node_subscriber->wait_for_log(
+ qr/LOG: done skipping logical replication transaction finished at $lsn/,
+ $offset);
+
+ # Insert non-conflict data
+ $node_publisher->safe_psql('postgres',
+ "INSERT INTO tbl VALUES $nonconflict_data");
+
+ $node_publisher->wait_for_catchup('sub');
+
+ # Check replicated data
+ my $res =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+ is($res, $expected, $msg);
+}
+
+# Create publisher node. Set a low value of logical_decoding_work_mem to test
+# streaming cases.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+ 'postgresql.conf',
+ qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+]);
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf(
+ 'postgresql.conf',
+ qq[
+max_prepared_transactions = 10
+]);
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On the subscriber, we
+# create the same tables but with a primary key. Also, insert some data that
+# will conflict with the data replicated from publisher later.
+$node_publisher->safe_psql(
+ 'postgres',
+ qq[
+CREATE TABLE tbl (i INT, t TEXT);
+INSERT INTO tbl VALUES (1, NULL);
+]);
+$node_subscriber->safe_psql(
+ 'postgres',
+ qq[
+CREATE TABLE tbl (i INT PRIMARY KEY, t TEXT);
+INSERT INTO tbl VALUES (1, NULL);
+]);
+
+# Create a pub/sub to set up logical replication. This tests that the
+# uniqueness violation will cause the subscription to fail during initial
+# synchronization and make it disabled.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub FOR TABLE tbl");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)"
+);
+
+# Initial synchronization failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+ "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscriber to be disabled";
+
+# Truncate the table on the subscriber which caused the subscription to be
+# disabled.
+$node_subscriber->safe_psql('postgres', "TRUNCATE tbl");
+
+# Re-enable the subscription "sub".
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Wait for the data to replicate.
+$node_publisher->wait_for_catchup('sub');
+$node_subscriber->poll_query_until('postgres',
+ "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
+);
+
+# Confirm that we have finished the table sync.
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT COUNT(*) FROM tbl");
+is($result, qq(1), "subscription sub replicated data");
+
+# Insert data to tbl, raising an error on the subscriber due to violation
+# of the unique constraint on tbl. Then skip the transaction.
+$node_publisher->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+INSERT INTO tbl VALUES (1, NULL);
+COMMIT;
+]);
+test_skip_lsn($node_publisher, $node_subscriber,
+ "(2, NULL)", "2", "test skipping transaction");
+
+# Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and
+# PREPARE the transaction, raising an error. Then skip the transaction.
+$node_publisher->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+INSERT INTO tbl VALUES (1, NULL);
+PREPARE TRANSACTION 'gtx';
+COMMIT PREPARED 'gtx';
+]);
+test_skip_lsn($node_publisher, $node_subscriber,
+ "(3, NULL)", "3", "test skipping prepare and commit prepared ");
+
+# Test for STREAM COMMIT. Insert enough rows to tbl to exceed the 64kB
+# limit, also raising an error on the subscriber during applying spooled
+# changes for the same reason. Then skip the transaction.
+$node_publisher->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+INSERT INTO tbl SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+COMMIT;
+]);
+test_skip_lsn($node_publisher, $node_subscriber, "(4, md5(4::text))",
+ "4", "test skipping stream-commit");
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT COUNT(*) FROM pg_prepared_xacts");
+is($result, "0",
+ "check all prepared transactions are resolved on the subscriber");
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();