diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index af6914872b..62f2a3332b 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -627,6 +627,15 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
+
+ pg_stat_subscription_workerspg_stat_subscription_workers
+ One row per subscription worker, showing statistics about errors
+ that occurred on that subscription worker.
+ See
+ pg_stat_subscription_workers for details.
+
+
+
@@ -3054,6 +3063,128 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
+
+ pg_stat_subscription_workers
+
+
+ pg_stat_subscription_workers
+
+
+
+ The pg_stat_subscription_workers view will contain
+ one row per subscription worker on which errors have occurred, for workers
+ applying logical replication changes and workers handling the initial data
+ copy of the subscribed tables. The statistics entry is removed when the
+ corresponding subscription is dropped.
+
+
+
+ pg_stat_subscription_workers View
+
+
+
+
+ Column Type
+
+
+ Description
+
+
+
+
+
+
+
+ subidoid
+
+
+ OID of the subscription
+
+
+
+
+
+ subnamename
+
+
+ Name of the subscription
+
+
+
+
+
+ subrelidoid
+
+
+ OID of the relation that the worker is synchronizing; null for the
+ main apply worker
+
+
+
+
+
+ last_error_relidoid
+
+
+ OID of the relation that the worker was processing when the
+ error occurred
+
+
+
+
+
+ last_error_commandtext
+
+
+ Name of command being applied when the error occurred. This field
+ is null if the error was reported during the initial data copy.
+
+
+
+
+
+ last_error_xidxid
+
+
+ Transaction ID of the publisher node being applied when the error
+ occurred. This field is null if the error was reported
+ during the initial data copy.
+
+
+
+
+
+ last_error_countuint8
+
+
+ Number of consecutive times the error occurred
+
+
+
+
+
+ last_error_messagetext
+
+
+ The error message
+
+
+
+
+
+ last_error_timetimestamp with time zone
+
+
+ Last time at which this error occurred
+
+
+
+
+
+
+
+
+
pg_stat_ssl
@@ -5176,6 +5307,32 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
can be granted EXECUTE to run the function.
+
+
+
+
+ pg_stat_reset_subscription_worker
+
+ pg_stat_reset_subscription_worker ( subidoid, relidoid )
+ void
+
+
+ Resets the statistics of subscription workers running on the
+ subscription with subid shown in the
+ pg_stat_subscription_workers view. If the
+ argument relid is not NULL,
+ resets statistics of the subscription worker handling the initial data
+ copy of the relation with relid. Otherwise,
+ resets the subscription worker statistics of the main apply worker.
+ If the argument relid is omitted, resets the
+ statistics of all subscription workers running on the subscription
+ with subid.
+
+
+ This function is restricted to superusers by default, but other users
+ can be granted EXECUTE to run the function.
+
+
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index f6789025a5..3a4fa9091b 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -639,6 +639,10 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM publ
REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid) FROM public;
+
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid, oid) FROM public;
+
REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index eb560955cd..61b515cdb8 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,3 +1261,26 @@ REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
substream, subtwophasestate, subslotname, subsynccommit, subpublications)
ON pg_subscription TO public;
+
+CREATE VIEW pg_stat_subscription_workers AS
+ SELECT
+ w.subid,
+ s.subname,
+ w.subrelid,
+ w.last_error_relid,
+ w.last_error_command,
+ w.last_error_xid,
+ w.last_error_count,
+ w.last_error_message,
+ w.last_error_time
+ FROM (SELECT
+ oid as subid,
+ NULL as relid
+ FROM pg_subscription
+ UNION ALL
+ SELECT
+ srsubid as subid,
+ srrelid as relid
+ FROM pg_subscription_rel) sr,
+ LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w
+ JOIN pg_subscription s ON (w.subid = s.oid);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c47ba26369..9427e86fee 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -32,6 +32,7 @@
#include "executor/executor.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
+#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
#include "replication/slot.h"
@@ -1204,7 +1205,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* Since dropping a replication slot is not transactional, the replication
* slot stays dropped even if the transaction rolls back. So we cannot
* run DROP SUBSCRIPTION inside a transaction block if dropping the
- * replication slot.
+ * replication slot. Also, in this case, we report a message for dropping
+ * the subscription to the stats collector.
*
* XXX The command name should really be something like "DROP SUBSCRIPTION
* of a subscription that is associated with a replication slot", but we
@@ -1377,6 +1379,18 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
}
PG_END_TRY();
+ /*
+ * Send a message for dropping this subscription to the stats collector.
+ * We can safely report dropping the subscription statistics here if the
+ * subscription is associated with a replication slot since we cannot run
+ * DROP SUBSCRIPTION inside a transaction block. Subscription statistics
+ * will be removed later by (auto)vacuum either if it's not associated
+ * with a replication slot or if the message for dropping the subscription
+ * gets lost.
+ */
+ if (slotname)
+ pgstat_report_subscription_drop(subid);
+
table_close(rel, NoLock);
}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 8c166e5e16..7264d2c727 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -41,6 +41,7 @@
#include "catalog/catalog.h"
#include "catalog/pg_database.h"
#include "catalog/pg_proc.h"
+#include "catalog/pg_subscription.h"
#include "common/ip.h"
#include "executor/instrument.h"
#include "libpq/libpq.h"
@@ -105,6 +106,7 @@
#define PGSTAT_DB_HASH_SIZE 16
#define PGSTAT_TAB_HASH_SIZE 512
#define PGSTAT_FUNCTION_HASH_SIZE 512
+#define PGSTAT_SUBWORKER_HASH_SIZE 32
#define PGSTAT_REPLSLOT_HASH_SIZE 32
@@ -320,10 +322,14 @@ NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_no
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
Oid tableoid, bool create);
+static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry,
+ Oid subid, Oid subrelid,
+ bool create);
static void pgstat_write_statsfiles(bool permanent, bool allDbs);
static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
-static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
+static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
+ HTAB *subworkerhash, bool permanent);
static void backend_read_statsfile(void);
static bool pgstat_write_statsfile_needed(void);
@@ -335,6 +341,7 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now);
static void pgstat_send_funcstats(void);
static void pgstat_send_slru(void);
+static void pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg);
static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid);
static bool pgstat_should_report_connstat(void);
static void pgstat_report_disconnect(Oid dboid);
@@ -373,6 +380,8 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
+static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
/* ------------------------------------------------------------
* Public functions called from postmaster follow
@@ -1302,6 +1311,74 @@ pgstat_vacuum_stat(void)
hash_destroy(htab);
}
+
+ /*
+ * Repeat for subscription workers. Similarly, we needn't bother in the
+ * common case where no subscription workers' stats are being collected.
+ */
+ if (dbentry->subworkers != NULL &&
+ hash_get_num_entries(dbentry->subworkers) > 0)
+ {
+ PgStat_StatSubWorkerEntry *subwentry;
+ PgStat_MsgSubscriptionPurge spmsg;
+
+ /*
+ * Read pg_subscription and make a list of OIDs of all existing
+ * subscriptions
+ */
+ htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid);
+
+ spmsg.m_databaseid = MyDatabaseId;
+ spmsg.m_nentries = 0;
+
+ hash_seq_init(&hstat, dbentry->subworkers);
+ while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ bool exists = false;
+ Oid subid = subwentry->key.subid;
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (hash_search(htab, (void *) &subid, HASH_FIND, NULL) != NULL)
+ continue;
+
+ /*
+ * It is possible that we have multiple entries for the
+ * subscription corresponding to apply worker and tablesync
+ * workers. In such cases, we don't need to add the same subid
+ * again.
+ */
+ for (int i = 0; i < spmsg.m_nentries; i++)
+ {
+ if (spmsg.m_subids[i] == subid)
+ {
+ exists = true;
+ break;
+ }
+ }
+
+ if (exists)
+ continue;
+
+ /* This subscription is dead, add the subid to the message */
+ spmsg.m_subids[spmsg.m_nentries++] = subid;
+
+ /*
+ * If the message is full, send it out and reinitialize to empty
+ */
+ if (spmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE)
+ {
+ pgstat_send_subscription_purge(&spmsg);
+ spmsg.m_nentries = 0;
+ }
+ }
+
+ /* Send the rest of dead subscriptions */
+ if (spmsg.m_nentries > 0)
+ pgstat_send_subscription_purge(&spmsg);
+
+ hash_destroy(htab);
+ }
}
@@ -1474,7 +1551,8 @@ pgstat_reset_shared_counters(const char *target)
* ----------
*/
void
-pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
+pgstat_reset_single_counter(Oid objoid, Oid subobjoid,
+ PgStat_Single_Reset_Type type)
{
PgStat_MsgResetsinglecounter msg;
@@ -1485,6 +1563,7 @@ pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
msg.m_databaseid = MyDatabaseId;
msg.m_resettype = type;
msg.m_objectid = objoid;
+ msg.m_subobjectid = subobjoid;
pgstat_send(&msg, sizeof(msg));
}
@@ -1869,6 +1948,51 @@ pgstat_report_replslot_drop(const char *slotname)
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
+/* ----------
+ * pgstat_report_subworker_error() -
+ *
+ * Tell the collector about the subscription worker error.
+ * ----------
+ */
+void
+pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
+ LogicalRepMsgType command, TransactionId xid,
+ const char *errmsg)
+{
+ PgStat_MsgSubWorkerError msg;
+ int len;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR);
+ msg.m_databaseid = MyDatabaseId;
+ msg.m_subid = subid;
+ msg.m_subrelid = subrelid;
+ msg.m_relid = relid;
+ msg.m_command = command;
+ msg.m_xid = xid;
+ msg.m_timestamp = GetCurrentTimestamp();
+ strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN);
+
+ len = offsetof(PgStat_MsgSubWorkerError, m_message) + strlen(msg.m_message) + 1;
+ pgstat_send(&msg, len);
+}
+
+/* ----------
+ * pgstat_report_subscription_drop() -
+ *
+ * Tell the collector about dropping the subscription.
+ * ----------
+ */
+void
+pgstat_report_subscription_drop(Oid subid)
+{
+ PgStat_MsgSubscriptionPurge msg;
+
+ msg.m_databaseid = MyDatabaseId;
+ msg.m_subids[0] = subid;
+ msg.m_nentries = 1;
+ pgstat_send_subscription_purge(&msg);
+}
+
/* ----------
* pgstat_ping() -
*
@@ -2874,6 +2998,35 @@ pgstat_fetch_stat_funcentry(Oid func_id)
return funcentry;
}
+/*
+ * ---------
+ * pgstat_fetch_stat_subworker_entry() -
+ *
+ * Support function for the SQL-callable pgstat* functions. Returns
+ * the collected statistics for subscription worker or NULL.
+ * ---------
+ */
+PgStat_StatSubWorkerEntry *
+pgstat_fetch_stat_subworker_entry(Oid subid, Oid subrelid)
+{
+ PgStat_StatDBEntry *dbentry;
+ PgStat_StatSubWorkerEntry *wentry = NULL;
+
+ /* Load the stats file if needed */
+ backend_read_statsfile();
+
+ /*
+ * Lookup our database, then find the requested subscription worker stats.
+ */
+ dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
+ if (dbentry != NULL && dbentry->subworkers != NULL)
+ {
+ wentry = pgstat_get_subworker_entry(dbentry, subid, subrelid,
+ false);
+ }
+
+ return wentry;
+}
/*
* ---------
@@ -3312,6 +3465,23 @@ pgstat_send_slru(void)
}
}
+/* --------
+ * pgstat_send_subscription_purge() -
+ *
+ * Send a subscription purge message to the collector
+ * --------
+ */
+static void
+pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg)
+{
+ int len;
+
+ len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0])
+ + msg->m_nentries * sizeof(Oid);
+
+ pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE);
+ pgstat_send(msg, len);
+}
/* ----------
* PgstatCollectorMain() -
@@ -3568,6 +3738,14 @@ PgstatCollectorMain(int argc, char *argv[])
pgstat_recv_disconnect(&msg.msg_disconnect, len);
break;
+ case PGSTAT_MTYPE_SUBSCRIPTIONPURGE:
+ pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len);
+ break;
+
+ case PGSTAT_MTYPE_SUBWORKERERROR:
+ pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
+ break;
+
default:
break;
}
@@ -3613,7 +3791,8 @@ PgstatCollectorMain(int argc, char *argv[])
/*
* Subroutine to clear stats in a database entry
*
- * Tables and functions hashes are initialized to empty.
+ * Tables, functions, and subscription workers hashes are initialized
+ * to empty.
*/
static void
reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
@@ -3666,6 +3845,13 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
PGSTAT_FUNCTION_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_BLOBS);
+
+ hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
+ hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
+ dbentry->subworkers = hash_create("Per-database subscription worker",
+ PGSTAT_SUBWORKER_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS);
}
/*
@@ -3690,7 +3876,7 @@ pgstat_get_db_entry(Oid databaseid, bool create)
/*
* If not found, initialize the new one. This creates empty hash tables
- * for tables and functions, too.
+ * for tables, functions, and subscription workers, too.
*/
if (!found)
reset_dbentry_counters(result);
@@ -3748,6 +3934,47 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
return result;
}
+/* ----------
+ * pgstat_get_subworker_entry
+ *
+ * Return subscription worker entry with the given subscription OID and
+ * relation OID. If subrelid is InvalidOid, it returns an entry of the
+ * apply worker otherwise returns an entry of the table sync worker
+ * associated with subrelid. If no subscription worker entry exists,
+ * initialize it, if the create parameter is true. Else, return NULL.
+ * ----------
+ */
+static PgStat_StatSubWorkerEntry *
+pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
+ bool create)
+{
+ PgStat_StatSubWorkerEntry *subwentry;
+ PgStat_StatSubWorkerKey key;
+ bool found;
+ HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
+
+ key.subid = subid;
+ key.subrelid = subrelid;
+ subwentry = (PgStat_StatSubWorkerEntry *) hash_search(dbentry->subworkers,
+ (void *) &key,
+ action, &found);
+
+ if (!create && !found)
+ return NULL;
+
+ /* If not found, initialize the new one */
+ if (!found)
+ {
+ subwentry->last_error_relid = InvalidOid;
+ subwentry->last_error_command = 0;
+ subwentry->last_error_xid = InvalidTransactionId;
+ subwentry->last_error_count = 0;
+ subwentry->last_error_time = 0;
+ subwentry->last_error_message[0] = '\0';
+ }
+
+ return subwentry;
+}
/* ----------
* pgstat_write_statsfiles() -
@@ -3832,8 +4059,8 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
{
/*
- * Write out the table and function stats for this DB into the
- * appropriate per-DB stat file, if required.
+ * Write out the table, function, and subscription-worker stats for
+ * this DB into the appropriate per-DB stat file, if required.
*/
if (allDbs || pgstat_db_requested(dbentry->databaseid))
{
@@ -3947,8 +4174,10 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
{
HASH_SEQ_STATUS tstat;
HASH_SEQ_STATUS fstat;
+ HASH_SEQ_STATUS sstat;
PgStat_StatTabEntry *tabentry;
PgStat_StatFuncEntry *funcentry;
+ PgStat_StatSubWorkerEntry *subwentry;
FILE *fpout;
int32 format_id;
Oid dbid = dbentry->databaseid;
@@ -4003,6 +4232,17 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
(void) rc; /* we'll check for error with ferror */
}
+ /*
+ * Walk through the database's subscription worker stats table.
+ */
+ hash_seq_init(&sstat, dbentry->subworkers);
+ while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL)
+ {
+ fputc('S', fpout);
+ rc = fwrite(subwentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
+
/*
* No more output to be done. Close the temp file and replace the old
* pgstat.stat with it. The ferror() check replaces testing for error
@@ -4061,8 +4301,9 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
* files after reading; the in-memory status is now authoritative, and the
* files would be out of date in case somebody else reads them.
*
- * If a 'deep' read is requested, table/function stats are read, otherwise
- * the table/function hash tables remain empty.
+ * If a 'deep' read is requested, table/function/subscription-worker stats are
+ * read, otherwise the table/function/subscription-worker hash tables remain
+ * empty.
* ----------
*/
static HTAB *
@@ -4241,6 +4482,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
dbentry->tables = NULL;
dbentry->functions = NULL;
+ dbentry->subworkers = NULL;
/*
* In the collector, disregard the timestamp we read from the
@@ -4252,8 +4494,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbentry->stats_timestamp = 0;
/*
- * Don't create tables/functions hashtables for uninteresting
- * databases.
+ * Don't create tables/functions/subworkers hashtables for
+ * uninteresting databases.
*/
if (onlydb != InvalidOid)
{
@@ -4278,6 +4520,14 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
&hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
+ hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
+ hash_ctl.hcxt = pgStatLocalContext;
+ dbentry->subworkers = hash_create("Per-database subscription worker",
+ PGSTAT_SUBWORKER_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
/*
* If requested, read the data from the database-specific
* file. Otherwise we just leave the hashtables empty.
@@ -4286,6 +4536,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
pgstat_read_db_statsfile(dbentry->databaseid,
dbentry->tables,
dbentry->functions,
+ dbentry->subworkers,
permanent);
break;
@@ -4363,19 +4614,21 @@ done:
* As in pgstat_read_statsfiles, if the permanent file is requested, it is
* removed after reading.
*
- * Note: this code has the ability to skip storing per-table or per-function
- * data, if NULL is passed for the corresponding hashtable. That's not used
- * at the moment though.
+ * Note: this code has the ability to skip storing per-table, per-function, or
+ * per-subscription-worker data, if NULL is passed for the corresponding hashtable.
+ * That's not used at the moment though.
* ----------
*/
static void
pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
- bool permanent)
+ HTAB *subworkerhash, bool permanent)
{
PgStat_StatTabEntry *tabentry;
PgStat_StatTabEntry tabbuf;
PgStat_StatFuncEntry funcbuf;
PgStat_StatFuncEntry *funcentry;
+ PgStat_StatSubWorkerEntry subwbuf;
+ PgStat_StatSubWorkerEntry *subwentry;
FILE *fpin;
int32 format_id;
bool found;
@@ -4489,6 +4742,41 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
memcpy(funcentry, &funcbuf, sizeof(funcbuf));
break;
+ /*
+ * 'S' A PgStat_StatSubWorkerEntry struct describing
+ * subscription worker statistics.
+ */
+ case 'S':
+ if (fread(&subwbuf, 1, sizeof(PgStat_StatSubWorkerEntry),
+ fpin) != sizeof(PgStat_StatSubWorkerEntry))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+
+ /*
+ * Skip if subscription worker data not wanted.
+ */
+ if (subworkerhash == NULL)
+ break;
+
+ subwentry = (PgStat_StatSubWorkerEntry *) hash_search(subworkerhash,
+ (void *) &subwbuf.key,
+ HASH_ENTER, &found);
+
+ if (found)
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+
+ memcpy(subwentry, &subwbuf, sizeof(subwbuf));
+ break;
+
/*
* 'E' The EOF marker of a complete stats file.
*/
@@ -5162,6 +5450,8 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
hash_destroy(dbentry->tables);
if (dbentry->functions != NULL)
hash_destroy(dbentry->functions);
+ if (dbentry->subworkers != NULL)
+ hash_destroy(dbentry->subworkers);
if (hash_search(pgStatDBHash,
(void *) &dbid,
@@ -5199,13 +5489,16 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
hash_destroy(dbentry->tables);
if (dbentry->functions != NULL)
hash_destroy(dbentry->functions);
+ if (dbentry->subworkers != NULL)
+ hash_destroy(dbentry->subworkers);
dbentry->tables = NULL;
dbentry->functions = NULL;
+ dbentry->subworkers = NULL;
/*
* Reset database-level stats, too. This creates empty hash tables for
- * tables and functions.
+ * tables, functions, and subscription workers.
*/
reset_dbentry_counters(dbentry);
}
@@ -5274,6 +5567,14 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
else if (msg->m_resettype == RESET_FUNCTION)
(void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
HASH_REMOVE, NULL);
+ else if (msg->m_resettype == RESET_SUBWORKER)
+ {
+ PgStat_StatSubWorkerKey key;
+
+ key.subid = msg->m_objectid;
+ key.subrelid = msg->m_subobjectid;
+ (void) hash_search(dbentry->subworkers, (void *) &key, HASH_REMOVE, NULL);
+ }
}
/* ----------
@@ -5816,6 +6117,84 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
}
}
+/* ----------
+ * pgstat_recv_subscription_purge() -
+ *
+ * Process a SUBSCRIPTIONPURGE message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
+{
+ HASH_SEQ_STATUS hstat;
+ PgStat_StatDBEntry *dbentry;
+ PgStat_StatSubWorkerEntry *subwentry;
+
+ dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
+
+ /* No need to purge if we don't even know the database */
+ if (!dbentry || !dbentry->subworkers)
+ return;
+
+ /* Remove all subscription worker statistics for the given subscriptions */
+ hash_seq_init(&hstat, dbentry->subworkers);
+ while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ for (int i = 0; i < msg->m_nentries; i++)
+ {
+ if (subwentry->key.subid == msg->m_subids[i])
+ {
+ (void) hash_search(dbentry->subworkers, (void *) &(subwentry->key),
+ HASH_REMOVE, NULL);
+ break;
+ }
+ }
+ }
+}
+
+/* ----------
+ * pgstat_recv_subworker_error() -
+ *
+ * Process a SUBWORKERERROR message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
+{
+ PgStat_StatDBEntry *dbentry;
+ PgStat_StatSubWorkerEntry *subwentry;
+
+ dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+
+ /* Get the subscription worker stats */
+ subwentry = pgstat_get_subworker_entry(dbentry, msg->m_subid,
+ msg->m_subrelid, true);
+ Assert(subwentry);
+
+ if (subwentry->last_error_relid == msg->m_relid &&
+ subwentry->last_error_command == msg->m_command &&
+ subwentry->last_error_xid == msg->m_xid &&
+ strcmp(subwentry->last_error_message, msg->m_message) == 0)
+ {
+ /*
+ * The same error occurred again in succession, just update its
+ * timestamp and count.
+ */
+ subwentry->last_error_count++;
+ subwentry->last_error_time = msg->m_timestamp;
+ return;
+ }
+
+ /* Otherwise, update the error information */
+ subwentry->last_error_relid = msg->m_relid;
+ subwentry->last_error_command = msg->m_command;
+ subwentry->last_error_xid = msg->m_xid;
+ subwentry->last_error_count = 1;
+ subwentry->last_error_time = msg->m_timestamp;
+ strlcpy(subwentry->last_error_message, msg->m_message,
+ PGSTAT_SUBWORKERERROR_MSGLEN);
+}
+
/* ----------
* pgstat_write_statsfile_needed() -
*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ae1b391bda..2e79302a48 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3332,6 +3332,7 @@ void
ApplyWorkerMain(Datum main_arg)
{
int worker_slot = DatumGetInt32(main_arg);
+ MemoryContext cctx = CurrentMemoryContext;
MemoryContext oldctx;
char originname[NAMEDATALEN];
XLogRecPtr origin_startpos;
@@ -3432,8 +3433,30 @@ ApplyWorkerMain(Datum main_arg)
{
char *syncslotname;
- /* This is table synchronization worker, call initial sync. */
- syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+ PG_TRY();
+ {
+ /* This is table synchronization worker, call initial sync. */
+ syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+ }
+ PG_CATCH();
+ {
+ MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+ ErrorData *errdata = CopyErrorData();
+
+ /*
+ * Report the table sync error. There is no corresponding message
+ * type for table synchronization.
+ */
+ pgstat_report_subworker_error(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relid,
+ 0, /* message type */
+ InvalidTransactionId,
+ errdata->message);
+ MemoryContextSwitchTo(ecxt);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
/* allocate slot name in long-lived context */
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
@@ -3551,7 +3574,32 @@ ApplyWorkerMain(Datum main_arg)
}
/* Run the main loop. */
- LogicalRepApplyLoop(origin_startpos);
+ PG_TRY();
+ {
+ LogicalRepApplyLoop(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ /* report the apply error */
+ if (apply_error_callback_arg.command != 0)
+ {
+ MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+ ErrorData *errdata = CopyErrorData();
+
+ pgstat_report_subworker_error(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ apply_error_callback_arg.rel != NULL
+ ? apply_error_callback_arg.rel->localreloid
+ : InvalidOid,
+ apply_error_callback_arg.command,
+ apply_error_callback_arg.remote_xid,
+ errdata->message);
+ MemoryContextSwitchTo(ecxt);
+ }
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
proc_exit(0);
}
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index e64857e540..f529c1561a 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2172,7 +2172,7 @@ pg_stat_reset_single_table_counters(PG_FUNCTION_ARGS)
{
Oid taboid = PG_GETARG_OID(0);
- pgstat_reset_single_counter(taboid, RESET_TABLE);
+ pgstat_reset_single_counter(taboid, InvalidOid, RESET_TABLE);
PG_RETURN_VOID();
}
@@ -2182,11 +2182,38 @@ pg_stat_reset_single_function_counters(PG_FUNCTION_ARGS)
{
Oid funcoid = PG_GETARG_OID(0);
- pgstat_reset_single_counter(funcoid, RESET_FUNCTION);
+ pgstat_reset_single_counter(funcoid, InvalidOid, RESET_FUNCTION);
PG_RETURN_VOID();
}
+Datum
+pg_stat_reset_subscription_worker_subrel(PG_FUNCTION_ARGS)
+{
+ Oid subid = PG_GETARG_OID(0);
+ Oid relid = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+
+ pgstat_reset_single_counter(subid, relid, RESET_SUBWORKER);
+
+ PG_RETURN_VOID();
+}
+
+/* Reset all subscription worker stats associated with the given subscription */
+Datum
+pg_stat_reset_subscription_worker_sub(PG_FUNCTION_ARGS)
+{
+ Oid subid = PG_GETARG_OID(0);
+
+ /*
+ * Use subscription drop message to remove statistics of all subscription
+ * workers.
+ */
+ pgstat_report_subscription_drop(subid);
+
+ PG_RETURN_VOID();
+}
+
+
/* Reset SLRU counters (a specific one or all of them). */
Datum
pg_stat_reset_slru(PG_FUNCTION_ARGS)
@@ -2380,3 +2407,100 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
+
+/*
+ * Get the subscription worker statistics for the given subscription
+ * (and relation).
+ */
+Datum
+pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 8
+ Oid subid = PG_GETARG_OID(0);
+ Oid subrelid;
+ TupleDesc tupdesc;
+ Datum values[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS];
+ bool nulls[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS];
+ PgStat_StatSubWorkerEntry *wentry;
+ int i;
+
+ if (PG_ARGISNULL(1))
+ subrelid = InvalidOid;
+ else
+ subrelid = PG_GETARG_OID(1);
+
+ /* Get subscription worker stats */
+ wentry = pgstat_fetch_stat_subworker_entry(subid, subrelid);
+
+ /* Return NULL if there is no worker statistics */
+ if (wentry == NULL)
+ PG_RETURN_NULL();
+
+ /* Initialise attributes information in the tuple descriptor */
+ tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_WORKER_COLS);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid",
+ OIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command",
+ TEXTOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid",
+ XIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message",
+ TEXTOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
+ TIMESTAMPTZOID, -1, 0);
+ BlessTupleDesc(tupdesc);
+
+ /* Initialise values and NULL flags arrays */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
+
+ i = 0;
+ /* subid */
+ values[i++] = ObjectIdGetDatum(subid);
+
+ /* subrelid */
+ if (OidIsValid(subrelid))
+ values[i++] = ObjectIdGetDatum(subrelid);
+ else
+ nulls[i++] = true;
+
+ /* last_error_relid */
+ if (OidIsValid(wentry->last_error_relid))
+ values[i++] = ObjectIdGetDatum(wentry->last_error_relid);
+ else
+ nulls[i++] = true;
+
+ /* last_error_command */
+ if (wentry->last_error_command != 0)
+ values[i++] =
+ CStringGetTextDatum(logicalrep_message_type(wentry->last_error_command));
+ else
+ nulls[i++] = true;
+
+ /* last_error_xid */
+ if (TransactionIdIsValid(wentry->last_error_xid))
+ values[i++] = TransactionIdGetDatum(wentry->last_error_xid);
+ else
+ nulls[i++] = true;
+
+ /* last_error_count */
+ values[i++] = Int64GetDatum(wentry->last_error_count);
+
+ /* last_error_message */
+ values[i++] = CStringGetTextDatum(wentry->last_error_message);
+
+ /* last_error_time */
+ if (wentry->last_error_time != 0)
+ values[i++] = TimestampTzGetDatum(wentry->last_error_time);
+ else
+ nulls[i++] = true;
+
+ /* Returns the record as Datum */
+ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 920390b8b2..d0fa1d1222 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202111231
+#define CATALOG_VERSION_NO 202111301
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index e934361dc3..79d787cd26 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5389,6 +5389,14 @@
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slot' },
+{ oid => '8523', descr => 'statistics: information about subscription worker',
+ proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
+ proretset => 't', provolatile => 's', proparallel => 'r',
+ prorettype => 'record', proargtypes => 'oid oid',
+ proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}',
+ proargmodes => '{i,i,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
+ prosrc => 'pg_stat_get_subscription_worker' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5776,6 +5784,16 @@
proname => 'pg_stat_reset_replication_slot', proisstrict => 'f',
provolatile => 'v', prorettype => 'void', proargtypes => 'text',
prosrc => 'pg_stat_reset_replication_slot' },
+{ oid => '8524',
+ descr => 'statistics: reset collected statistics for a single subscription worker',
+ proname => 'pg_stat_reset_subscription_worker', proisstrict => 'f',
+ provolatile => 'v', prorettype => 'void', proargtypes => 'oid oid',
+ prosrc => 'pg_stat_reset_subscription_worker_subrel' },
+{ oid => '8525',
+ descr => 'statistics: reset all collected statistics for a single subscription',
+ proname => 'pg_stat_reset_subscription_worker',
+ provolatile => 'v', prorettype => 'void', proargtypes => 'oid',
+ prosrc => 'pg_stat_reset_subscription_worker_sub' },
{ oid => '3163', descr => 'current trigger depth',
proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index bcd3588ea2..5b51b58e5a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -14,6 +14,7 @@
#include "datatype/timestamp.h"
#include "portability/instr_time.h"
#include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */
+#include "replication/logicalproto.h"
#include "utils/backend_progress.h" /* for backward compatibility */
#include "utils/backend_status.h" /* for backward compatibility */
#include "utils/hsearch.h"
@@ -83,6 +84,8 @@ typedef enum StatMsgType
PGSTAT_MTYPE_REPLSLOT,
PGSTAT_MTYPE_CONNECT,
PGSTAT_MTYPE_DISCONNECT,
+ PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
+ PGSTAT_MTYPE_SUBWORKERERROR,
} StatMsgType;
/* ----------
@@ -145,7 +148,8 @@ typedef enum PgStat_Shared_Reset_Target
typedef enum PgStat_Single_Reset_Type
{
RESET_TABLE,
- RESET_FUNCTION
+ RESET_FUNCTION,
+ RESET_SUBWORKER
} PgStat_Single_Reset_Type;
/* ------------------------------------------------------------
@@ -364,6 +368,7 @@ typedef struct PgStat_MsgResetsinglecounter
Oid m_databaseid;
PgStat_Single_Reset_Type m_resettype;
Oid m_objectid;
+ Oid m_subobjectid;
} PgStat_MsgResetsinglecounter;
/* ----------
@@ -536,6 +541,54 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_total_bytes;
} PgStat_MsgReplSlot;
+/* ----------
+ * PgStat_MsgSubscriptionPurge Sent by the backend and autovacuum to tell the
+ * collector about the dead subscriptions.
+ * ----------
+ */
+#define PGSTAT_NUM_SUBSCRIPTIONPURGE \
+ ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid))
+
+typedef struct PgStat_MsgSubscriptionPurge
+{
+ PgStat_MsgHdr m_hdr;
+ Oid m_databaseid;
+ int m_nentries;
+ Oid m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE];
+} PgStat_MsgSubscriptionPurge;
+
+/* ----------
+ * PgStat_MsgSubWorkerError Sent by the apply worker or the table sync
+ * worker to report the error occurred while
+ * processing changes.
+ * ----------
+ */
+#define PGSTAT_SUBWORKERERROR_MSGLEN 256
+typedef struct PgStat_MsgSubWorkerError
+{
+ PgStat_MsgHdr m_hdr;
+
+ /*
+ * m_subid and m_subrelid are used to determine the subscription and the
+ * reporter of the error. m_subrelid is InvalidOid if reported by an apply
+ * worker otherwise reported by a table sync worker.
+ */
+ Oid m_databaseid;
+ Oid m_subid;
+ Oid m_subrelid;
+
+ /*
+ * Oid of the table that the reporter was actually processing. m_relid can
+ * be InvalidOid if an error occurred during worker applying a
+ * non-data-modification message such as RELATION.
+ */
+ Oid m_relid;
+
+ LogicalRepMsgType m_command;
+ TransactionId m_xid;
+ TimestampTz m_timestamp;
+ char m_message[PGSTAT_SUBWORKERERROR_MSGLEN];
+} PgStat_MsgSubWorkerError;
/* ----------
* PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict
@@ -714,6 +767,8 @@ typedef union PgStat_Msg
PgStat_MsgReplSlot msg_replslot;
PgStat_MsgConnect msg_connect;
PgStat_MsgDisconnect msg_disconnect;
+ PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
+ PgStat_MsgSubWorkerError msg_subworkererror;
} PgStat_Msg;
@@ -725,7 +780,7 @@ typedef union PgStat_Msg
* ------------------------------------------------------------
*/
-#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA4
+#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA5
/* ----------
* PgStat_StatDBEntry The collector's data per database
@@ -768,11 +823,16 @@ typedef struct PgStat_StatDBEntry
TimestampTz stats_timestamp; /* time of db stats file update */
/*
- * tables and functions must be last in the struct, because we don't write
- * the pointers out to the stats file.
+ * tables, functions, and subscription workers must be last in the struct,
+ * because we don't write the pointers out to the stats file.
+ *
+ * subworkers is the hash table of PgStat_StatSubWorkerEntry which stores
+ * statistics of logical replication workers: apply worker and table sync
+ * worker.
*/
HTAB *tables;
HTAB *functions;
+ HTAB *subworkers;
} PgStat_StatDBEntry;
@@ -929,6 +989,38 @@ typedef struct PgStat_StatReplSlotEntry
TimestampTz stat_reset_timestamp;
} PgStat_StatReplSlotEntry;
+/* The lookup key for subscription worker hash table */
+typedef struct PgStat_StatSubWorkerKey
+{
+ Oid subid;
+
+ /*
+ * Oid of the table for which tablesync worker will copy the initial data.
+ * An InvalidOid will be assigned for apply workers.
+ */
+ Oid subrelid;
+} PgStat_StatSubWorkerKey;
+
+/*
+ * Logical replication apply worker and table sync worker statistics kept in the
+ * stats collector.
+ */
+typedef struct PgStat_StatSubWorkerEntry
+{
+ PgStat_StatSubWorkerKey key; /* hash key (must be first) */
+
+ /*
+ * Subscription worker error statistics representing an error that
+ * occurred during application of changes or the initial table
+ * synchronization.
+ */
+ Oid last_error_relid;
+ LogicalRepMsgType last_error_command;
+ TransactionId last_error_xid;
+ PgStat_Counter last_error_count;
+ TimestampTz last_error_time;
+ char last_error_message[PGSTAT_SUBWORKERERROR_MSGLEN];
+} PgStat_StatSubWorkerEntry;
/*
* Working state needed to accumulate per-function-call timing statistics.
@@ -1019,7 +1111,8 @@ extern void pgstat_drop_database(Oid databaseid);
extern void pgstat_clear_snapshot(void);
extern void pgstat_reset_counters(void);
extern void pgstat_reset_shared_counters(const char *);
-extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
+extern void pgstat_reset_single_counter(Oid objectid, Oid subobjectid,
+ PgStat_Single_Reset_Type type);
extern void pgstat_reset_slru_counter(const char *);
extern void pgstat_reset_replslot_counter(const char *name);
@@ -1038,6 +1131,10 @@ extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
extern void pgstat_report_replslot_create(const char *slotname);
extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
+ LogicalRepMsgType command,
+ TransactionId xid, const char *errmsg);
+extern void pgstat_report_subscription_drop(Oid subid);
extern void pgstat_initialize(void);
@@ -1129,6 +1226,8 @@ extern void pgstat_send_wal(bool force);
extern PgStat_StatDBEntry *pgstat_fetch_stat_dbentry(Oid dbid);
extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid);
extern PgStat_StatFuncEntry *pgstat_fetch_stat_funcentry(Oid funcid);
+extern PgStat_StatSubWorkerEntry *pgstat_fetch_stat_subworker_entry(Oid subid,
+ Oid subrelid);
extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
extern PgStat_BgWriterStats *pgstat_fetch_stat_bgwriter(void);
extern PgStat_CheckpointerStats *pgstat_fetch_stat_checkpointer(void);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 2fa00a3c29..b58b062b10 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2094,6 +2094,24 @@ pg_stat_subscription| SELECT su.oid AS subid,
st.latest_end_time
FROM (pg_subscription su
LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+pg_stat_subscription_workers| SELECT w.subid,
+ s.subname,
+ w.subrelid,
+ w.last_error_relid,
+ w.last_error_command,
+ w.last_error_xid,
+ w.last_error_count,
+ w.last_error_message,
+ w.last_error_time
+ FROM ( SELECT pg_subscription.oid AS subid,
+ NULL::oid AS relid
+ FROM pg_subscription
+ UNION ALL
+ SELECT pg_subscription_rel.srsubid AS subid,
+ pg_subscription_rel.srrelid AS relid
+ FROM pg_subscription_rel) sr,
+ (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
+ JOIN pg_subscription s ON ((w.subid = s.oid)));
pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
pg_stat_all_indexes.indexrelid,
pg_stat_all_indexes.schemaname,
diff --git a/src/test/subscription/t/026_worker_stats.pl b/src/test/subscription/t/026_worker_stats.pl
new file mode 100644
index 0000000000..e64e0a74b8
--- /dev/null
+++ b/src/test/subscription/t/026_worker_stats.pl
@@ -0,0 +1,154 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for subscription error stats.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 5;
+
+# Test if the error reported on pg_stat_subscription_workers view is expected.
+sub test_subscription_error
+{
+ my ($node, $relname, $xid, $expected_error, $msg) = @_;
+
+ my $check_sql = qq[
+SELECT count(1) > 0 FROM pg_stat_subscription_workers
+WHERE last_error_relid = '$relname'::regclass];
+ $check_sql .= " AND last_error_xid = '$xid'::xid;" if $xid ne '';
+
+ # Wait for the error statistics to be updated.
+ $node->poll_query_until(
+ 'postgres', $check_sql,
+) or die "Timed out while waiting for statistics to be updated";
+
+ my $result = $node->safe_psql(
+ 'postgres',
+ qq[
+SELECT subname, last_error_command, last_error_relid::regclass, last_error_count > 0
+FROM pg_stat_subscription_workers
+WHERE last_error_relid = '$relname'::regclass;
+]);
+ is($result, $expected_error, $msg);
+}
+
+# Create publisher node.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node.
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+
+# The subscriber will enter an infinite error loop, so we don't want
+# to overflow the server log with error messages.
+$node_subscriber->append_conf('postgresql.conf',
+ qq[
+wal_retrieve_retry_interval = 2s
+]);
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On subscriber we
+# create the same tables but with primary keys. Also, insert some data that
+# will conflict with the data replicated from publisher later.
+$node_publisher->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+CREATE TABLE test_tab1 (a int);
+CREATE TABLE test_tab2 (a int);
+INSERT INTO test_tab1 VALUES (1);
+INSERT INTO test_tab2 VALUES (1);
+COMMIT;
+]);
+$node_subscriber->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+CREATE TABLE test_tab1 (a int primary key);
+CREATE TABLE test_tab2 (a int primary key);
+INSERT INTO test_tab2 VALUES (1);
+COMMIT;
+]);
+
+# Setup publications.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql(
+ 'postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2;");
+
+# There shouldn't be any subscription errors before starting logical replication.
+my $result = $node_subscriber->safe_psql(
+ 'postgres',
+ "SELECT count(1) FROM pg_stat_subscription_workers");
+is($result, qq(0), 'check no subscription error');
+
+# Create subscription. The table sync for test_tab2 on tap_sub will enter into
+# infinite error loop due to violating the unique constraint.
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = off);");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Wait for initial table sync for test_tab1 to finish.
+$node_subscriber->poll_query_until(
+ 'postgres',
+ qq[
+SELECT count(1) = 1 FROM pg_subscription_rel
+WHERE srrelid = 'test_tab1'::regclass AND srsubstate in ('r', 's')
+]) or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the initial data.
+$result = $node_subscriber->safe_psql(
+ 'postgres',
+ "SELECT count(a) FROM test_tab1");
+is($result, q(1), 'check initial data are copied to subscriber');
+
+# Insert more data to test_tab1, raising an error on the subscriber due to
+# violation of the unique constraint on test_tab1.
+my $xid = $node_publisher->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (1);
+SELECT pg_current_xact_id()::xid;
+COMMIT;
+]);
+test_subscription_error($node_subscriber, 'test_tab1', $xid,
+ qq(tap_sub|INSERT|test_tab1|t),
+ 'check the error reported by the apply worker');
+
+# Check the table sync worker's error in the view.
+test_subscription_error($node_subscriber, 'test_tab2', '',
+ qq(tap_sub||test_tab2|t),
+ 'check the error reported by the table sync worker');
+
+# Test for resetting subscription worker statistics.
+# Truncate test_tab1 and test_tab2 so that applying changes and table sync can
+# continue, respectively.
+$node_subscriber->safe_psql(
+ 'postgres',
+ "TRUNCATE test_tab1, test_tab2;");
+
+# Wait for the data to be replicated.
+$node_subscriber->poll_query_until(
+ 'postgres',
+ "SELECT count(1) > 0 FROM test_tab1");
+$node_subscriber->poll_query_until(
+ 'postgres',
+ "SELECT count(1) > 0 FROM test_tab2");
+
+# There shouldn't be any errors in the view after dropping the subscription.
+$node_subscriber->safe_psql(
+ 'postgres',
+ "DROP SUBSCRIPTION tap_sub;");
+$result = $node_subscriber->safe_psql(
+ 'postgres',
+ "SELECT count(1) FROM pg_stat_subscription_workers");
+is($result, q(0), 'no error after dropping subscription');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index da6ac8ed83..f41ef0d2bc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1943,6 +1943,8 @@ PgStat_MsgResetsharedcounter
PgStat_MsgResetsinglecounter
PgStat_MsgResetslrucounter
PgStat_MsgSLRU
+PgStat_MsgSubscriptionPurge
+PgStat_MsgSubWorkerError
PgStat_MsgTabpurge
PgStat_MsgTabstat
PgStat_MsgTempFile
@@ -1954,6 +1956,8 @@ PgStat_Single_Reset_Type
PgStat_StatDBEntry
PgStat_StatFuncEntry
PgStat_StatReplSlotEntry
+PgStat_StatSubWorkerEntry
+PgStat_StatSubWorkerKey
PgStat_StatTabEntry
PgStat_SubXactStatus
PgStat_TableCounts