From cf2f82a37cc35895b67c83dd2b33d2fcf4688a55 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Mon, 16 Sep 2024 16:10:33 -0500
Subject: [PATCH] pg_upgrade: Parallelize incompatible polymorphics check.

This commit makes use of the new task framework in pg_upgrade to
parallelize the check for usage of incompatible polymorphic
functions, i.e., those with arguments of type anyarray/anyelement
rather than the newer anycompatible variants.  When pg_upgrade's
--jobs option is set to a value greater than 1, this step now
processes multiple databases concurrently.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/check.c | 163 +++++++++++++++++++------------------
 1 file changed, 85 insertions(+), 78 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 28c4ddbca3..c5fa1a5bed 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1413,6 +1413,40 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
 		check_ok();
 }
 
+/*
+ * Callback function for processing results of query for
+ * check_for_incompatible_polymorphics()'s UpgradeTask.  If the query returned
+ * any rows (i.e., the check failed), write the details to the report file.
+ */
+static void
+process_incompat_polymorphics(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+	UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+	bool		db_used = false;
+	int			ntups = PQntuples(res);
+	int			i_objkind = PQfnumber(res, "objkind");
+	int			i_objname = PQfnumber(res, "objname");
+
+	AssertVariableIsOfType(&process_incompat_polymorphics,
+						   UpgradeTaskProcessCB);
+
+	for (int rowno = 0; rowno < ntups; rowno++)
+	{
+		if (report->file == NULL &&
+			(report->file = fopen_priv(report->path, "w")) == NULL)
+			pg_fatal("could not open file \"%s\": %m", report->path);
+		if (!db_used)
+		{
+			fprintf(report->file, "In database: %s\n", dbinfo->db_name);
+			db_used = true;
+		}
+
+		fprintf(report->file, "  %s: %s\n",
+				PQgetvalue(res, rowno, i_objkind),
+				PQgetvalue(res, rowno, i_objname));
+	}
+}
+
 /*
  *	check_for_incompatible_polymorphics()
  *
@@ -1422,14 +1456,15 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
 static void
 check_for_incompatible_polymorphics(ClusterInfo *cluster)
 {
-	PGresult   *res;
-	FILE	   *script = NULL;
-	char		output_path[MAXPGPATH];
 	PQExpBufferData old_polymorphics;
+	UpgradeTask *task = upgrade_task_create();
+	UpgradeTaskReport report;
+	char	   *query;
 
 	prep_status("Checking for incompatible polymorphic functions");
 
-	snprintf(output_path, sizeof(output_path), "%s/%s",
+	report.file = NULL;
+	snprintf(report.path, sizeof(report.path), "%s/%s",
 			 log_opts.basedir,
 			 "incompatible_polymorphics.txt");
 
@@ -1453,80 +1488,51 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
 							 ", 'array_positions(anyarray,anyelement)'"
 							 ", 'width_bucket(anyelement,anyarray)'");
 
-	for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	/*
+	 * The query below hardcodes FirstNormalObjectId as 16384 rather than
+	 * interpolating that C #define into the query because, if that #define is
+	 * ever changed, the cutoff we want to use is the value used by
+	 * pre-version 14 servers, not that of some future version.
+	 */
+
+	/* Aggregate transition functions */
+	query = psprintf("SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+					 "FROM pg_proc AS p "
+					 "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+					 "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
+					 "WHERE p.oid >= 16384 "
+					 "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
+					 "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+	/* Aggregate final functions */
+					 "UNION ALL "
+					 "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+					 "FROM pg_proc AS p "
+					 "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+					 "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
+					 "WHERE p.oid >= 16384 "
+					 "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
+					 "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+	/* Operators */
+					 "UNION ALL "
+					 "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
+					 "FROM pg_operator AS op "
+					 "WHERE op.oid >= 16384 "
+					 "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
+					 "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[])",
+					 old_polymorphics.data,
+					 old_polymorphics.data,
+					 old_polymorphics.data);
+
+	upgrade_task_add_step(task, query, process_incompat_polymorphics,
+						  true, &report);
+	upgrade_task_run(task, cluster);
+	upgrade_task_free(task);
+
+	if (report.file)
 	{
-		bool		db_used = false;
-		DbInfo	   *active_db = &cluster->dbarr.dbs[dbnum];
-		PGconn	   *conn = connectToServer(cluster, active_db->db_name);
-		int			ntups;
-		int			i_objkind,
-					i_objname;
-
-		/*
-		 * The query below hardcodes FirstNormalObjectId as 16384 rather than
-		 * interpolating that C #define into the query because, if that
-		 * #define is ever changed, the cutoff we want to use is the value
-		 * used by pre-version 14 servers, not that of some future version.
-		 */
-		res = executeQueryOrDie(conn,
-		/* Aggregate transition functions */
-								"SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-								"FROM pg_proc AS p "
-								"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
-								"JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
-								"WHERE p.oid >= 16384 "
-								"AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
-								"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-		/* Aggregate final functions */
-								"UNION ALL "
-								"SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-								"FROM pg_proc AS p "
-								"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
-								"JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
-								"WHERE p.oid >= 16384 "
-								"AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
-								"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-		/* Operators */
-								"UNION ALL "
-								"SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
-								"FROM pg_operator AS op "
-								"WHERE op.oid >= 16384 "
-								"AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
-								"AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
-								old_polymorphics.data,
-								old_polymorphics.data,
-								old_polymorphics.data);
-
-		ntups = PQntuples(res);
-
-		i_objkind = PQfnumber(res, "objkind");
-		i_objname = PQfnumber(res, "objname");
-
-		for (int rowno = 0; rowno < ntups; rowno++)
-		{
-			if (script == NULL &&
-				(script = fopen_priv(output_path, "w")) == NULL)
-				pg_fatal("could not open file \"%s\": %m", output_path);
-			if (!db_used)
-			{
-				fprintf(script, "In database: %s\n", active_db->db_name);
-				db_used = true;
-			}
-
-			fprintf(script, "  %s: %s\n",
-					PQgetvalue(res, rowno, i_objkind),
-					PQgetvalue(res, rowno, i_objname));
-		}
-
-		PQclear(res);
-		PQfinish(conn);
-	}
-
-	if (script)
-	{
-		fclose(script);
+		fclose(report.file);
 		pg_log(PG_REPORT, "fatal");
 		pg_fatal("Your installation contains user-defined objects that refer to internal\n"
 				 "polymorphic functions with arguments of type \"anyarray\" or \"anyelement\".\n"
@@ -1534,12 +1540,13 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
 				 "afterwards, changing them to refer to the new corresponding functions with\n"
 				 "arguments of type \"anycompatiblearray\" and \"anycompatible\".\n"
 				 "A list of the problematic objects is in the file:\n"
-				 "    %s", output_path);
+				 "    %s", report.path);
 	}
 	else
 		check_ok();
 
 	termPQExpBuffer(&old_polymorphics);
+	pg_free(query);
 }
 
 /*