pg_upgrade: Parallelize retrieving loadable libraries.

This commit makes use of the new task framework in pg_upgrade to
parallelize retrieving the names of all libraries referenced by
non-built-in C functions.  This step will now process multiple
databases concurrently when pg_upgrade's --jobs option is provided
a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
This commit is contained in:
Nathan Bossart 2024-09-16 16:10:33 -05:00
parent 7baa36de58
commit 46cad8b319
1 changed files with 44 additions and 25 deletions

View File

@ -42,6 +42,30 @@ library_name_compare(const void *p1, const void *p2)
((const LibraryInfo *) p2)->dbnum);
}
/*
* Private state for get_loadable_libraries()'s UpgradeTask.
*/
struct loadable_libraries_state
{
PGresult **ress; /* results for each database */
int totaltups; /* number of tuples in all results */
};
/*
* Callback function for processing results of query for
* get_loadable_libraries()'s UpgradeTask. This function stores the results
* for later use within get_loadable_libraries().
*/
static void
process_loadable_libraries(DbInfo *dbinfo, PGresult *res, void *arg)
{
struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg;
AssertVariableIsOfType(&process_loadable_libraries, UpgradeTaskProcessCB);
state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
state->totaltups += PQntuples(res);
}
/*
* get_loadable_libraries()
@ -54,47 +78,41 @@ library_name_compare(const void *p1, const void *p2)
void
get_loadable_libraries(void)
{
PGresult **ress;
int totaltups;
int dbnum;
int n_libinfos;
UpgradeTask *task = upgrade_task_create();
struct loadable_libraries_state state;
char *query;
ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
totaltups = 0;
state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
state.totaltups = 0;
/* Fetch all library names, removing duplicates within each DB */
for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
{
DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum];
PGconn *conn = connectToServer(&old_cluster, active_db->db_name);
query = psprintf("SELECT DISTINCT probin "
"FROM pg_catalog.pg_proc "
"WHERE prolang = %u AND "
"probin IS NOT NULL AND "
"oid >= %u",
ClanguageId,
FirstNormalObjectId);
/*
* Fetch all libraries containing non-built-in C functions in this DB.
*/
ress[dbnum] = executeQueryOrDie(conn,
"SELECT DISTINCT probin "
"FROM pg_catalog.pg_proc "
"WHERE prolang = %u AND "
"probin IS NOT NULL AND "
"oid >= %u;",
ClanguageId,
FirstNormalObjectId);
totaltups += PQntuples(ress[dbnum]);
upgrade_task_add_step(task, query, process_loadable_libraries,
false, &state);
PQfinish(conn);
}
upgrade_task_run(task, &old_cluster);
upgrade_task_free(task);
/*
* Allocate memory for required libraries and logical replication output
* plugins.
*/
n_libinfos = totaltups + count_old_cluster_logical_slots();
n_libinfos = state.totaltups + count_old_cluster_logical_slots();
os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
totaltups = 0;
for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
{
PGresult *res = ress[dbnum];
PGresult *res = state.ress[dbnum];
int ntups;
int rowno;
LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
@ -129,7 +147,8 @@ get_loadable_libraries(void)
}
}
pg_free(ress);
pg_free(state.ress);
pg_free(query);
os_info.num_libraries = totaltups;
}