diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 508ed218e8..47b2c87f7f 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -628,7 +628,8 @@ typedef struct
 	pg_time_usec_t txn_begin;	/* used for measuring schedule lag times */
 	pg_time_usec_t stmt_begin;	/* used for measuring statement latencies */
 
-	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */
+	/* whether client prepared each command of each script */
+	bool	  **prepared;
 
 	/*
 	 * For processing failures and repeating transactions with serialization
@@ -733,7 +734,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
  * argv			Command arguments, the first of which is the command or SQL
  *				string itself.  For SQL commands, after post-processing
  *				argv[0] is the same as 'lines' with variables substituted.
- * varprefix 	SQL commands terminated with \gset or \aset have this set
+ * prepname		The name that this command is prepared under, in prepare mode
+ * varprefix	SQL commands terminated with \gset or \aset have this set
  *				to a non NULL value.  If nonempty, it's used to prefix the
  *				variable name that receives the value.
  * aset			do gset on all possible queries of a combined query (\;).
@@ -751,6 +753,7 @@ typedef struct Command
 	MetaCommand meta;
 	int			argc;
 	char	   *argv[MAX_ARGS];
+	char	   *prepname;
 	char	   *varprefix;
 	PgBenchExpr *expr;
 	SimpleStats stats;
@@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc)
 	return true;
 }
 
-#define MAX_PREPARE_NAME		32
-static void
-preparedStatementName(char *buffer, int file, int state)
-{
-	sprintf(buffer, "P%d_%d", file, state);
-}
-
 /*
  * Report the abortion of the client when processing SQL commands.
  */
@@ -3053,6 +3049,87 @@ chooseScript(TState *thread)
 	return i - 1;
 }
 
+/*
+ * Prepare the SQL command from st->use_file at command_num.
+ */
+static void
+prepareCommand(CState *st, int command_num)
+{
+	Command    *command = sql_script[st->use_file].commands[command_num];
+
+	/* No prepare for non-SQL commands */
+	if (command->type != SQL_COMMAND)
+		return;
+
+	/*
+	 * If not already done, allocate space for 'prepared' flags: one boolean
+	 * for each command of each script.
+	 */
+	if (!st->prepared)
+	{
+		st->prepared = pg_malloc(sizeof(bool *) * num_scripts);
+		for (int i = 0; i < num_scripts; i++)
+		{
+			ParsedScript *script = &sql_script[i];
+			int			numcmds;
+
+			for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++)
+				;
+			st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds);
+		}
+	}
+
+	if (!st->prepared[st->use_file][command_num])
+	{
+		PGresult   *res;
+
+		pg_log_debug("client %d preparing %s", st->id, command->prepname);
+		res = PQprepare(st->con, command->prepname,
+						command->argv[0], command->argc - 1, NULL);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pg_log_error("%s", PQerrorMessage(st->con));
+		PQclear(res);
+		st->prepared[st->use_file][command_num] = true;
+	}
+}
+
+/*
+ * Prepare all the commands in the script that come after the \startpipeline
+ * that's at position st->command, and the first \endpipeline we find.
+ *
+ * This sets the ->prepared flag for each relevant command as well as the
+ * \startpipeline itself, but doesn't move the st->command counter.
+ */
+static void
+prepareCommandsInPipeline(CState *st)
+{
+	int			j;
+	Command   **commands = sql_script[st->use_file].commands;
+
+	Assert(commands[st->command]->type == META_COMMAND &&
+		   commands[st->command]->meta == META_STARTPIPELINE);
+
+	/*
+	 * We set the 'prepared' flag on the \startpipeline itself to flag that we
+	 * don't need to do this next time without calling prepareCommand(), even
+	 * though we don't actually prepare this command.
+	 */
+	if (st->prepared &&
+		st->prepared[st->use_file][st->command])
+		return;
+
+	for (j = st->command + 1; commands[j] != NULL; j++)
+	{
+		if (commands[j]->type == META_COMMAND &&
+			commands[j]->meta == META_ENDPIPELINE)
+			break;
+
+		prepareCommand(st, j);
+	}
+
+	st->prepared[st->use_file][st->command] = true;
+}
+
 /* Send a SQL command, using the chosen querymode */
 static bool
 sendCommand(CState *st, Command *command)
