Fix unsupported options in CREATE TABLE ... AS EXECUTE.

The WITH [NO] DATA option was not supported, nor the ability to specify
replacement column names; the former limitation wasn't even documented, as
per recent complaint from Naoya Anzai.  Fix by moving the responsibility
for supporting these options into the executor.  It actually takes less
code this way ...

catversion bump due to change in representation of IntoClause, which might
affect stored rules.
This commit is contained in:
Tom Lane 2011-11-24 23:21:06 -05:00
parent e90710f34a
commit 9ed439a9c0
10 changed files with 62 additions and 127 deletions

View File

@ -105,10 +105,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE <replaceable
<listitem>
<para>
The name of a column in the new table. If column names are not
provided, they are taken from the output column names of the
query. If the table is created from an
<command>EXECUTE</command> command, a column name list cannot be
specified.
provided, they are taken from the output column names of the query.
</para>
</listitem>
</varlistentry>
@ -252,7 +249,7 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE <replaceable
identical to pre-8.0 releases. Applications that
require OIDs in the table created by <command>CREATE TABLE
AS</command> should explicitly specify <literal>WITH (OIDS)</literal>
to ensure proper behavior.
to ensure desired behavior.
</para>
</refsect1>

View File

@ -56,6 +56,7 @@
#include "storage/smgr.h"
#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
@ -305,6 +306,13 @@ standard_ExecutorRun(QueryDesc *queryDesc,
if (sendTuples)
(*dest->rStartup) (dest, operation, queryDesc->tupDesc);
/*
* if it's CREATE TABLE AS ... WITH NO DATA, skip plan execution
*/
if (estate->es_select_into &&
queryDesc->plannedstmt->intoClause->skipData)
direction = NoMovementScanDirection;
/*
* run plan
*/
@ -2388,6 +2396,7 @@ OpenIntoRel(QueryDesc *queryDesc)
{
IntoClause *into = queryDesc->plannedstmt->intoClause;
EState *estate = queryDesc->estate;
TupleDesc intoTupDesc = queryDesc->tupDesc;
Relation intoRelationDesc;
char *intoName;
Oid namespaceId;
@ -2415,6 +2424,31 @@ OpenIntoRel(QueryDesc *queryDesc)
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("ON COMMIT can only be used on temporary tables")));
/*
* If a column name list was specified in CREATE TABLE AS, override the
* column names derived from the query. (Too few column names are OK, too
* many are not.) It would probably be all right to scribble directly on
* the query's result tupdesc, but let's be safe and make a copy.
*/
if (into->colNames)
{
ListCell *lc;
intoTupDesc = CreateTupleDescCopy(intoTupDesc);
attnum = 1;
foreach(lc, into->colNames)
{
char *colname = strVal(lfirst(lc));
if (attnum > intoTupDesc->natts)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("CREATE TABLE AS specifies too many column names")));
namestrcpy(&(intoTupDesc->attrs[attnum - 1]->attname), colname);
attnum++;
}
}
/*
* Find namespace to create in, check its permissions
*/
@ -2477,7 +2511,7 @@ OpenIntoRel(QueryDesc *queryDesc)
InvalidOid,
InvalidOid,
GetUserId(),
queryDesc->tupDesc,
intoTupDesc,
NIL,
RELKIND_RELATION,
into->rel->relpersistence,
@ -2519,7 +2553,7 @@ OpenIntoRel(QueryDesc *queryDesc)
intoRelationDesc = heap_open(intoRelationId, AccessExclusiveLock);
/*
* check INSERT permission on the constructed table.
* Check INSERT permission on the constructed table.
*/
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
@ -2527,7 +2561,7 @@ OpenIntoRel(QueryDesc *queryDesc)
rte->relkind = RELKIND_RELATION;
rte->requiredPerms = ACL_INSERT;
for (attnum = 1; attnum <= queryDesc->tupDesc->natts; attnum++)
for (attnum = 1; attnum <= intoTupDesc->natts; attnum++)
rte->modifiedCols = bms_add_member(rte->modifiedCols,
attnum - FirstLowInvalidHeapAttributeNumber);

View File

@ -1041,6 +1041,7 @@ _copyIntoClause(IntoClause *from)
COPY_NODE_FIELD(options);
COPY_SCALAR_FIELD(onCommit);
COPY_STRING_FIELD(tableSpaceName);
COPY_SCALAR_FIELD(skipData);
return newnode;
}

View File

