pg_upgrade: Parallelize retrieving extension updates.

This commit makes use of the new task framework in pg_upgrade to
parallelize retrieving the set of extensions that should be updated
with the ALTER EXTENSION command after upgrade.  This step will now
process multiple databases concurrently when pg_upgrade's --jobs
option is provided a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
This commit is contained in:
Nathan Bossart 2024-09-16 16:10:33 -05:00
parent 46cad8b319
commit 6ab8f27bc7
1 changed files with 52 additions and 48 deletions

View File

@ -139,6 +139,41 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
check_ok();
}
/*
* Callback function for processing results of query for
* report_extension_updates()'s UpgradeTask. If the query returned any rows,
* write the details to the report file.
*/
static void
process_extension_updates(DbInfo *dbinfo, PGresult *res, void *arg)
{
bool db_used = false;
int ntups = PQntuples(res);
int i_name = PQfnumber(res, "name");
UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
AssertVariableIsOfType(&process_extension_updates, UpgradeTaskProcessCB);
for (int rowno = 0; rowno < ntups; rowno++)
{
if (report->file == NULL &&
(report->file = fopen_priv(report->path, "w")) == NULL)
pg_fatal("could not open file \"%s\": %m", report->path);
if (!db_used)
{
PQExpBufferData connectbuf;
initPQExpBuffer(&connectbuf);
appendPsqlMetaConnect(&connectbuf, dbinfo->db_name);
fputs(connectbuf.data, report->file);
termPQExpBuffer(&connectbuf);
db_used = true;
}
fprintf(report->file, "ALTER EXTENSION %s UPDATE;\n",
quote_identifier(PQgetvalue(res, rowno, i_name)));
}
}
/*
* report_extension_updates()
* Report extensions that should be updated.
@ -146,57 +181,26 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
void
report_extension_updates(ClusterInfo *cluster)
{
int dbnum;
FILE *script = NULL;
char *output_path = "update_extensions.sql";
UpgradeTaskReport report;
UpgradeTask *task = upgrade_task_create();
const char *query = "SELECT name "
"FROM pg_available_extensions "
"WHERE installed_version != default_version";
prep_status("Checking for extension updates");
for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
report.file = NULL;
strcpy(report.path, "update_extensions.sql");
upgrade_task_add_step(task, query, process_extension_updates,
true, &report);
upgrade_task_run(task, cluster);
upgrade_task_free(task);
if (report.file)
{
PGresult *res;
bool db_used = false;
int ntups;
int rowno;
int i_name;
DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
PGconn *conn = connectToServer(cluster, active_db->db_name);
/* find extensions needing updates */
res = executeQueryOrDie(conn,
"SELECT name "
"FROM pg_available_extensions "
"WHERE installed_version != default_version"
);
ntups = PQntuples(res);
i_name = PQfnumber(res, "name");
for (rowno = 0; rowno < ntups; rowno++)
{
if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
pg_fatal("could not open file \"%s\": %m", output_path);
if (!db_used)
{
PQExpBufferData connectbuf;
initPQExpBuffer(&connectbuf);
appendPsqlMetaConnect(&connectbuf, active_db->db_name);
fputs(connectbuf.data, script);
termPQExpBuffer(&connectbuf);
db_used = true;
}
fprintf(script, "ALTER EXTENSION %s UPDATE;\n",
quote_identifier(PQgetvalue(res, rowno, i_name)));
}
PQclear(res);
PQfinish(conn);
}
if (script)
{
fclose(script);
fclose(report.file);
report_status(PG_REPORT, "notice");
pg_log(PG_REPORT, "\n"
"Your installation contains extensions that should be updated\n"
@ -204,7 +208,7 @@ report_extension_updates(ClusterInfo *cluster)
" %s\n"
"when executed by psql by the database superuser will update\n"
"these extensions.",
output_path);
report.path);
}
else
check_ok();