Collect statistics about conflicts in logical replication.

This commit adds columns to the pg_stat_subscription_stats view that show
how many times each conflict type has occurred while applying logical
replication changes. The following columns are added:

confl_insert_exists:
        Number of times a row insertion violated a NOT DEFERRABLE unique
        constraint.
confl_update_origin_differs:
        Number of times an update was performed on a row that was
        previously modified by another origin.
confl_update_exists:
        Number of times that the updated value of a row violates a
        NOT DEFERRABLE unique constraint.
confl_update_missing:
        Number of times that the tuple to be updated is missing.
confl_delete_origin_differs:
        Number of times a delete was performed on a row that was
        previously modified by another origin.
confl_delete_missing:
        Number of times that the tuple to be deleted is missing.

The update_origin_differs and delete_origin_differs conflicts can be
detected only when track_commit_timestamp is enabled.

Author: Hou Zhijie
Reviewed-by: Shveta Malik, Peter Smith, Amit Kapila
Discussion: https://postgr.es/m/OS0PR01MB57160A07BD575773045FC214948F2@OS0PR01MB5716.jpnprd01.prod.outlook.com
Amit Kapila 2024-09-04 08:55:21 +05:30
parent 9626068f13
commit 6c2b5edecc
12 changed files with 204 additions and 40 deletions


@ -1582,10 +1582,11 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
</para>
<para>
Additional logging is triggered in the following <firstterm>conflict</firstterm>
cases:
Additional logging is triggered, and the conflict statistics are collected (displayed in the
<link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
in the following <firstterm>conflict</firstterm> cases:
<variablelist>
<varlistentry>
<varlistentry id="conflict-insert-exists" xreflabel="insert_exists">
<term><literal>insert_exists</literal></term>
<listitem>
<para>
@ -1598,7 +1599,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
</para>
</listitem>
</varlistentry>
<varlistentry>
<varlistentry id="conflict-update-origin-differs" xreflabel="update_origin_differs">
<term><literal>update_origin_differs</literal></term>
<listitem>
<para>
@ -1610,7 +1611,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
</para>
</listitem>
</varlistentry>
<varlistentry>
<varlistentry id="conflict-update-exists" xreflabel="update_exists">
<term><literal>update_exists</literal></term>
<listitem>
<para>
@ -1627,7 +1628,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
</para>
</listitem>
</varlistentry>
<varlistentry>
<varlistentry id="conflict-update-missing" xreflabel="update_missing">
<term><literal>update_missing</literal></term>
<listitem>
<para>
@ -1636,7 +1637,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
</para>
</listitem>
</varlistentry>
<varlistentry>
<varlistentry id="conflict-delete-origin-differs" xreflabel="delete_origin_differs">
<term><literal>delete_origin_differs</literal></term>
<listitem>
<para>
@ -1648,7 +1649,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
</para>
</listitem>
</varlistentry>
<varlistentry>
<varlistentry id="conflict-delete-missing" xreflabel="delete_missing">
<term><literal>delete_missing</literal></term>
<listitem>
<para>


@ -507,7 +507,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<row>
<entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
<entry>One row per subscription, showing statistics about errors.
<entry>One row per subscription, showing statistics about errors and conflicts.
See <link linkend="monitoring-pg-stat-subscription-stats">
<structname>pg_stat_subscription_stats</structname></link> for details.
</entry>
@ -2157,7 +2157,10 @@ description | Waiting for a newly initialized WAL file to reach durable storage
<structfield>apply_error_count</structfield> <type>bigint</type>
</para>
<para>
Number of times an error occurred while applying changes
Number of times an error occurred while applying changes. Note that any
conflict resulting in an apply error will be counted in both
<literal>apply_error_count</literal> and the corresponding conflict
count (e.g., <literal>confl_*</literal>).
</para></entry>
</row>
@ -2171,6 +2174,76 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>confl_insert_exists</structfield> <type>bigint</type>
</para>
<para>
Number of times a row insertion violated a
<literal>NOT DEFERRABLE</literal> unique constraint during the
application of changes. See <xref linkend="conflict-insert-exists"/>
for details about this conflict.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>confl_update_origin_differs</structfield> <type>bigint</type>
</para>
<para>
Number of times an update was applied to a row that had been previously
modified by another source during the application of changes. See
<xref linkend="conflict-update-origin-differs"/> for details about this
conflict.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>confl_update_exists</structfield> <type>bigint</type>
</para>
<para>
Number of times that an updated row value violated a
<literal>NOT DEFERRABLE</literal> unique constraint during the
application of changes. See <xref linkend="conflict-update-exists"/>
for details about this conflict.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>confl_update_missing</structfield> <type>bigint</type>
</para>
<para>
Number of times the tuple to be updated was not found during the
application of changes. See <xref linkend="conflict-update-missing"/>
for details about this conflict.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>confl_delete_origin_differs</structfield> <type>bigint</type>
</para>
<para>
Number of times a delete operation was applied to a row that had been
previously modified by another source during the application of changes.
See <xref linkend="conflict-delete-origin-differs"/> for details about
this conflict.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>confl_delete_missing</structfield> <type>bigint</type>
</para>
<para>
Number of times the tuple to be deleted was not found during the application
of changes. See <xref linkend="conflict-delete-missing"/> for details
about this conflict.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>


@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
s.subname,
ss.apply_error_count,
ss.sync_error_count,
ss.confl_insert_exists,
ss.confl_update_origin_differs,
ss.confl_update_exists,
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
ss.stats_reset
FROM pg_subscription as s,
pg_stat_get_subscription_stats(s.oid) as ss;


@ -17,8 +17,9 @@
#include "access/commit_ts.h"
#include "access/tableam.h"
#include "executor/executor.h"
#include "pgstat.h"
#include "replication/conflict.h"
#include "replication/logicalrelation.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
#include "utils/lsyscache.h"
@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
Assert(!OidIsValid(indexoid) ||
CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
pgstat_report_subscription_conflict(MySubscription->oid, type);
ereport(elevel,
errcode_apply_conflict(type),
errmsg("conflict detected on relation \"%s.%s\": conflict=%s",


@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
pending->sync_error_count++;
}
/*
* Report a subscription conflict.
*/
void
pgstat_report_subscription_conflict(Oid subid, ConflictType type)
{
PgStat_EntryRef *entry_ref;
PgStat_BackendSubEntry *pending;
entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
InvalidOid, subid, NULL);
pending = entry_ref->pending;
pending->conflict_count[type]++;
}
/*
* Report creating the subscription.
*/
@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
#define SUB_ACC(fld) shsubent->stats.fld += localent->fld
SUB_ACC(apply_error_count);
SUB_ACC(sync_error_count);
for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
SUB_ACC(conflict_count[i]);
#undef SUB_ACC
pgstat_unlock_entry(entry_ref);
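
The reporting and flush hunks above follow a two-step pattern: each conflict bumps a per-type counter in a backend-local pending entry, and the flush callback later adds every pending counter to the shared entry. The following is a minimal standalone sketch of that pattern, not the PostgreSQL code itself. All Demo*/demo_* names are hypothetical stand-ins, and clearing the pending entry inside the flush function is an assumption of this sketch.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for ConflictType; the order mirrors the new columns. */
typedef enum
{
    DEMO_CT_INSERT_EXISTS,
    DEMO_CT_UPDATE_ORIGIN_DIFFERS,
    DEMO_CT_UPDATE_EXISTS,
    DEMO_CT_UPDATE_MISSING,
    DEMO_CT_DELETE_ORIGIN_DIFFERS,
    DEMO_CT_DELETE_MISSING
} DemoConflictType;

/* Number of enum members, derived from the last member as in the commit. */
#define DEMO_CONFLICT_NUM_TYPES (DEMO_CT_DELETE_MISSING + 1)

/* Hypothetical stand-in for the pending and shared stats structs. */
typedef struct DemoSubStats
{
    int64_t apply_error_count;
    int64_t sync_error_count;
    int64_t conflict_count[DEMO_CONFLICT_NUM_TYPES];
} DemoSubStats;

/* Count one conflict in the backend-local (pending) entry. */
static void
demo_report_conflict(DemoSubStats *pending, DemoConflictType type)
{
    pending->conflict_count[type]++;
}

/*
 * Add every pending counter to the shared entry.  Zeroing the pending
 * entry afterwards is an assumption of this sketch.
 */
static void
demo_flush(DemoSubStats *shared, DemoSubStats *pending)
{
#define DEMO_ACC(fld) shared->fld += pending->fld
    DEMO_ACC(apply_error_count);
    DEMO_ACC(sync_error_count);
    for (int i = 0; i < DEMO_CONFLICT_NUM_TYPES; i++)
        DEMO_ACC(conflict_count[i]);
#undef DEMO_ACC
    *pending = (DemoSubStats) {0};
}
```

Indexing the array directly by the enum value keeps the reporting path to a single increment, at the cost of tying the array layout to the enum order.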


@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 4
#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
bool nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
PgStat_StatSubEntry *subentry;
PgStat_StatSubEntry allzero;
int i = 0;
/* Get subscription stats */
subentry = pgstat_fetch_stat_subscription(subid);
@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
}
/* subid */
values[0] = ObjectIdGetDatum(subid);
values[i++] = ObjectIdGetDatum(subid);
/* apply_error_count */
values[1] = Int64GetDatum(subentry->apply_error_count);
values[i++] = Int64GetDatum(subentry->apply_error_count);
/* sync_error_count */
values[2] = Int64GetDatum(subentry->sync_error_count);
values[i++] = Int64GetDatum(subentry->sync_error_count);
/* conflict count */
for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
/* stats_reset */
if (subentry->stat_reset_timestamp == 0)
nulls[3] = true;
nulls[i] = true;
else
values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
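
The hunk above replaces hard-coded array positions with a running index, so that the conflict counters can be emitted in a loop, and then asserts that the final position matches the declared column count. The following is a minimal standalone sketch of that idea using plain integers instead of Datum values. DEMO_NUM_CONFLICTS, DEMO_NUM_COLS, and demo_fill_row are hypothetical names, and the NULL handling for stats_reset is left out.

```c
#include <assert.h>
#include <stdint.h>

#define DEMO_NUM_CONFLICTS 6    /* hypothetical stand-in for CONFLICT_NUM_TYPES */
#define DEMO_NUM_COLS 10        /* subid + 2 error counts + 6 conflicts + stats_reset */

/*
 * Fill one output row with a running index.  Returns the number of
 * columns written, which must equal DEMO_NUM_COLS.
 */
static int
demo_fill_row(int64_t values[DEMO_NUM_COLS],
              int64_t subid,
              int64_t apply_errors,
              int64_t sync_errors,
              const int64_t conflicts[DEMO_NUM_CONFLICTS],
              int64_t reset_ts)
{
    int i = 0;

    values[i++] = subid;
    values[i++] = apply_errors;
    values[i++] = sync_errors;

    /* One column per conflict type, in enum order. */
    for (int n = 0; n < DEMO_NUM_CONFLICTS; n++)
        values[i++] = conflicts[n];

    /* Last column: i is not advanced here, hence the "+ 1" below. */
    values[i] = reset_ts;

    assert(i + 1 == DEMO_NUM_COLS);
    return i + 1;
}
```

With the running index, adding a conflict type changes the loop bound and the column count but none of the individual assignments.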


@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202408301
#define CATALOG_VERSION_NO 202409041
#endif


@ -5538,9 +5538,9 @@
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
proallargtypes => '{oid,oid,int8,int8,timestamptz}',
proargmodes => '{i,o,o,o,o}',
proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',


@ -15,6 +15,7 @@
#include "datatype/timestamp.h"
#include "portability/instr_time.h"
#include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */
#include "replication/conflict.h"
#include "utils/backend_progress.h" /* for backward compatibility */
#include "utils/backend_status.h" /* for backward compatibility */
#include "utils/relcache.h"
@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
{
PgStat_Counter apply_error_count;
PgStat_Counter sync_error_count;
PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
} PgStat_BackendSubEntry;
/* ----------
@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
{
PgStat_Counter apply_error_count;
PgStat_Counter sync_error_count;
PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
TimestampTz stat_reset_timestamp;
} PgStat_StatSubEntry;
@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
*/
extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
extern void pgstat_create_subscription(Oid subid);
extern void pgstat_drop_subscription(Oid subid);
extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);


@ -14,6 +14,12 @@
/*
* Conflict types that could occur while applying remote changes.
*
* This enum is used in statistics collection (see
* PgStat_StatSubEntry::conflict_count and
* PgStat_BackendSubEntry::conflict_count) as well. Therefore, when adding new
* values or reordering existing ones, review and, if necessary, adjust the
* corresponding statistics collection code.
*/
typedef enum
{
@ -42,6 +48,8 @@ typedef enum
*/
} ConflictType;
#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
RepOriginId *localorigin,


@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
s.subname,
ss.apply_error_count,
ss.sync_error_count,
ss.confl_insert_exists,
ss.confl_update_origin_differs,
ss.confl_update_exists,
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
ss.stats_reset
FROM pg_subscription s,
LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,


@ -30,6 +30,7 @@ sub create_sub_pub_w_errors
qq[
BEGIN;
CREATE TABLE $table_name(a int);
ALTER TABLE $table_name REPLICA IDENTITY FULL;
INSERT INTO $table_name VALUES (1);
COMMIT;
]);
@ -91,20 +92,36 @@ sub create_sub_pub_w_errors
# subscriber due to violation of the unique constraint on test table.
$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
# Wait for the apply error to be reported.
# Wait for the subscriber to report both an apply error and an
# insert_exists conflict.
$node_subscriber->poll_query_until(
$db,
qq[
SELECT apply_error_count > 0
SELECT apply_error_count > 0 AND confl_insert_exists > 0
FROM pg_stat_subscription_stats
WHERE subname = '$sub_name'
])
or die
qq(Timed out while waiting for apply error for subscription '$sub_name');
qq(Timed out while waiting for apply error and insert_exists conflict for subscription '$sub_name');
# Truncate test table so that apply worker can continue.
$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
# Delete data from the test table on the publisher. This delete operation
# should be skipped on the subscriber since the table is already empty.
$node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
# Wait for the subscriber to report a delete_missing conflict.
$node_subscriber->poll_query_until(
$db,
qq[
SELECT confl_delete_missing > 0
FROM pg_stat_subscription_stats
WHERE subname = '$sub_name'
])
or die
qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
return ($pub_name, $sub_name);
}
@ -123,17 +140,19 @@ my ($pub1_name, $sub1_name) =
create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
$table1_name);
# Apply and Sync errors are > 0 and reset timestamp is NULL
# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count > 0,
sync_error_count > 0,
confl_insert_exists > 0,
confl_delete_missing > 0,
stats_reset IS NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
qq(t|t|t),
qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
qq(t|t|t|t|t),
qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
);
# Reset a single subscription
@ -141,17 +160,19 @@ $node_subscriber->safe_psql($db,
qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
);
# Apply and Sync errors are 0 and stats reset is not NULL
# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
sync_error_count = 0,
confl_insert_exists = 0,
confl_delete_missing = 0,
stats_reset IS NOT NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
qq(t|t|t),
qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
qq(t|t|t|t|t),
qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
);
# Get reset timestamp
@ -181,46 +202,52 @@ my ($pub2_name, $sub2_name) =
create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
$table2_name);
# Apply and Sync errors are > 0 and reset timestamp is NULL
# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count > 0,
sync_error_count > 0,
confl_insert_exists > 0,
confl_delete_missing > 0,
stats_reset IS NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub2_name')
),
qq(t|t|t),
qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
qq(t|t|t|t|t),
qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
);
# Reset all subscriptions
$node_subscriber->safe_psql($db,
qq(SELECT pg_stat_reset_subscription_stats(NULL)));
# Apply and Sync errors are 0 and stats reset is not NULL
# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
sync_error_count = 0,
confl_insert_exists = 0,
confl_delete_missing = 0,
stats_reset IS NOT NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
qq(t|t|t),
qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
qq(t|t|t|t|t),
qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
);
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
sync_error_count = 0,
confl_insert_exists = 0,
confl_delete_missing = 0,
stats_reset IS NOT NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub2_name')
),
qq(t|t|t),
qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
qq(t|t|t|t|t),
qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
);
$reset_time1 = $node_subscriber->safe_psql($db,