@ -119,6 +119,7 @@ _equalIntoClause(IntoClause *a, IntoClause *b)
COMPARE_NODE_FIELD(options);
COMPARE_SCALAR_FIELD(onCommit);
COMPARE_STRING_FIELD(tableSpaceName);
COMPARE_SCALAR_FIELD(skipData);
return true;
}

View File

@ -899,6 +899,7 @@ _outIntoClause(StringInfo str, IntoClause *node)
WRITE_NODE_FIELD(options);
WRITE_ENUM_FIELD(onCommit, OnCommitAction);
WRITE_STRING_FIELD(tableSpaceName);
WRITE_BOOL_FIELD(skipData);
}
static void

View File

@ -394,6 +394,7 @@ _readIntoClause(void)
READ_NODE_FIELD(options);
READ_ENUM_FIELD(onCommit, OnCommitAction);
READ_STRING_FIELD(tableSpaceName);
READ_BOOL_FIELD(skipData);
READ_DONE();
}

View File

@ -56,7 +56,6 @@ static Node *transformSetOperationTree(ParseState *pstate, SelectStmt *stmt,
bool isTopLevel, List **targetlist);
static void determineRecursiveColTypes(ParseState *pstate,
Node *larg, List *nrtargetlist);
static void applyColumnNames(List *dst, List *src);
static Query *transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt);
static List *transformReturningList(ParseState *pstate, List *returningList);
static Query *transformDeclareCursorStmt(ParseState *pstate,
@ -964,13 +963,8 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt)
pstate->p_windowdefs,
&qry->targetList);
/* handle any SELECT INTO/CREATE TABLE AS spec */
if (stmt->intoClause)
{
/* SELECT INTO/CREATE TABLE AS spec is just passed through */
qry->intoClause = stmt->intoClause;
if (stmt->intoClause->colNames)
applyColumnNames(qry->targetList, stmt->intoClause->colNames);
}
qry->rtable = pstate->p_rtable;
qry->jointree = makeFromExpr(pstate->p_joinlist, qual);
@ -1191,13 +1185,8 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("SELECT FOR UPDATE/SHARE cannot be applied to VALUES")));
/* handle any CREATE TABLE AS spec */
if (stmt->intoClause)
{
/* CREATE TABLE AS spec is just passed through */
qry->intoClause = stmt->intoClause;
if (stmt->intoClause->colNames)
applyColumnNames(qry->targetList, stmt->intoClause->colNames);
}
/*
* There mustn't have been any table references in the expressions, else
@ -1268,7 +1257,6 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt)
int leftmostRTI;
Query *leftmostQuery;
SetOperationStmt *sostmt;
List *intoColNames = NIL;
List *sortClause;
Node *limitOffset;
Node *limitCount;
@ -1306,11 +1294,7 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt)
leftmostSelect = leftmostSelect->larg;
Assert(leftmostSelect && IsA(leftmostSelect, SelectStmt) &&
leftmostSelect->larg == NULL);
if (leftmostSelect->intoClause)
{
qry->intoClause = leftmostSelect->intoClause;
intoColNames = leftmostSelect->intoClause->colNames;
}
/* clear this to prevent complaints in transformSetOperationTree() */
leftmostSelect->intoClause = NULL;
@ -1460,19 +1444,6 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt)
qry->limitCount = transformLimitClause(pstate, limitCount,
"LIMIT");
/*
* Handle SELECT INTO/CREATE TABLE AS.
*
* Any column names from CREATE TABLE AS need to be attached to both the
* top level and the leftmost subquery. We do not do this earlier because
* we do *not* want sortClause processing to be affected.
*/
if (intoColNames)
{
applyColumnNames(qry->targetList, intoColNames);
applyColumnNames(leftmostQuery->targetList, intoColNames);
}
qry->rtable = pstate->p_rtable;
qry->jointree = makeFromExpr(pstate->p_joinlist, NULL);
@ -1892,44 +1863,6 @@ determineRecursiveColTypes(ParseState *pstate, Node *larg, List *nrtargetlist)
analyzeCTETargetList(pstate, pstate->p_parent_cte, targetList);
}
/*
* Attach column names from a ColumnDef list to a TargetEntry list
* (for CREATE TABLE AS)
*/
static void
applyColumnNames(List *dst, List *src)
{
ListCell *dst_item;
ListCell *src_item;
src_item = list_head(src);
foreach(dst_item, dst)
{
TargetEntry *d = (TargetEntry *) lfirst(dst_item);
ColumnDef *s;
/* junk targets don't count */
if (d->resjunk)
continue;
/* fewer ColumnDefs than target entries is OK */
if (src_item == NULL)
break;
s = (ColumnDef *) lfirst(src_item);
src_item = lnext(src_item);
d->resname = pstrdup(s->colname);
}
/* more ColumnDefs than target entries is not OK */
if (src_item != NULL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("CREATE TABLE AS specifies too many column names")));
}
/*
* transformUpdateStmt -

View File

@ -387,8 +387,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
%type <node> case_expr case_arg when_clause case_default
%type <list> when_clause_list
%type <ival> sub_type
%type <list> OptCreateAs CreateAsList
%type <node> CreateAsElement ctext_expr
%type <node> ctext_expr
%type <value> NumericOnly
%type <list> NumericOnly_list
%type <alias> alias_clause
@ -3015,8 +3014,7 @@ CreateAsStmt:
* When the SelectStmt is a set-operation tree, we must
* stuff the INTO information into the leftmost component
* Select, because that's where analyze.c will expect
* to find it. Similarly, the output column names must
* be attached to that Select's target list.
* to find it.
*/
SelectStmt *n = findLeftmostSelect((SelectStmt *) $6);
if (n->intoClause != NULL)
@ -3024,17 +3022,16 @@ CreateAsStmt:
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("CREATE TABLE AS cannot specify INTO"),
parser_errposition(exprLocation((Node *) n->intoClause))));
$4->rel->relpersistence = $2;
n->intoClause = $4;
/* Implement WITH NO DATA by forcing top-level LIMIT 0 */
if (!$7)
((SelectStmt *) $6)->limitCount = makeIntConst(0, -1);
/* cram additional flags into the IntoClause */
$4->rel->relpersistence = $2;
$4->skipData = !($7);
$$ = $6;
}
;
create_as_target:
qualified_name OptCreateAs OptWith OnCommitOption OptTableSpace
qualified_name opt_column_list OptWith OnCommitOption OptTableSpace
{
$$ = makeNode(IntoClause);
$$->rel = $1;
@ -3042,36 +3039,7 @@ create_as_target:
$$->options = $3;
$$->onCommit = $4;
$$->tableSpaceName = $5;
}
;
OptCreateAs:
'(' CreateAsList ')' { $$ = $2; }
| /*EMPTY*/ { $$ = NIL; }
;
CreateAsList:
CreateAsElement { $$ = list_make1($1); }
| CreateAsList ',' CreateAsElement { $$ = lappend($1, $3); }
;
CreateAsElement:
ColId
{
ColumnDef *n = makeNode(ColumnDef);
n->colname = $1;
n->typeName = NULL;
n->inhcount = 0;
n->is_local = true;
n->is_not_null = false;
n->is_from_type = false;
n->storage = 0;
n->raw_default = NULL;
n->cooked_default = NULL;
n->collClause = NULL;
n->collOid = InvalidOid;
n->constraints = NIL;
$$ = (Node *)n;
$$->skipData = false; /* might get changed later */
}
;
@ -8030,18 +7998,15 @@ ExecuteStmt: EXECUTE name execute_param_clause
$$ = (Node *) n;
}
| CREATE OptTemp TABLE create_as_target AS
EXECUTE name execute_param_clause
EXECUTE name execute_param_clause opt_with_data
{
ExecuteStmt *n = makeNode(ExecuteStmt);
n->name = $7;
n->params = $8;
$4->rel->relpersistence = $2;
n->into = $4;
if ($4->colNames)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("column name list not allowed in CREATE TABLE / AS EXECUTE")));
/* ... because it's not implemented, but it could be */
/* cram additional flags into the IntoClause */
$4->rel->relpersistence = $2;
$4->skipData = !($9);
$$ = (Node *) n;
}
;
@ -8583,6 +8548,7 @@ into_clause:
$$->options = NIL;
$$->onCommit = ONCOMMIT_NOOP;
$$->tableSpaceName = NULL;
$$->skipData = false;
}
| /*EMPTY*/
{ $$ = NULL; }

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201111231
#define CATALOG_VERSION_NO 201111241
#endif

View File

@ -91,6 +91,7 @@ typedef struct IntoClause
List *options; /* options from WITH clause */
OnCommitAction onCommit; /* what do we do at COMMIT? */
char *tableSpaceName; /* table space to use, or NULL */
bool skipData; /* true for WITH NO DATA */
} IntoClause;