diff --git a/doc/src/sgml/spi.sgml b/doc/src/sgml/spi.sgml
index d6ff492ba9..d710e2d0df 100644
--- a/doc/src/sgml/spi.sgml
+++ b/doc/src/sgml/spi.sgml
@@ -739,6 +739,17 @@ int SPI_execute_extended(const char *<parameter>command</parameter>,
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>bool <parameter>must_return_tuples</parameter></literal></term>
+    <listitem>
+     <para>
+      if <literal>true</literal>, raise error if the query is not of a kind
+      that returns tuples (this does not forbid the case where it happens to
+      return zero tuples)
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>uint64 <parameter>tcount</parameter></literal></term>
     <listitem>
@@ -1869,6 +1880,17 @@ int SPI_execute_plan_extended(SPIPlanPtr <parameter>plan</parameter>,
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>bool <parameter>must_return_tuples</parameter></literal></term>
+    <listitem>
+     <para>
+      if <literal>true</literal>, raise error if the query is not of a kind
+      that returns tuples (this does not forbid the case where it happens to
+      return zero tuples)
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>uint64 <parameter>tcount</parameter></literal></term>
     <listitem>
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index a5aec7ba7d..0568ae123f 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -64,12 +64,9 @@ static void _SPI_prepare_plan(const char *src, SPIPlanPtr plan);
 
 static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 
-static int	_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
+static int	_SPI_execute_plan(SPIPlanPtr plan, const SPIExecuteOptions *options,
 							  Snapshot snapshot, Snapshot crosscheck_snapshot,
-							  bool read_only, bool allow_nonatomic,
-							  bool fire_triggers, uint64 tcount,
-							  DestReceiver *caller_dest,
-							  ResourceOwner plan_owner);
+							  bool fire_triggers);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
 										 Datum *Values, const char *Nulls);
