pg_upgrade: Retrieve subscription count more efficiently.
Presently, pg_upgrade obtains the number of subscriptions in the to-be-upgraded cluster by first querying pg_subscription in every database for the number of subscriptions in only that database. Then, in count_old_cluster_subscriptions(), it adds all the values collected in the first step. This is expensive, especially when there are many databases. Fortunately, there is a better way to retrieve the subscription count. Since pg_subscription is a shared catalog, we only need to connect to a single database and query it once. This commit modifies pg_upgrade to use that approach, which also allows us to trim several lines of code. In passing, move the call to get_db_subscription_count(), which has been renamed to get_subscription_count(), from get_db_rel_and_slot_infos() to the dedicated >= v17 section in check_and_dump_old_cluster(). We may be able to make similar improvements to get_old_cluster_logical_slot_infos(), but that is left as a future exercise. Reviewed-by: Michael Paquier, Amit Kapila Discussion: https://postgr.es/m/ZprQJv_TxccN3tkr%40nathan Backpatch-through: 17
This commit is contained in:
parent
9f21482fe1
commit
364509a2e7
@ -609,8 +609,10 @@ check_and_dump_old_cluster(bool live_check)
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* Subscriptions and their dependencies can be migrated since PG17.
|
* Subscriptions and their dependencies can be migrated since PG17.
|
||||||
* See comments atop get_db_subscription_count().
|
* Before that the logical slots are not upgraded, so we will not be
|
||||||
|
* able to upgrade the logical replication clusters completely.
|
||||||
*/
|
*/
|
||||||
|
get_subscription_count(&old_cluster);
|
||||||
check_old_cluster_subscription_state();
|
check_old_cluster_subscription_state();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1797,17 +1799,14 @@ check_new_cluster_subscription_configuration(void)
|
|||||||
{
|
{
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
PGconn *conn;
|
PGconn *conn;
|
||||||
int nsubs_on_old;
|
|
||||||
int max_replication_slots;
|
int max_replication_slots;
|
||||||
|
|
||||||
/* Subscriptions and their dependencies can be migrated since PG17. */
|
/* Subscriptions and their dependencies can be migrated since PG17. */
|
||||||
if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
|
if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
nsubs_on_old = count_old_cluster_subscriptions();
|
|
||||||
|
|
||||||
/* Quick return if there are no subscriptions to be migrated. */
|
/* Quick return if there are no subscriptions to be migrated. */
|
||||||
if (nsubs_on_old == 0)
|
if (old_cluster.nsubs == 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
prep_status("Checking for new cluster configuration for subscriptions");
|
prep_status("Checking for new cluster configuration for subscriptions");
|
||||||
@ -1821,10 +1820,10 @@ check_new_cluster_subscription_configuration(void)
|
|||||||
pg_fatal("could not determine parameter settings on new cluster");
|
pg_fatal("could not determine parameter settings on new cluster");
|
||||||
|
|
||||||
max_replication_slots = atoi(PQgetvalue(res, 0, 0));
|
max_replication_slots = atoi(PQgetvalue(res, 0, 0));
|
||||||
if (nsubs_on_old > max_replication_slots)
|
if (old_cluster.nsubs > max_replication_slots)
|
||||||
pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of "
|
pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of "
|
||||||
"subscriptions (%d) on the old cluster",
|
"subscriptions (%d) on the old cluster",
|
||||||
max_replication_slots, nsubs_on_old);
|
max_replication_slots, old_cluster.nsubs);
|
||||||
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
|
@ -28,7 +28,6 @@ static void print_db_infos(DbInfoArr *db_arr);
|
|||||||
static void print_rel_infos(RelInfoArr *rel_arr);
|
static void print_rel_infos(RelInfoArr *rel_arr);
|
||||||
static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
|
static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
|
||||||
static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
|
static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
|
||||||
static void get_db_subscription_count(DbInfo *dbinfo);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -293,15 +292,8 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
|
|||||||
|
|
||||||
get_rel_infos(cluster, pDbInfo);
|
get_rel_infos(cluster, pDbInfo);
|
||||||
|
|
||||||
/*
|
|
||||||
* Retrieve the logical replication slots infos and the subscriptions
|
|
||||||
* count for the old cluster.
|
|
||||||
*/
|
|
||||||
if (cluster == &old_cluster)
|
if (cluster == &old_cluster)
|
||||||
{
|
|
||||||
get_old_cluster_logical_slot_infos(pDbInfo, live_check);
|
get_old_cluster_logical_slot_infos(pDbInfo, live_check);
|
||||||
get_db_subscription_count(pDbInfo);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cluster == &old_cluster)
|
if (cluster == &old_cluster)
|
||||||
@ -748,54 +740,25 @@ count_old_cluster_logical_slots(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* get_db_subscription_count()
|
* get_subscription_count()
|
||||||
*
|
*
|
||||||
* Gets the number of subscriptions in the database referred to by "dbinfo".
|
* Gets the number of subscriptions in the cluster.
|
||||||
*
|
|
||||||
* Note: This function will not do anything if the old cluster is pre-PG17.
|
|
||||||
* This is because before that the logical slots are not upgraded, so we will
|
|
||||||
* not be able to upgrade the logical replication clusters completely.
|
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
get_db_subscription_count(DbInfo *dbinfo)
|
get_subscription_count(ClusterInfo *cluster)
|
||||||
{
|
{
|
||||||
PGconn *conn;
|
PGconn *conn;
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
|
|
||||||
/* Subscriptions can be migrated since PG17. */
|
conn = connectToServer(cluster, "template1");
|
||||||
if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
|
|
||||||
return;
|
|
||||||
|
|
||||||
conn = connectToServer(&old_cluster, dbinfo->db_name);
|
|
||||||
res = executeQueryOrDie(conn, "SELECT count(*) "
|
res = executeQueryOrDie(conn, "SELECT count(*) "
|
||||||
"FROM pg_catalog.pg_subscription WHERE subdbid = %u",
|
"FROM pg_catalog.pg_subscription");
|
||||||
dbinfo->db_oid);
|
cluster->nsubs = atoi(PQgetvalue(res, 0, 0));
|
||||||
dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0));
|
|
||||||
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
PQfinish(conn);
|
PQfinish(conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* count_old_cluster_subscriptions()
|
|
||||||
*
|
|
||||||
* Returns the number of subscriptions for all databases.
|
|
||||||
*
|
|
||||||
* Note: this function always returns 0 if the old_cluster is PG16 and prior
|
|
||||||
* because we gather subscriptions only for cluster versions greater than or
|
|
||||||
* equal to PG17. See get_db_subscription_count().
|
|
||||||
*/
|
|
||||||
int
|
|
||||||
count_old_cluster_subscriptions(void)
|
|
||||||
{
|
|
||||||
int nsubs = 0;
|
|
||||||
|
|
||||||
for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
|
|
||||||
nsubs += old_cluster.dbarr.dbs[dbnum].nsubs;
|
|
||||||
|
|
||||||
return nsubs;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
free_db_and_rel_infos(DbInfoArr *db_arr)
|
free_db_and_rel_infos(DbInfoArr *db_arr)
|
||||||
{
|
{
|
||||||
|
@ -197,7 +197,6 @@ typedef struct
|
|||||||
* path */
|
* path */
|
||||||
RelInfoArr rel_arr; /* array of all user relinfos */
|
RelInfoArr rel_arr; /* array of all user relinfos */
|
||||||
LogicalSlotInfoArr slot_arr; /* array of all LogicalSlotInfo */
|
LogicalSlotInfoArr slot_arr; /* array of all LogicalSlotInfo */
|
||||||
int nsubs; /* number of subscriptions */
|
|
||||||
} DbInfo;
|
} DbInfo;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -296,6 +295,7 @@ typedef struct
|
|||||||
char major_version_str[64]; /* string PG_VERSION of cluster */
|
char major_version_str[64]; /* string PG_VERSION of cluster */
|
||||||
uint32 bin_version; /* version returned from pg_ctl */
|
uint32 bin_version; /* version returned from pg_ctl */
|
||||||
const char *tablespace_suffix; /* directory specification */
|
const char *tablespace_suffix; /* directory specification */
|
||||||
|
int nsubs; /* number of subscriptions */
|
||||||
} ClusterInfo;
|
} ClusterInfo;
|
||||||
|
|
||||||
|
|
||||||
@ -430,7 +430,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
|
|||||||
const char *new_pgdata);
|
const char *new_pgdata);
|
||||||
void get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check);
|
void get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check);
|
||||||
int count_old_cluster_logical_slots(void);
|
int count_old_cluster_logical_slots(void);
|
||||||
int count_old_cluster_subscriptions(void);
|
void get_subscription_count(ClusterInfo *cluster);
|
||||||
|
|
||||||
/* option.c */
|
/* option.c */
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user