diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index c41c70c27a..7c658c0348 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -192,8 +192,7 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt) case OBJECT_TABLE: case OBJECT_VIEW: case OBJECT_FOREIGN_TABLE: - AlterTableNamespace(stmt->relation, stmt->newschema, - stmt->objectType, AccessExclusiveLock); + AlterTableNamespace(stmt); break; case OBJECT_TSPARSER: diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 3b524152ba..0cd273e9c6 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -387,6 +387,8 @@ static const char *storage_name(char c); static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); +static void RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid, + Oid oldrelid, void *arg); /* ---------------------------------------------------------------- @@ -2318,81 +2320,6 @@ renameatt(RenameStmt *stmt) stmt->behavior); } -/* - * Perform permissions and integrity checks before acquiring a relation lock. - */ -static void -RangeVarCallbackForRenameRelation(const RangeVar *rv, Oid relid, Oid oldrelid, - void *arg) -{ - RenameStmt *stmt = (RenameStmt *) arg; - ObjectType reltype; - HeapTuple tuple; - Form_pg_class classform; - AclResult aclresult; - char relkind; - - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); - if (!HeapTupleIsValid(tuple)) - return; /* concurrently dropped */ - classform = (Form_pg_class) GETSTRUCT(tuple); - relkind = classform->relkind; - - /* Must own table. */ - if (!pg_class_ownercheck(relid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - NameStr(classform->relname)); - - /* No system table modifications unless explicitly allowed. */ - if (!allowSystemTableMods && IsSystemClass(classform)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - NameStr(classform->relname)))); - - /* Must (still) have CREATE rights on containing namespace. */ - aclresult = pg_namespace_aclcheck(classform->relnamespace, GetUserId(), - ACL_CREATE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_NAMESPACE, - get_namespace_name(classform->relnamespace)); - - /* - * For compatibility with prior releases, we don't complain if ALTER TABLE - * or ALTER INDEX is used to rename some other type of relation. But - * ALTER SEQUENCE/VIEW/FOREIGN TABLE are only to be used with relations of - * that type. - */ - reltype = stmt->renameType; - if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a sequence", rv->relname))); - - if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a view", rv->relname))); - - if (reltype == OBJECT_FOREIGN_TABLE && relkind != RELKIND_FOREIGN_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a foreign table", rv->relname))); - - /* - * Don't allow ALTER TABLE on composite types. We want people to use ALTER - * TYPE for that. - */ - if (relkind == RELKIND_COMPOSITE_TYPE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a composite type", rv->relname), - errhint("Use ALTER TYPE instead."))); - - ReleaseSysCache(tuple); -} - - /* * Execute ALTER TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE RENAME */ @@ -2410,7 +2337,7 @@ RenameRelation(RenameStmt *stmt) */ relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock, false, false, - RangeVarCallbackForRenameRelation, + RangeVarCallbackForAlterRelation, (void *) stmt); /* Do the work */ @@ -2545,6 +2472,19 @@ CheckTableNotInUse(Relation rel, const char *stmt) stmt, RelationGetRelationName(rel)))); } +/* + * AlterTableLookupRelation + * Look up, and lock, the OID for the relation named by an alter table + * statement. + */ +Oid +AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode) +{ + return RangeVarGetRelidExtended(stmt->relation, lockmode, false, false, + RangeVarCallbackForAlterRelation, + (void *) stmt); +} + /* * AlterTable * Execute ALTER TABLE, which can be a list of subcommands @@ -2579,90 +2519,26 @@ CheckTableNotInUse(Relation rel, const char *stmt) * Thanks to the magic of MVCC, an error anywhere along the way rolls back * the whole operation; we don't have to do anything special to clean up. * - * We lock the table as the first action, with an appropriate lock level + * The caller must lock the relation, with an appropriate lock level * for the subcommands requested. Any subcommand that needs to rewrite * tuples in the table forces the whole command to be executed with - * AccessExclusiveLock. If all subcommands do not require rewrite table - * then we may be able to use lower lock levels. We pass the lock level down + * AccessExclusiveLock (actually, that is currently required always, but + * we hope to relax it at some point). We pass the lock level down * so that we can apply it recursively to inherited tables. Note that the - * lock level we want as we recurse may well be higher than required for + * lock level we want as we recurse might well be higher than required for * that specific subcommand. So we pass down the overall lock requirement, * rather than reassess it at lower levels. */ void -AlterTable(AlterTableStmt *stmt) +AlterTable(Oid relid, LOCKMODE lockmode, AlterTableStmt *stmt) { Relation rel; - LOCKMODE lockmode = AlterTableGetLockLevel(stmt->cmds); - /* - * Acquire same level of lock as already acquired during parsing. - */ - rel = relation_openrv(stmt->relation, lockmode); + /* Caller is required to provide an adequate lock. */ + rel = relation_open(relid, NoLock); CheckTableNotInUse(rel, "ALTER TABLE"); - /* Check relation type against type specified in the ALTER command */ - switch (stmt->relkind) - { - case OBJECT_TABLE: - - /* - * For mostly-historical reasons, we allow ALTER TABLE to apply to - * almost all relation types. - */ - if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE - || rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - break; - - case OBJECT_INDEX: - if (rel->rd_rel->relkind != RELKIND_INDEX) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not an index", - RelationGetRelationName(rel)))); - break; - - case OBJECT_SEQUENCE: - if (rel->rd_rel->relkind != RELKIND_SEQUENCE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a sequence", - RelationGetRelationName(rel)))); - break; - - case OBJECT_TYPE: - if (rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a composite type", - RelationGetRelationName(rel)))); - break; - - case OBJECT_VIEW: - if (rel->rd_rel->relkind != RELKIND_VIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a view", - RelationGetRelationName(rel)))); - break; - - case OBJECT_FOREIGN_TABLE: - if (rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a foreign table", - RelationGetRelationName(rel)))); - break; - - default: - elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind); - } - ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt), lockmode); } @@ -9530,104 +9406,11 @@ ATExecGenericOptions(Relation rel, List *options) heap_freetuple(tuple); } -/* - * Perform permissions and integrity checks before acquiring a relation lock. - */ -static void -RangeVarCallbackForAlterTableNamespace(const RangeVar *rv, Oid relid, - Oid oldrelid, void *arg) -{ - HeapTuple tuple; - Form_pg_class form; - ObjectType stmttype; - - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); - if (!HeapTupleIsValid(tuple)) - return; /* concurrently dropped */ - form = (Form_pg_class) GETSTRUCT(tuple); - - /* Must own table. */ - if (!pg_class_ownercheck(relid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname); - - /* No system table modifications unless explicitly allowed. */ - if (!allowSystemTableMods && IsSystemClass(form)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - rv->relname))); - - /* Check relation type against type specified in the ALTER command */ - stmttype = * (ObjectType *) arg; - switch (stmttype) - { - case OBJECT_TABLE: - - /* - * For mostly-historical reasons, we allow ALTER TABLE to apply to - * all relation types. - */ - break; - - case OBJECT_SEQUENCE: - if (form->relkind != RELKIND_SEQUENCE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a sequence", rv->relname))); - break; - - case OBJECT_VIEW: - if (form->relkind != RELKIND_VIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a view", rv->relname))); - break; - - case OBJECT_FOREIGN_TABLE: - if (form->relkind != RELKIND_FOREIGN_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a foreign table", rv->relname))); - break; - - default: - elog(ERROR, "unrecognized object type: %d", (int) stmttype); - } - - /* Can we change the schema of this tuple? */ - switch (form->relkind) - { - case RELKIND_RELATION: - case RELKIND_VIEW: - case RELKIND_SEQUENCE: - case RELKIND_FOREIGN_TABLE: - /* ok to change schema */ - break; - case RELKIND_COMPOSITE_TYPE: - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a composite type", rv->relname), - errhint("Use ALTER TYPE instead."))); - break; - case RELKIND_INDEX: - case RELKIND_TOASTVALUE: - /* FALL THRU */ - default: - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table, view, sequence, or foreign table", - rv->relname))); - } - - ReleaseSysCache(tuple); -} - /* * Execute ALTER TABLE SET SCHEMA */ void -AlterTableNamespace(RangeVar *relation, const char *newschema, - ObjectType stmttype, LOCKMODE lockmode) +AlterTableNamespace(AlterObjectSchemaStmt *stmt) { Relation rel; Oid relid; @@ -9635,9 +9418,10 @@ AlterTableNamespace(RangeVar *relation, const char *newschema, Oid nspOid; Relation classRel; - relid = RangeVarGetRelidExtended(relation, lockmode, false, false, - RangeVarCallbackForAlterTableNamespace, - (void *) &stmttype); + relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock, + false, false, + RangeVarCallbackForAlterRelation, + (void *) stmt); rel = relation_open(relid, NoLock); oldNspOid = RelationGetNamespace(rel); @@ -9658,7 +9442,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema, } /* get schema OID and check its permissions */ - nspOid = LookupCreationNamespace(newschema); + nspOid = LookupCreationNamespace(stmt->newschema); /* common checks on switching namespaces */ CheckSetNamespace(oldNspOid, nspOid, RelationRelationId, relid); @@ -9675,7 +9459,8 @@ AlterTableNamespace(RangeVar *relation, const char *newschema, if (rel->rd_rel->relkind == RELKIND_RELATION) { AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid); - AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema, lockmode); + AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, stmt->newschema, + AccessExclusiveLock); AlterConstraintNamespaces(relid, oldNspOid, nspOid, false); } @@ -10077,3 +9862,123 @@ RangeVarCallbackOwnsTable(const RangeVar *relation, if (!pg_class_ownercheck(relId, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname); } + +/* + * Common RangeVarGetRelid callback for rename, set schema, and alter table + * processing. + */ +static void +RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid, Oid oldrelid, + void *arg) +{ + Node *stmt = (Node *) arg; + ObjectType reltype; + HeapTuple tuple; + Form_pg_class classform; + AclResult aclresult; + char relkind; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); + if (!HeapTupleIsValid(tuple)) + return; /* concurrently dropped */ + classform = (Form_pg_class) GETSTRUCT(tuple); + relkind = classform->relkind; + + /* Must own relation. */ + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname); + + /* No system table modifications unless explicitly allowed. */ + if (!allowSystemTableMods && IsSystemClass(classform)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied: \"%s\" is a system catalog", + rv->relname))); + + /* + * Extract the specified relation type from the statement parse tree. + * + * Also, for ALTER .. RENAME, check permissions: the user must (still) + * have CREATE rights on the containing namespace. + */ + if (IsA(stmt, RenameStmt)) + { + aclresult = pg_namespace_aclcheck(classform->relnamespace, + GetUserId(), ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_NAMESPACE, + get_namespace_name(classform->relnamespace)); + reltype = ((RenameStmt *) stmt)->renameType; + } + else if (IsA(stmt, AlterObjectSchemaStmt)) + reltype = ((AlterObjectSchemaStmt *) stmt)->objectType; + else if (IsA(stmt, AlterTableStmt)) + reltype = ((AlterTableStmt *) stmt)->relkind; + else + { + reltype = OBJECT_TABLE; /* placate compiler */ + elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt)); + } + + /* + * For compatibility with prior releases, we allow ALTER TABLE to be + * used with most other types of relations (but not composite types). + * We allow similar flexibility for ALTER INDEX in the case of RENAME, + * but not otherwise. Otherwise, the user must select the correct form + * of the command for the relation at issue. + */ + if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a sequence", rv->relname))); + + if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a view", rv->relname))); + + if (reltype == OBJECT_FOREIGN_TABLE && relkind != RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a foreign table", rv->relname))); + + if (reltype == OBJECT_TYPE && relkind != RELKIND_COMPOSITE_TYPE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a composite type", rv->relname))); + + if (reltype == OBJECT_FOREIGN_TABLE && relkind != RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a foreign table", rv->relname))); + + if (reltype == OBJECT_INDEX && relkind != RELKIND_INDEX + && !IsA(stmt, RenameStmt)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not an index", rv->relname))); + + /* + * Don't allow ALTER TABLE on composite types. We want people to use ALTER + * TYPE for that. + */ + if (reltype != OBJECT_TYPE && relkind == RELKIND_COMPOSITE_TYPE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a composite type", rv->relname), + errhint("Use ALTER TYPE instead."))); + + /* + * Don't allow ALTER TABLE .. SET SCHEMA on relations that can't be + * moved to a different schema, such as indexes and TOAST tables. + */ + if (IsA(stmt, AlterObjectSchemaStmt) && relkind != RELKIND_RELATION + && relkind != RELKIND_VIEW && relkind != RELKIND_SEQUENCE + && relkind != RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table, view, sequence, or foreign table", + rv->relname))); + + ReleaseSysCache(tuple); +} diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 704bbe9ced..de16a61454 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -699,12 +699,23 @@ standard_ProcessUtility(Node *parsetree, case T_AlterTableStmt: { + AlterTableStmt *atstmt = (AlterTableStmt *) parsetree; + Oid relid; List *stmts; ListCell *l; + LOCKMODE lockmode; + + /* + * Figure out lock mode, and acquire lock. This also does + * basic permissions checks, so that we won't wait for a lock + * on (for example) a relation on which we have no + * permissions. + */ + lockmode = AlterTableGetLockLevel(atstmt->cmds); + relid = AlterTableLookupRelation(atstmt, lockmode); /* Run parse analysis ... */ - stmts = transformAlterTableStmt((AlterTableStmt *) parsetree, - queryString); + stmts = transformAlterTableStmt(atstmt, queryString); /* ... and do it */ foreach(l, stmts) @@ -714,7 +725,7 @@ standard_ProcessUtility(Node *parsetree, if (IsA(stmt, AlterTableStmt)) { /* Do the table alteration proper */ - AlterTable((AlterTableStmt *) stmt); + AlterTable(relid, lockmode, (AlterTableStmt *) stmt); } else { diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index e73315e750..03f397de63 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -24,7 +24,9 @@ extern Oid DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId); extern void RemoveRelations(DropStmt *drop); -extern void AlterTable(AlterTableStmt *stmt); +extern Oid AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode); + +extern void AlterTable(Oid relid, LOCKMODE lockmode, AlterTableStmt *stmt); extern LOCKMODE AlterTableGetLockLevel(List *cmds); @@ -32,8 +34,7 @@ extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, L extern void AlterTableInternal(Oid relid, List *cmds, bool recurse); -extern void AlterTableNamespace(RangeVar *relation, const char *newschema, - ObjectType stmttype, LOCKMODE lockmode); +extern void AlterTableNamespace(AlterObjectSchemaStmt *stmt); extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid, Oid oldNspOid, Oid newNspOid,