Proper object locking for GRANT/REVOKE

Refactor objectNamesToOids() to use get_object_address() internally if
possible.  Not only does this save a lot of code, it also allows us to
use the object locking provided by get_object_address() for
GRANT/REVOKE.  There was previously a code comment that complained
about the lack of locking in objectNamesToOids(), which is now fixed.

The check in ExecGrant_Type_check() is obsolete because
get_object_address_type() already does the same check.

Reviewed-by: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/bf72b82c-124d-4efa-a484-bb928e9494e4@eisentraut.org
This commit is contained in:
Peter Eisentraut 2024-11-15 10:53:54 +01:00
parent cfd7f36c83
commit d31bbfb659
2 changed files with 40 additions and 121 deletions

View File

@ -659,147 +659,77 @@ ExecGrantStmt_oids(InternalGrant *istmt)
* objectNamesToOids
*
* Turn a list of object names of a given type into an Oid list.
*
* XXX: This function doesn't take any sort of locks on the objects whose
* names it looks up. In the face of concurrent DDL, we might easily latch
* onto an old version of an object, causing the GRANT or REVOKE statement
* to fail.
*/
static List *
objectNamesToOids(ObjectType objtype, List *objnames, bool is_grant)
{
List *objects = NIL;
ListCell *cell;
const LOCKMODE lockmode = AccessShareLock;
Assert(objnames != NIL);
switch (objtype)
{
default:
/*
* For most object types, we use get_object_address() directly.
*/
foreach(cell, objnames)
{
ObjectAddress address;
address = get_object_address(objtype, lfirst(cell), NULL, lockmode, false);
objects = lappend_oid(objects, address.objectId);
}
break;
case OBJECT_TABLE:
case OBJECT_SEQUENCE:
/*
* Here, we don't use get_object_address(). It requires that the
* specified object type match the actual type of the object, but
* in GRANT/REVOKE, all table-like things are addressed as TABLE.
*/
foreach(cell, objnames)
{
RangeVar *relvar = (RangeVar *) lfirst(cell);
Oid relOid;
relOid = RangeVarGetRelid(relvar, NoLock, false);
relOid = RangeVarGetRelid(relvar, lockmode, false);
objects = lappend_oid(objects, relOid);
}
break;
case OBJECT_DATABASE:
foreach(cell, objnames)
{
char *dbname = strVal(lfirst(cell));
Oid dbid;
dbid = get_database_oid(dbname, false);
objects = lappend_oid(objects, dbid);
}
break;
case OBJECT_DOMAIN:
case OBJECT_TYPE:
/*
* The parse representation of types and domains in privilege
* targets is different from that expected by get_object_address()
* (for parse conflict reasons), so we have to do a bit of
* conversion here.
*/
foreach(cell, objnames)
{
List *typname = (List *) lfirst(cell);
Oid oid;
TypeName *tn = makeTypeNameFromNameList(typname);
ObjectAddress address;
Relation relation;
oid = typenameTypeId(NULL, makeTypeNameFromNameList(typname));
objects = lappend_oid(objects, oid);
address = get_object_address(objtype, (Node *) tn, &relation, lockmode, false);
Assert(relation == NULL);
objects = lappend_oid(objects, address.objectId);
}
break;
case OBJECT_FUNCTION:
foreach(cell, objnames)
{
ObjectWithArgs *func = (ObjectWithArgs *) lfirst(cell);
Oid funcid;
funcid = LookupFuncWithArgs(OBJECT_FUNCTION, func, false);
objects = lappend_oid(objects, funcid);
}
break;
case OBJECT_LANGUAGE:
foreach(cell, objnames)
{
char *langname = strVal(lfirst(cell));
Oid oid;
oid = get_language_oid(langname, false);
objects = lappend_oid(objects, oid);
}
break;
case OBJECT_LARGEOBJECT:
foreach(cell, objnames)
{
Oid lobjOid = oidparse(lfirst(cell));
if (!LargeObjectExists(lobjOid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("large object %u does not exist",
lobjOid)));
objects = lappend_oid(objects, lobjOid);
}
break;
case OBJECT_SCHEMA:
foreach(cell, objnames)
{
char *nspname = strVal(lfirst(cell));
Oid oid;
oid = get_namespace_oid(nspname, false);
objects = lappend_oid(objects, oid);
}
break;
case OBJECT_PROCEDURE:
foreach(cell, objnames)
{
ObjectWithArgs *func = (ObjectWithArgs *) lfirst(cell);
Oid procid;
procid = LookupFuncWithArgs(OBJECT_PROCEDURE, func, false);
objects = lappend_oid(objects, procid);
}
break;
case OBJECT_ROUTINE:
foreach(cell, objnames)
{
ObjectWithArgs *func = (ObjectWithArgs *) lfirst(cell);
Oid routid;
routid = LookupFuncWithArgs(OBJECT_ROUTINE, func, false);
objects = lappend_oid(objects, routid);
}
break;
case OBJECT_TABLESPACE:
foreach(cell, objnames)
{
char *spcname = strVal(lfirst(cell));
Oid spcoid;
spcoid = get_tablespace_oid(spcname, false);
objects = lappend_oid(objects, spcoid);
}
break;
case OBJECT_FDW:
foreach(cell, objnames)
{
char *fdwname = strVal(lfirst(cell));
Oid fdwid = get_foreign_data_wrapper_oid(fdwname, false);
objects = lappend_oid(objects, fdwid);
}
break;
case OBJECT_FOREIGN_SERVER:
foreach(cell, objnames)
{
char *srvname = strVal(lfirst(cell));
Oid srvid = get_foreign_server_oid(srvname, false);
objects = lappend_oid(objects, srvid);
}
break;
case OBJECT_PARAMETER_ACL:
/*
* Parameters are handled completely differently.
*/
foreach(cell, objnames)
{
/*
@ -830,9 +760,6 @@ objectNamesToOids(ObjectType objtype, List *objnames, bool is_grant)
objects = lappend_oid(objects, parameterId);
}
break;
default:
elog(ERROR, "unrecognized GrantStmt.objtype: %d",
(int) objtype);
}
return objects;
@ -2458,14 +2385,6 @@ ExecGrant_Type_check(InternalGrant *istmt, HeapTuple tuple)
(errcode(ERRCODE_INVALID_GRANT_OPERATION),
errmsg("cannot set privileges of multirange types"),
errhint("Set the privileges of the range type instead.")));
/* Used GRANT DOMAIN on a non-domain? */
if (istmt->objtype == OBJECT_DOMAIN &&
pg_type_tuple->typtype != TYPTYPE_DOMAIN)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a domain",
NameStr(pg_type_tuple->typname))));
}
static void

View File

@ -248,6 +248,6 @@ relhasindex
-----------
(0 rows)
s4: WARNING: got: cache lookup failed for relation REDACTED
s4: WARNING: got: relation "intra_grant_inplace" does not exist
step revoke4: <... completed>
step r3: ROLLBACK;