@@ -504,6 +501,7 @@ int
 SPI_execute(const char *src, bool read_only, long tcount)
 {
 	_SPI_plan	plan;
+	SPIExecuteOptions options;
 	int			res;
 
 	if (src == NULL || tcount < 0)
@@ -520,11 +518,13 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
 	_SPI_prepare_oneshot_plan(src, &plan);
 
-	res = _SPI_execute_plan(&plan, NULL,
+	memset(&options, 0, sizeof(options));
+	options.read_only = read_only;
+	options.tcount = tcount;
+
+	res = _SPI_execute_plan(&plan, &options,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, false,
-							true, tcount,
-							NULL, NULL);
+							true);
 
 	_SPI_end_call(true);
 	return res;
@@ -564,11 +564,9 @@ SPI_execute_extended(const char *src,
 
 	_SPI_prepare_oneshot_plan(src, &plan);
 
-	res = _SPI_execute_plan(&plan, options->params,
+	res = _SPI_execute_plan(&plan, options,
 							InvalidSnapshot, InvalidSnapshot,
-							options->read_only, options->allow_nonatomic,
-							true, options->tcount,
-							options->dest, options->owner);
+							true);
 
 	_SPI_end_call(true);
 	return res;
@@ -579,6 +577,7 @@ int
 SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 				 bool read_only, long tcount)
 {
+	SPIExecuteOptions options;
 	int			res;
 
 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
@@ -591,13 +590,15 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 	if (res < 0)
 		return res;
 
-	res = _SPI_execute_plan(plan,
-							_SPI_convert_params(plan->nargs, plan->argtypes,
-												Values, Nulls),
+	memset(&options, 0, sizeof(options));
+	options.params = _SPI_convert_params(plan->nargs, plan->argtypes,
+										 Values, Nulls);
+	options.read_only = read_only;
+	options.tcount = tcount;
+
+	res = _SPI_execute_plan(plan, &options,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, false,
-							true, tcount,
-							NULL, NULL);
+							true);
 
 	_SPI_end_call(true);
 	return res;
@@ -624,11 +625,9 @@ SPI_execute_plan_extended(SPIPlanPtr plan,
 	if (res < 0)
 		return res;
 
-	res = _SPI_execute_plan(plan, options->params,
+	res = _SPI_execute_plan(plan, options,
 							InvalidSnapshot, InvalidSnapshot,
-							options->read_only, options->allow_nonatomic,
-							true, options->tcount,
-							options->dest, options->owner);
+							true);
 
 	_SPI_end_call(true);
 	return res;
@@ -639,6 +638,7 @@ int
 SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 								bool read_only, long tcount)
 {
+	SPIExecuteOptions options;
 	int			res;
 
 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
@@ -648,11 +648,14 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 	if (res < 0)
 		return res;
 
-	res = _SPI_execute_plan(plan, params,
+	memset(&options, 0, sizeof(options));
+	options.params = params;
+	options.read_only = read_only;
+	options.tcount = tcount;
+
+	res = _SPI_execute_plan(plan, &options,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, false,
-							true, tcount,
-							NULL, NULL);
+							true);
 
 	_SPI_end_call(true);
 	return res;
@@ -677,6 +680,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
 					 Snapshot snapshot, Snapshot crosscheck_snapshot,
 					 bool read_only, bool fire_triggers, long tcount)
 {
+	SPIExecuteOptions options;
 	int			res;
 
 	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
@@ -689,13 +693,15 @@ SPI_execute_snapshot(SPIPlanPtr plan,
 	if (res < 0)
 		return res;
 
-	res = _SPI_execute_plan(plan,
-							_SPI_convert_params(plan->nargs, plan->argtypes,
-												Values, Nulls),
+	memset(&options, 0, sizeof(options));
+	options.params = _SPI_convert_params(plan->nargs, plan->argtypes,
+										 Values, Nulls);
+	options.read_only = read_only;
+	options.tcount = tcount;
+
+	res = _SPI_execute_plan(plan, &options,
 							snapshot, crosscheck_snapshot,
-							read_only, false,
-							fire_triggers, tcount,
-							NULL, NULL);
+							fire_triggers);
 
 	_SPI_end_call(true);
 	return res;
@@ -716,6 +722,7 @@ SPI_execute_with_args(const char *src,
 	int			res;
 	_SPI_plan	plan;
 	ParamListInfo paramLI;
+	SPIExecuteOptions options;
 
 	if (src == NULL || nargs < 0 || tcount < 0)
 		return SPI_ERROR_ARGUMENT;
@@ -741,11 +748,14 @@ SPI_execute_with_args(const char *src,
 
 	_SPI_prepare_oneshot_plan(src, &plan);
 
-	res = _SPI_execute_plan(&plan, paramLI,
+	memset(&options, 0, sizeof(options));
+	options.params = paramLI;
+	options.read_only = read_only;
+	options.tcount = tcount;
+
+	res = _SPI_execute_plan(&plan, &options,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, false,
-							true, tcount,
-							NULL, NULL);
+							true);
 
 	_SPI_end_call(true);
 	return res;
@@ -2263,32 +2273,36 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
 }
 
 /*
- * Execute the given plan with the given parameter values
+ * _SPI_execute_plan: execute the given plan with the given options
  *
+ * options contains options accessible from outside SPI:
+ * params: parameter values to pass to query
+ * read_only: true for read-only execution (no CommandCounterIncrement)
+ * allow_nonatomic: true to allow nonatomic CALL/DO execution
+ * must_return_tuples: throw error if query doesn't return tuples
+ * tcount: execution tuple-count limit, or 0 for none
+ * dest: DestReceiver to receive output, or NULL for normal SPI output
+ * owner: ResourceOwner that will be used to hold refcount on plan;
+ *		if NULL, CurrentResourceOwner is used (ignored for non-saved plan)
+ *
+ * Additional, only-internally-accessible options:
  * snapshot: query snapshot to use, or InvalidSnapshot for the normal
  *		behavior of taking a new snapshot for each query.
  * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
- * read_only: true for read-only execution (no CommandCounterIncrement)
- * allow_nonatomic: true to allow nonatomic CALL/DO execution
  * fire_triggers: true to fire AFTER triggers at end of query (normal case);
  *		false means any AFTER triggers are postponed to end of outer query
- * tcount: execution tuple-count limit, or 0 for none
- * caller_dest: DestReceiver to receive output, or NULL for normal SPI output
- * plan_owner: ResourceOwner that will be used to hold refcount on plan;
- *		if NULL, CurrentResourceOwner is used (ignored for non-saved plan)
  */
 static int
-_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
+_SPI_execute_plan(SPIPlanPtr plan, const SPIExecuteOptions *options,
 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
-				  bool read_only, bool allow_nonatomic,
-				  bool fire_triggers, uint64 tcount,
-				  DestReceiver *caller_dest, ResourceOwner plan_owner)
+				  bool fire_triggers)
 {
 	int			my_res = 0;
 	uint64		my_processed = 0;
 	SPITupleTable *my_tuptable = NULL;
 	int			res = 0;
 	bool		pushed_active_snap = false;
+	ResourceOwner plan_owner = options->owner;
 	SPICallbackArg spicallbackarg;
 	ErrorContextCallback spierrcontext;
 	CachedPlan *cplan = NULL;
@@ -2328,8 +2342,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 	 */
 	if (snapshot != InvalidSnapshot)
 	{
-		Assert(!allow_nonatomic);
-		if (read_only)
+		Assert(!options->allow_nonatomic);
+		if (options->read_only)
 		{
 			PushActiveSnapshot(snapshot);
 			pushed_active_snap = true;
@@ -2351,6 +2365,17 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 	else if (plan_owner == NULL)
 		plan_owner = CurrentResourceOwner;
 
+	/*
+	 * We interpret must_return_tuples as "there must be at least one query,
+	 * and all of them must return tuples".  This is a bit laxer than
+	 * SPI_is_cursor_plan's check, but there seems no reason to enforce that
+	 * there be only one query.
+	 */
+	if (options->must_return_tuples && plan->plancache_list == NIL)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("empty query does not return tuples")));
+
 	foreach(lc1, plan->plancache_list)
 	{
 		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc1);
@@ -2404,11 +2429,33 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 							   false);	/* not fixed result */
 		}
 
+		/*
+		 * If asked to, complain when query does not return tuples.
+		 * (Replanning can't change this, so we can check it before that.
+		 * However, we can't check it till after parse analysis, so in the
+		 * case of a one-shot plan this is the earliest we could check.)
+		 */
+		if (options->must_return_tuples && !plansource->resultDesc)
+		{
+			/* try to give a good error message */
+			const char *cmdtag;
+
+			/* A SELECT without resultDesc must be SELECT INTO */
+			if (plansource->commandTag == CMDTAG_SELECT)
+				cmdtag = "SELECT INTO";
+			else
+				cmdtag = GetCommandTagName(plansource->commandTag);
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+			/* translator: %s is name of a SQL command, eg INSERT */
+					 errmsg("%s query does not return tuples", cmdtag)));
+		}
+
 		/*
 		 * Replan if needed, and increment plan refcount.  If it's a saved
 		 * plan, the refcount must be backed by the plan_owner.
 		 */
-		cplan = GetCachedPlan(plansource, paramLI,
+		cplan = GetCachedPlan(plansource, options->params,
 							  plan_owner, _SPI_current->queryEnv);
 
 		stmt_list = cplan->stmt_list;
@@ -2440,7 +2487,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 			 * Skip it when doing non-atomic execution, though (we rely
 			 * entirely on the Portal snapshot in that case).
 			 */
-			if (!read_only && !allow_nonatomic)
+			if (!options->read_only && !options->allow_nonatomic)
 			{
 				if (pushed_active_snap)
 					PopActiveSnapshot();
@@ -2484,7 +2531,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				}
 			}
 
-			if (read_only && !CommandIsReadOnly(stmt))
+			if (options->read_only && !CommandIsReadOnly(stmt))
 				ereport(ERROR,
 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				/* translator: %s is a SQL statement name */
@@ -2496,7 +2543,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 			 * command and update the snapshot.  (But skip it if the snapshot
 			 * isn't under our control.)
 			 */
-			if (!read_only && pushed_active_snap)
+			if (!options->read_only && pushed_active_snap)
 			{
 				CommandCounterIncrement();
 				UpdateActiveSnapshotCommandId();
@@ -2508,8 +2555,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 			 */
 			if (!canSetTag)
 				dest = CreateDestReceiver(DestNone);
-			else if (caller_dest)
-				dest = caller_dest;
+			else if (options->dest)
+				dest = options->dest;
 			else
 				dest = CreateDestReceiver(DestSPI);
 
@@ -2527,10 +2574,11 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 										plansource->query_string,
 										snap, crosscheck_snapshot,
 										dest,
-										paramLI, _SPI_current->queryEnv,
+										options->params,
+										_SPI_current->queryEnv,
 										0);
 				res = _SPI_pquery(qdesc, fire_triggers,
-								  canSetTag ? tcount : 0);
+								  canSetTag ? options->tcount : 0);
 				FreeQueryDesc(qdesc);
 			}
 			else
@@ -2543,7 +2591,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				 * nonatomic operations, tell ProcessUtility this is an atomic
 				 * execution context.
 				 */
-				if (_SPI_current->atomic || !allow_nonatomic)
+				if (_SPI_current->atomic || !options->allow_nonatomic)
 					context = PROCESS_UTILITY_QUERY;
 				else
 					context = PROCESS_UTILITY_QUERY_NONATOMIC;
@@ -2553,7 +2601,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 							   plansource->query_string,
 							   true,	/* protect plancache's node tree */
 							   context,
-							   paramLI,
+							   options->params,
 							   _SPI_current->queryEnv,
 							   dest,
 							   &qc);
