diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 46917f9f94..df62eb45ff 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1582,10 +1582,11 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
- Additional logging is triggered in the following conflict
- cases:
+ Additional logging is triggered, and the conflict statistics are collected (displayed in the
+ pg_stat_subscription_stats view)
+ in the following conflict cases:
-
+ insert_exists
@@ -1598,7 +1599,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
-
+ update_origin_differs
@@ -1610,7 +1611,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
-
+ update_exists
@@ -1627,7 +1628,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
-
+ update_missing
@@ -1636,7 +1637,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
-
+ delete_origin_differs
@@ -1648,7 +1649,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
-
+ delete_missing
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 55417a6fa9..933de6fe07 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -507,7 +507,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
pg_stat_subscription_statspg_stat_subscription_stats
- One row per subscription, showing statistics about errors.
+ One row per subscription, showing statistics about errors and conflicts.
See
pg_stat_subscription_stats for details.
@@ -2157,7 +2157,10 @@ description | Waiting for a newly initialized WAL file to reach durable storage
apply_error_countbigint
- Number of times an error occurred while applying changes
+ Number of times an error occurred while applying changes. Note that any
+ conflict resulting in an apply error will be counted in both
+ apply_error_count and the corresponding conflict
+ count (e.g., confl_*).
@@ -2171,6 +2174,76 @@ description | Waiting for a newly initialized WAL file to reach durable storage
+
+
+ confl_insert_existsbigint
+
+
+ Number of times a row insertion violated a
+ NOT DEFERRABLE unique constraint during the
+ application of changes. See
+ for details about this conflict.
+
+
+
+
+
+ confl_update_origin_differsbigint
+
+
+ Number of times an update was applied to a row that had been previously
+ modified by another source during the application of changes. See
+ for details about this
+ conflict.
+
+
+
+
+
+ confl_update_existsbigint
+
+
+ Number of times that an updated row value violated a
+ NOT DEFERRABLE unique constraint during the
+ application of changes. See
+ for details about this conflict.
+
+
+
+
+
+ confl_update_missingbigint
+
+
+ Number of times the tuple to be updated was not found during the
+ application of changes. See
+ for details about this conflict.
+
+
+
+
+
+ confl_delete_origin_differsbigint
+
+
+ Number of times a delete operation was applied to row that had been
+ previously modified by another source during the application of changes.
+ See for details about
+ this conflict.
+
+
+
+
+
+ confl_delete_missingbigint
+
+
+ Number of times the tuple to be deleted was not found during the application
+ of changes. See for details
+ about this conflict.
+
+
+
stats_resettimestamp with time zone
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 19cabc9a47..7fd5d256a1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
s.subname,
ss.apply_error_count,
ss.sync_error_count,
+ ss.confl_insert_exists,
+ ss.confl_update_origin_differs,
+ ss.confl_update_exists,
+ ss.confl_update_missing,
+ ss.confl_delete_origin_differs,
+ ss.confl_delete_missing,
ss.stats_reset
FROM pg_subscription as s,
pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index a1437d4f77..5d9ff626bd 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -17,8 +17,9 @@
#include "access/commit_ts.h"
#include "access/tableam.h"
#include "executor/executor.h"
+#include "pgstat.h"
#include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
#include "storage/lmgr.h"
#include "utils/lsyscache.h"
@@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
Assert(!OidIsValid(indexoid) ||
CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
+
ereport(elevel,
errcode_apply_conflict(type),
errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index d9af8de658..e06c92727e 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
pending->sync_error_count++;
}
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+ PgStat_EntryRef *entry_ref;
+ PgStat_BackendSubEntry *pending;
+
+ entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+ InvalidOid, subid, NULL);
+ pending = entry_ref->pending;
+ pending->conflict_count[type]++;
+}
+
/*
* Report creating the subscription.
*/
@@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
#define SUB_ACC(fld) shsubent->stats.fld += localent->fld
SUB_ACC(apply_error_count);
SUB_ACC(sync_error_count);
+ for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+ SUB_ACC(conflict_count[i]);
#undef SUB_ACC
pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 3221137123..97dc09ac0d 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
bool nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
PgStat_StatSubEntry *subentry;
PgStat_StatSubEntry allzero;
+ int i = 0;
/* Get subscription stats */
subentry = pgstat_fetch_stat_subscription(subid);
@@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
@@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
}
/* subid */
- values[0] = ObjectIdGetDatum(subid);
+ values[i++] = ObjectIdGetDatum(subid);
/* apply_error_count */
- values[1] = Int64GetDatum(subentry->apply_error_count);
+ values[i++] = Int64GetDatum(subentry->apply_error_count);
/* sync_error_count */
- values[2] = Int64GetDatum(subentry->sync_error_count);
+ values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+ /* conflict count */
+ for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+ values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
/* stats_reset */
if (subentry->stat_reset_timestamp == 0)
- nulls[3] = true;
+ nulls[i] = true;
else
- values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+ values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+ Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 1980d492c3..be6815593b 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202408301
+#define CATALOG_VERSION_NO 202409041
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 85f42be1b3..ff5436acac 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5538,9 +5538,9 @@
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f63159c55c..be2c91168a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -15,6 +15,7 @@
#include "datatype/timestamp.h"
#include "portability/instr_time.h"
#include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
#include "utils/backend_progress.h" /* for backward compatibility */
#include "utils/backend_status.h" /* for backward compatibility */
#include "utils/relcache.h"
@@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
{
PgStat_Counter apply_error_count;
PgStat_Counter sync_error_count;
+ PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
} PgStat_BackendSubEntry;
/* ----------
@@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
{
PgStat_Counter apply_error_count;
PgStat_Counter sync_error_count;
+ PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
TimestampTz stat_reset_timestamp;
} PgStat_StatSubEntry;
@@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
*/
extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
extern void pgstat_create_subscription(Oid subid);
extern void pgstat_drop_subscription(Oid subid);
extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index ca797fb41c..c759677ff5 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -14,6 +14,12 @@
/*
* Conflict types that could occur while applying remote changes.
+ *
+ * This enum is used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count and
+ * PgStat_BackendSubEntry::conflict_count) as well, therefore, when adding new
+ * values or reordering existing ones, ensure to review and potentially adjust
+ * the corresponding statistics collection codes.
*/
typedef enum
{
@@ -42,6 +48,8 @@ typedef enum
*/
} ConflictType;
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
RepOriginId *localorigin,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 862433ee52..a1626f3fae 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
s.subname,
ss.apply_error_count,
ss.sync_error_count,
+ ss.confl_insert_exists,
+ ss.confl_update_origin_differs,
+ ss.confl_update_exists,
+ ss.confl_update_missing,
+ ss.confl_delete_origin_differs,
+ ss.confl_delete_missing,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index fb3e5629b3..6b6a5b0b1b 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -30,6 +30,7 @@ sub create_sub_pub_w_errors
qq[
BEGIN;
CREATE TABLE $table_name(a int);
+ ALTER TABLE $table_name REPLICA IDENTITY FULL;
INSERT INTO $table_name VALUES (1);
COMMIT;
]);
@@ -91,20 +92,36 @@ sub create_sub_pub_w_errors
# subscriber due to violation of the unique constraint on test table.
$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
- # Wait for the apply error to be reported.
+ # Wait for the subscriber to report both an apply error and an
+ # insert_exists conflict.
$node_subscriber->poll_query_until(
$db,
qq[
- SELECT apply_error_count > 0
+ SELECT apply_error_count > 0 AND confl_insert_exists > 0
FROM pg_stat_subscription_stats
WHERE subname = '$sub_name'
])
or die
- qq(Timed out while waiting for apply error for subscription '$sub_name');
+ qq(Timed out while waiting for apply error and insert_exists conflict for subscription '$sub_name');
# Truncate test table so that apply worker can continue.
$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
+ # Delete data from the test table on the publisher. This delete operation
+ # should be skipped on the subscriber since the table is already empty.
+ $node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+ # Wait for the subscriber to report a delete_missing conflict.
+ $node_subscriber->poll_query_until(
+ $db,
+ qq[
+ SELECT confl_delete_missing > 0
+ FROM pg_stat_subscription_stats
+ WHERE subname = '$sub_name'
+ ])
+ or die
+ qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
+
return ($pub_name, $sub_name);
}
@@ -123,17 +140,19 @@ my ($pub1_name, $sub1_name) =
create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
$table1_name);
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count > 0,
sync_error_count > 0,
+ confl_insert_exists > 0,
+ confl_delete_missing > 0,
stats_reset IS NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t),
- qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+ qq(t|t|t|t|t),
+ qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
);
# Reset a single subscription
@@ -141,17 +160,19 @@ $node_subscriber->safe_psql($db,
qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
);
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
sync_error_count = 0,
+ confl_insert_exists = 0,
+ confl_delete_missing = 0,
stats_reset IS NOT NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t),
- qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+ qq(t|t|t|t|t),
+ qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
);
# Get reset timestamp
@@ -181,46 +202,52 @@ my ($pub2_name, $sub2_name) =
create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
$table2_name);
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count > 0,
sync_error_count > 0,
+ confl_insert_exists > 0,
+ confl_delete_missing > 0,
stats_reset IS NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub2_name')
),
- qq(t|t|t),
- qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+ qq(t|t|t|t|t),
+ qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
);
# Reset all subscriptions
$node_subscriber->safe_psql($db,
qq(SELECT pg_stat_reset_subscription_stats(NULL)));
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
sync_error_count = 0,
+ confl_insert_exists = 0,
+ confl_delete_missing = 0,
stats_reset IS NOT NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t),
- qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+ qq(t|t|t|t|t),
+ qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
);
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
sync_error_count = 0,
+ confl_insert_exists = 0,
+ confl_delete_missing = 0,
stats_reset IS NOT NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub2_name')
),
- qq(t|t|t),
- qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+ qq(t|t|t|t|t),
+ qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
);
$reset_time1 = $node_subscriber->safe_psql($db,