@@ -3083,49 +3160,13 @@ sendCommand(CState *st, Command *command)
 	}
 	else if (querymode == QUERY_PREPARED)
 	{
-		char		name[MAX_PREPARE_NAME];
 		const char *params[MAX_ARGS];
 
-		if (!st->prepared[st->use_file])
-		{
-			int			j;
-			Command   **commands = sql_script[st->use_file].commands;
-
-			for (j = 0; commands[j] != NULL; j++)
-			{
-				PGresult   *res;
-
-				if (commands[j]->type != SQL_COMMAND)
-					continue;
-				preparedStatementName(name, st->use_file, j);
-				if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
-				{
-					res = PQprepare(st->con, name,
-									commands[j]->argv[0], commands[j]->argc - 1, NULL);
-					if (PQresultStatus(res) != PGRES_COMMAND_OK)
-						pg_log_error("%s", PQerrorMessage(st->con));
-					PQclear(res);
-				}
-				else
-				{
-					/*
-					 * In pipeline mode, we use asynchronous functions. If a
-					 * server-side error occurs, it will be processed later
-					 * among the other results.
-					 */
-					if (!PQsendPrepare(st->con, name,
-									   commands[j]->argv[0], commands[j]->argc - 1, NULL))
-						pg_log_error("%s", PQerrorMessage(st->con));
-				}
-			}
-			st->prepared[st->use_file] = true;
-		}
-
+		prepareCommand(st, st->command);
 		getQueryParams(&st->variables, command, params);
-		preparedStatementName(name, st->use_file, st->command);
 
-		pg_log_debug("client %d sending %s", st->id, name);
-		r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+		pg_log_debug("client %d sending %s", st->id, command->prepname);
+		r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1,
 								params, NULL, NULL, 0);
 	}
 	else						/* unknown sql mode */
@@ -3597,7 +3638,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 					thread->conn_duration += now - start;
 
 					/* Reset session-local state */
-					memset(st->prepared, 0, sizeof(st->prepared));
+					pg_free(st->prepared);
+					st->prepared = NULL;
 				}
 
 				/*
@@ -4360,6 +4402,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
 			return CSTATE_ABORTED;
 		}
 
+		/*
+		 * If we're in prepared-query mode, we need to prepare all the
+		 * commands that are inside the pipeline before we actually start the
+		 * pipeline itself.  This solves the problem that running BEGIN
+		 * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
+		 * snapshot having been acquired by the prepare within the pipeline.
+		 */
+		if (querymode == QUERY_PREPARED)
+			prepareCommandsInPipeline(st);
+
 		if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
 		{
 			commandFailed(st, "startpipeline", "already in pipeline mode");
@@ -5439,6 +5491,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
 	my_command->varprefix = NULL;	/* allocated later, if needed */
 	my_command->expr = NULL;
 	initSimpleStats(&my_command->stats);
+	my_command->prepname = NULL;	/* set later, if needed */
 
 	return my_command;
 }
@@ -5468,6 +5521,7 @@ static void
 postprocess_sql_command(Command *my_command)
 {
 	char		buffer[128];
+	static int	prepnum = 0;
 
 	Assert(my_command->type == SQL_COMMAND);
 
@@ -5476,15 +5530,17 @@ postprocess_sql_command(Command *my_command)
 	buffer[strcspn(buffer, "\n\r")] = '\0';
 	my_command->first_line = pg_strdup(buffer);
 
-	/* parse query if necessary */
+	/* Parse query and generate prepared statement name, if necessary */
 	switch (querymode)
 	{
 		case QUERY_SIMPLE:
 			my_command->argv[0] = my_command->lines.data;
 			my_command->argc++;
 			break;
-		case QUERY_EXTENDED:
 		case QUERY_PREPARED:
+			my_command->prepname = psprintf("P_%d", prepnum++);
+			/* fall through */
+		case QUERY_EXTENDED:
 			if (!parseQuery(my_command))
 				exit(1);
 			break;
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 4bf508ea96..99273203f0 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -839,6 +839,26 @@ select 1 \gset f
 }
 	});
 
+# Working \startpipeline in prepared query mode with serializable
+$node->pgbench(
+	'-c4 -j2 -t 10 -n -M prepared',
+	0,
+	[
+		qr{type: .*/001_pgbench_pipeline_serializable},
+		qr{actually processed: (\d+)/\1}
+	],
+	[],
+	'working \startpipeline with serializable',
+	{
+		'001_pgbench_pipeline_serializable' => q{
+-- test startpipeline with serializable
+\startpipeline
+BEGIN ISOLATION LEVEL SERIALIZABLE;
+} . "select 1;\n" x 10 . q{
+END;
+\endpipeline
+}
+	});
 
 # trigger many expression errors
 my @errors = (