@@ -2639,7 +2687,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 		 * command.  This ensures that its effects are visible, in case it was
 		 * DDL that would affect the next CachedPlanSource.
 		 */
-		if (!read_only)
+		if (!options->read_only)
 			CommandCounterIncrement();
 	}
 
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 74e2e9405f..1e66a7d2ea 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -48,6 +48,7 @@ typedef struct SPIExecuteOptions
 	ParamListInfo params;
 	bool		read_only;
 	bool		allow_nonatomic;
+	bool		must_return_tuples;
 	uint64		tcount;
 	DestReceiver *dest;
 	ResourceOwner owner;
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index 7c5bc63778..0e1cfa3df6 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -3553,26 +3553,13 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
 		memset(&options, 0, sizeof(options));
 		options.params = paramLI;
 		options.read_only = estate->readonly_func;
+		options.must_return_tuples = true;
 		options.dest = treceiver;
 
 		rc = SPI_execute_plan_extended(expr->plan, &options);
-		if (rc != SPI_OK_SELECT)
-		{
-			/*
-			 * SELECT INTO deserves a special error message, because "query is
-			 * not a SELECT" is not very helpful in that case.
-			 */
-			if (rc == SPI_OK_SELINTO)
-				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("query is SELECT INTO, but it should be plain SELECT"),
-						 errcontext("query: %s", expr->query)));
-			else
-				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("query is not a SELECT"),
-						 errcontext("query: %s", expr->query)));
-		}
+		if (rc < 0)
+			elog(ERROR, "SPI_execute_plan_extended failed executing query \"%s\": %s",
+				 expr->query, SPI_result_code_string(rc));
 	}
 	else
 	{
@@ -3609,6 +3596,7 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
 		options.params = exec_eval_using_params(estate,
 												stmt->params);
 		options.read_only = estate->readonly_func;
+		options.must_return_tuples = true;
 		options.dest = treceiver;
 
 		rc = SPI_execute_extended(querystr, &options);
diff --git a/src/test/regress/expected/plpgsql.out b/src/test/regress/expected/plpgsql.out
index 6ea169d9ad..278d056505 100644
--- a/src/test/regress/expected/plpgsql.out
+++ b/src/test/regress/expected/plpgsql.out
@@ -4227,7 +4227,7 @@ select * from tftest(10);
 (2 rows)
 
 drop function tftest(int);
-create or replace function rttest()
+create function rttest()
 returns setof int as $$
 declare rc int;
 begin
@@ -4258,6 +4258,31 @@ NOTICE:  f 0
      20
 (4 rows)
 
+-- check some error cases, too
+create or replace function rttest()
+returns setof int as $$
+begin
+  return query select 10 into no_such_table;
+end;
+$$ language plpgsql;
+select * from rttest();
+ERROR:  SELECT INTO query does not return tuples
+CONTEXT:  SQL statement "select 10 into no_such_table"
+PL/pgSQL function rttest() line 3 at RETURN QUERY
+create or replace function rttest()
+returns setof int as $$
+begin
+  return query execute 'select 10 into no_such_table';
+end;
+$$ language plpgsql;
+select * from rttest();
+ERROR:  SELECT INTO query does not return tuples
+CONTEXT:  SQL statement "select 10 into no_such_table"
+PL/pgSQL function rttest() line 3 at RETURN QUERY
+select * from no_such_table;
+ERROR:  relation "no_such_table" does not exist
+LINE 1: select * from no_such_table;
+                      ^
 drop function rttest();
 -- Test for proper cleanup at subtransaction exit.  This example
 -- exposed a bug in PG 8.2.
diff --git a/src/test/regress/sql/plpgsql.sql b/src/test/regress/sql/plpgsql.sql
index 781666a83a..7e52d4745d 100644
--- a/src/test/regress/sql/plpgsql.sql
+++ b/src/test/regress/sql/plpgsql.sql
@@ -3494,7 +3494,7 @@ select * from tftest(10);
 
 drop function tftest(int);
 
-create or replace function rttest()
+create function rttest()
 returns setof int as $$
 declare rc int;
 begin
@@ -3515,6 +3515,28 @@ $$ language plpgsql;
 
 select * from rttest();
 
+-- check some error cases, too
+
+create or replace function rttest()
+returns setof int as $$
+begin
+  return query select 10 into no_such_table;
+end;
+$$ language plpgsql;
+
+select * from rttest();
+
+create or replace function rttest()
+returns setof int as $$
+begin
+  return query execute 'select 10 into no_such_table';
+end;
+$$ language plpgsql;
+
+select * from rttest();
+
+select * from no_such_table;
+
 drop function rttest();
 
 -- Test for proper cleanup at subtransaction exit.  This example