pg_upgrade: Parallelize contrib/isn check.

This commit makes use of the new task framework in pg_upgrade to
parallelize the check for contrib/isn functions that rely on the
bigint data type.  This step will now process multiple databases
concurrently when pg_upgrade's --jobs option is provided a value
greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
This commit is contained in:
Nathan Bossart 2024-09-16 16:10:33 -05:00
parent bbf83cab98
commit 9db3018cf8
1 changed files with 51 additions and 48 deletions

View File

@ -1225,6 +1225,39 @@ check_for_prepared_transactions(ClusterInfo *cluster)
check_ok();
}
/*
* Callback function for processing result of query for
* check_for_isn_and_int8_passing_mismatch()'s UpgradeTask. If the query
* returned any rows (i.e., the check failed), write the details to the report
* file.
*/
static void
process_isn_and_int8_passing_mismatch(DbInfo *dbinfo, PGresult *res, void *arg)
{
bool db_used = false;
int ntups = PQntuples(res);
int i_nspname = PQfnumber(res, "nspname");
int i_proname = PQfnumber(res, "proname");
UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
AssertVariableIsOfType(&process_isn_and_int8_passing_mismatch,
UpgradeTaskProcessCB);
for (int rowno = 0; rowno < ntups; rowno++)
{
if (report->file == NULL &&
(report->file = fopen_priv(report->path, "w")) == NULL)
pg_fatal("could not open file \"%s\": %m", report->path);
if (!db_used)
{
fprintf(report->file, "In database: %s\n", dbinfo->db_name);
db_used = true;
}
fprintf(report->file, " %s.%s\n",
PQgetvalue(res, rowno, i_nspname),
PQgetvalue(res, rowno, i_proname));
}
}
/*
* check_for_isn_and_int8_passing_mismatch()
@ -1236,9 +1269,13 @@ check_for_prepared_transactions(ClusterInfo *cluster)
static void
check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
{
int dbnum;
FILE *script = NULL;
char output_path[MAXPGPATH];
UpgradeTask *task;
UpgradeTaskReport report;
const char *query = "SELECT n.nspname, p.proname "
"FROM pg_catalog.pg_proc p, "
" pg_catalog.pg_namespace n "
"WHERE p.pronamespace = n.oid AND "
" p.probin = '$libdir/isn'";
prep_status("Checking for contrib/isn with bigint-passing mismatch");
@ -1250,54 +1287,20 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
return;
}
snprintf(output_path, sizeof(output_path), "%s/%s",
report.file = NULL;
snprintf(report.path, sizeof(report.path), "%s/%s",
log_opts.basedir,
"contrib_isn_and_int8_pass_by_value.txt");
for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
task = upgrade_task_create();
upgrade_task_add_step(task, query, process_isn_and_int8_passing_mismatch,
true, &report);
upgrade_task_run(task, cluster);
upgrade_task_free(task);
if (report.file)
{
PGresult *res;
bool db_used = false;
int ntups;
int rowno;
int i_nspname,
i_proname;
DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
PGconn *conn = connectToServer(cluster, active_db->db_name);
/* Find any functions coming from contrib/isn */
res = executeQueryOrDie(conn,
"SELECT n.nspname, p.proname "
"FROM pg_catalog.pg_proc p, "
" pg_catalog.pg_namespace n "
"WHERE p.pronamespace = n.oid AND "
" p.probin = '$libdir/isn'");
ntups = PQntuples(res);
i_nspname = PQfnumber(res, "nspname");
i_proname = PQfnumber(res, "proname");
for (rowno = 0; rowno < ntups; rowno++)
{
if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
pg_fatal("could not open file \"%s\": %m", output_path);
if (!db_used)
{
fprintf(script, "In database: %s\n", active_db->db_name);
db_used = true;
}
fprintf(script, " %s.%s\n",
PQgetvalue(res, rowno, i_nspname),
PQgetvalue(res, rowno, i_proname));
}
PQclear(res);
PQfinish(conn);
}
if (script)
{
fclose(script);
fclose(report.file);
pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains \"contrib/isn\" functions which rely on the\n"
"bigint data type. Your old and new clusters pass bigint values\n"
@ -1305,7 +1308,7 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
"manually dump databases in the old cluster that use \"contrib/isn\"\n"
"facilities, drop them, perform the upgrade, and then restore them. A\n"
"list of the problem functions is in the file:\n"
" %s", output_path);
" %s", report.path);
}
else
check_ok();