diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index c9b1d5fd04..9f1bcf1de4 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -975,26 +975,11 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode) { Oid relOid; - /* - * Check for shared-cache-inval messages before trying to open the - * relation. This is needed to cover the case where the name identifies a - * rel that has been dropped and recreated since the start of our - * transaction: if we don't flush the old syscache entry then we'll latch - * onto that entry and suffer an error when we do RelationIdGetRelation. - * Note that relation_open does not need to do this, since a relation's - * OID never changes. - * - * We skip this if asked for NoLock, on the assumption that the caller has - * already ensured some appropriate lock is held. - */ - if (lockmode != NoLock) - AcceptInvalidationMessages(); - - /* Look up the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(relation, false); + /* Look up and lock the appropriate relation using namespace search */ + relOid = RangeVarGetRelid(relation, lockmode, false, false); /* Let relation_open do the rest */ - return relation_open(relOid, lockmode); + return relation_open(relOid, NoLock); } /* ---------------- @@ -1012,30 +997,15 @@ relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode, { Oid relOid; - /* - * Check for shared-cache-inval messages before trying to open the - * relation. This is needed to cover the case where the name identifies a - * rel that has been dropped and recreated since the start of our - * transaction: if we don't flush the old syscache entry then we'll latch - * onto that entry and suffer an error when we do RelationIdGetRelation. - * Note that relation_open does not need to do this, since a relation's - * OID never changes. - * - * We skip this if asked for NoLock, on the assumption that the caller has - * already ensured some appropriate lock is held. - */ - if (lockmode != NoLock) - AcceptInvalidationMessages(); - - /* Look up the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(relation, missing_ok); + /* Look up and lock the appropriate relation using namespace search */ + relOid = RangeVarGetRelid(relation, lockmode, missing_ok, false); /* Return NULL on not-found */ if (!OidIsValid(relOid)) return NULL; /* Let relation_open do the rest */ - return relation_open(relOid, lockmode); + return relation_open(relOid, NoLock); } /* ---------------- diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index df32731b87..9a125bdac6 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -583,6 +583,11 @@ ExecGrantStmt_oids(InternalGrant *istmt) * objectNamesToOids * * Turn a list of object names of a given type into an Oid list. + * + * XXX: This function doesn't take any sort of locks on the objects whose + * names it looks up. In the face of concurrent DDL, we might easily latch + * onto an old version of an object, causing the GRANT or REVOKE statement + * to fail. */ static List * objectNamesToOids(GrantObjectType objtype, List *objnames) @@ -601,7 +606,7 @@ objectNamesToOids(GrantObjectType objtype, List *objnames) RangeVar *relvar = (RangeVar *) lfirst(cell); Oid relOid; - relOid = RangeVarGetRelid(relvar, false); + relOid = RangeVarGetRelid(relvar, NoLock, false, false); objects = lappend_oid(objects, relOid); } break; diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 39ba4869af..83aae7abad 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -2942,7 +2942,8 @@ reindex_relation(Oid relid, int flags) /* * Open and lock the relation. ShareLock is sufficient since we only need - * to prevent schema and data changes in it. + * to prevent schema and data changes in it. The lock level used here + * should match ReindexTable(). */ rel = heap_open(relid, ShareLock); diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index ce795a61c5..8d426951ba 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -44,6 +44,8 @@ #include "parser/parse_func.h" #include "storage/backendid.h" #include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/sinval.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/guc.h" @@ -215,14 +217,20 @@ Datum pg_is_other_temp_schema(PG_FUNCTION_ARGS); * Given a RangeVar describing an existing relation, * select the proper namespace and look up the relation OID. * - * If the relation is not found, return InvalidOid if failOK = true, + * If the relation is not found, return InvalidOid if missing_ok = true, * otherwise raise an error. + * + * If nowait = true, throw an error if we'd have to wait for a lock. */ Oid -RangeVarGetRelid(const RangeVar *relation, bool failOK) +RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok, + bool nowait) { + uint64 inval_count; Oid namespaceId; Oid relId; + Oid oldRelId = InvalidOid; + bool retry = false; /* * We check the catalog name and then ignore it. @@ -238,37 +246,120 @@ RangeVarGetRelid(const RangeVar *relation, bool failOK) } /* - * Some non-default relpersistence value may have been specified. The - * parser never generates such a RangeVar in simple DML, but it can happen - * in contexts such as "CREATE TEMP TABLE foo (f1 int PRIMARY KEY)". Such - * a command will generate an added CREATE INDEX operation, which must be - * careful to find the temp table, even when pg_temp is not first in the - * search path. + * DDL operations can change the results of a name lookup. Since all + * such operations will generate invalidation messages, we keep track + * of whether any such messages show up while we're performing the + * operation, and retry until either (1) no more invalidation messages + * show up or (2) the answer doesn't change. + * + * But if lockmode = NoLock, then we assume that either the caller is OK + * with the answer changing under them, or that they already hold some + * appropriate lock, and therefore return the first answer we get without + * checking for invalidation messages. Also, if the requested lock is + * already held, no LockRelationOid will not AcceptInvalidationMessages, + * so we may fail to notice a change. We could protect against that case + * by calling AcceptInvalidationMessages() before beginning this loop, + * but that would add a significant amount overhead, so for now we don't. */ - if (relation->relpersistence == RELPERSISTENCE_TEMP) + for (;;) { - if (relation->schemaname) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("temporary tables cannot specify a schema name"))); - if (OidIsValid(myTempNamespace)) - relId = get_relname_relid(relation->relname, myTempNamespace); - else /* this probably can't happen? */ - relId = InvalidOid; - } - else if (relation->schemaname) - { - /* use exact schema given */ - namespaceId = LookupExplicitNamespace(relation->schemaname); - relId = get_relname_relid(relation->relname, namespaceId); - } - else - { - /* search the namespace path */ - relId = RelnameGetRelid(relation->relname); + /* + * Remember this value, so that, after looking up the relation name + * and locking its OID, we can check whether any invalidation messages + * have been processed that might require a do-over. + */ + inval_count = SharedInvalidMessageCounter; + + /* + * Some non-default relpersistence value may have been specified. The + * parser never generates such a RangeVar in simple DML, but it can + * happen in contexts such as "CREATE TEMP TABLE foo (f1 int PRIMARY + * KEY)". Such a command will generate an added CREATE INDEX + * operation, which must be careful to find the temp table, even when + * pg_temp is not first in the search path. + */ + if (relation->relpersistence == RELPERSISTENCE_TEMP) + { + if (relation->schemaname) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("temporary tables cannot specify a schema name"))); + if (OidIsValid(myTempNamespace)) + relId = get_relname_relid(relation->relname, myTempNamespace); + else /* this probably can't happen? */ + relId = InvalidOid; + } + else if (relation->schemaname) + { + /* use exact schema given */ + namespaceId = LookupExplicitNamespace(relation->schemaname); + relId = get_relname_relid(relation->relname, namespaceId); + } + else + { + /* search the namespace path */ + relId = RelnameGetRelid(relation->relname); + } + + /* + * If no lock requested, we assume the caller knows what they're + * doing. They should have already acquired a heavyweight lock on + * this relation earlier in the processing of this same statement, + * so it wouldn't be appropriate to AcceptInvalidationMessages() + * here, as that might pull the rug out from under them. + */ + if (lockmode == NoLock) + break; + + /* + * If, upon retry, we get back the same OID we did last time, then + * the invalidation messages we processed did not change the final + * answer. So we're done. + */ + if (retry && relId == oldRelId) + break; + + /* + * Lock relation. This will also accept any pending invalidation + * messages. If we got back InvalidOid, indicating not found, then + * there's nothing to lock, but we accept invalidation messages + * anyway, to flush any negative catcache entries that may be + * lingering. + */ + if (!OidIsValid(relId)) + AcceptInvalidationMessages(); + else if (!nowait) + LockRelationOid(relId, lockmode); + else if (!ConditionalLockRelationOid(relId, lockmode)) + { + if (relation->schemaname) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s.%s\"", + relation->schemaname, relation->relname))); + else + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s\"", + relation->relname))); + } + + /* + * If no invalidation message were processed, we're done! + */ + if (inval_count == SharedInvalidMessageCounter) + break; + + /* + * Something may have changed. Let's repeat the name lookup, to + * make sure this name still references the same relation it did + * previously. + */ + retry = true; + oldRelId = relId; } - if (!OidIsValid(relId) && !failOK) + if (!OidIsValid(relId) && !missing_ok) { if (relation->schemaname) ereport(ERROR, diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 08ee93b8ea..e1be451e35 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -108,7 +108,12 @@ ExecRenameStmt(RenameStmt *stmt) CheckRelationOwnership(stmt->relation, true); - relid = RangeVarGetRelid(stmt->relation, false); + /* + * Lock level used here should match what will be taken later, + * in RenameRelation, renameatt, or renametrig. + */ + relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, + false, false); switch (stmt->renameType) { diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index b7c021d943..5024854081 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -1531,9 +1531,15 @@ ReindexIndex(RangeVar *indexRelation) Oid indOid; HeapTuple tuple; - indOid = RangeVarGetRelid(indexRelation, false); + /* + * XXX: This is not safe in the presence of concurrent DDL. We should + * take AccessExclusiveLock here, but that would violate the rule that + * indexes should only be locked after their parent tables. For now, + * we live with it. + */ + indOid = RangeVarGetRelid(indexRelation, NoLock, false, false); tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid)); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ + if (!HeapTupleIsValid(tuple)) elog(ERROR, "cache lookup failed for relation %u", indOid); if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX) @@ -1562,7 +1568,8 @@ ReindexTable(RangeVar *relation) Oid heapOid; HeapTuple tuple; - heapOid = RangeVarGetRelid(relation, false); + /* The lock level used here should match reindex_relation(). */ + heapOid = RangeVarGetRelid(relation, ShareLock, false, false); tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid)); if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ elog(ERROR, "cache lookup failed for relation %u", heapOid); diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c index b2c6ea5559..875f868b96 100644 --- a/src/backend/commands/lockcmds.c +++ b/src/backend/commands/lockcmds.c @@ -24,8 +24,8 @@ #include "utils/acl.h" #include "utils/lsyscache.h" -static void LockTableRecurse(Oid reloid, RangeVar *rv, - LOCKMODE lockmode, bool nowait, bool recurse); +static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, + bool recurse); /* @@ -36,100 +36,40 @@ LockTableCommand(LockStmt *lockstmt) { ListCell *p; + /* + * During recovery we only accept these variations: LOCK TABLE foo IN + * ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo + * IN ROW EXCLUSIVE MODE This test must match the restrictions defined + * in LockAcquire() + */ + if (lockstmt->mode > RowExclusiveLock) + PreventCommandDuringRecovery("LOCK TABLE"); + /* * Iterate over the list and process the named relations one at a time */ foreach(p, lockstmt->relations) { - RangeVar *relation = (RangeVar *) lfirst(p); - bool recurse = interpretInhOption(relation->inhOpt); + RangeVar *rv = (RangeVar *) lfirst(p); + Relation rel; + bool recurse = interpretInhOption(rv->inhOpt); Oid reloid; - reloid = RangeVarGetRelid(relation, false); + reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait); + rel = relation_open(reloid, NoLock); - /* - * During recovery we only accept these variations: LOCK TABLE foo IN - * ACCESS SHARE MODE LOCK TABLE foo IN ROW SHARE MODE LOCK TABLE foo - * IN ROW EXCLUSIVE MODE This test must match the restrictions defined - * in LockAcquire() - */ - if (lockstmt->mode > RowExclusiveLock) - PreventCommandDuringRecovery("LOCK TABLE"); - - LockTableRecurse(reloid, relation, - lockstmt->mode, lockstmt->nowait, recurse); + LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse); } } /* * Apply LOCK TABLE recursively over an inheritance tree - * - * At top level, "rv" is the original command argument; we use it to throw - * an appropriate error message if the relation isn't there. Below top level, - * "rv" is NULL and we should just silently ignore any dropped child rel. */ static void -LockTableRecurse(Oid reloid, RangeVar *rv, - LOCKMODE lockmode, bool nowait, bool recurse) +LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse) { - Relation rel; AclResult aclresult; - - /* - * Acquire the lock. We must do this first to protect against concurrent - * drops. Note that a lock against an already-dropped relation's OID - * won't fail. - */ - if (nowait) - { - if (!ConditionalLockRelationOid(reloid, lockmode)) - { - /* try to throw error by name; relation could be deleted... */ - char *relname = rv ? rv->relname : get_rel_name(reloid); - - if (relname) - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation \"%s\"", - relname))); - else - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation with OID %u", - reloid))); - } - } - else - LockRelationOid(reloid, lockmode); - - /* - * Now that we have the lock, check to see if the relation really exists - * or not. - */ - rel = try_relation_open(reloid, NoLock); - - if (!rel) - { - /* Release useless lock */ - UnlockRelationOid(reloid, lockmode); - - /* At top level, throw error; otherwise, ignore this child rel */ - if (rv) - { - if (rv->schemaname) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s.%s\" does not exist", - rv->schemaname, rv->relname))); - else - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s\" does not exist", - rv->relname))); - } - - return; - } + Oid reloid = RelationGetRelid(rel); /* Verify adequate privilege */ if (lockmode == AccessShareLock) @@ -159,12 +99,48 @@ LockTableRecurse(Oid reloid, RangeVar *rv, { List *children = find_inheritance_children(reloid, NoLock); ListCell *lc; + Relation childrel; foreach(lc, children) { Oid childreloid = lfirst_oid(lc); - LockTableRecurse(childreloid, NULL, lockmode, nowait, recurse); + /* + * Acquire the lock, to protect against concurrent drops. Note + * that a lock against an already-dropped relation's OID won't + * fail. + */ + if (!nowait) + LockRelationOid(childreloid, lockmode); + else if (!ConditionalLockRelationOid(childreloid, lockmode)) + { + /* try to throw error by name; relation could be deleted... */ + char *relname = get_rel_name(childreloid); + + if (relname) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s\"", + relname))); + else + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation with OID %u", + reloid))); + } + + /* + * Now that we have the lock, check to see if the relation really + * exists or not. + */ + childrel = try_relation_open(childreloid, NoLock); + if (!childrel) + { + /* Release useless lock */ + UnlockRelationOid(childreloid, lockmode); + } + + LockTableRecurse(childrel, lockmode, nowait, recurse); } } diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index be04177a2e..de6e2a3e33 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -427,8 +427,8 @@ AlterSequence(AlterSeqStmt *stmt) FormData_pg_sequence new; List *owned_by; - /* open and AccessShareLock sequence */ - relid = RangeVarGetRelid(stmt->sequence, false); + /* Open and lock sequence. */ + relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false); init_sequence(relid, &elm, &seqrel); /* allow ALTER to sequence owner only */ @@ -507,7 +507,16 @@ nextval(PG_FUNCTION_ARGS) Oid relid; sequence = makeRangeVarFromNameList(textToQualifiedNameList(seqin)); - relid = RangeVarGetRelid(sequence, false); + + /* + * XXX: This is not safe in the presence of concurrent DDL, but + * acquiring a lock here is more expensive than letting nextval_internal + * do it, since the latter maintains a cache that keeps us from hitting + * the lock manager more than once per transaction. It's not clear + * whether the performance penalty is material in practice, but for now, + * we do it this way. + */ + relid = RangeVarGetRelid(sequence, NoLock, false, false); PG_RETURN_INT64(nextval_internal(relid)); } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c1af12a5a3..295a1ff6e6 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -764,8 +764,15 @@ RemoveRelations(DropStmt *drop) */ AcceptInvalidationMessages(); - /* Look up the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(rel, true); + /* + * Look up the appropriate relation using namespace search. + * + * XXX: Doing this without a lock is unsafe in the presence of + * concurrent DDL, but acquiring a lock here might violate the rule + * that a table must be locked before its corresponding index. + * So, for now, we ignore the hazard. + */ + relOid = RangeVarGetRelid(rel, NoLock, true, false); /* Not there? */ if (!OidIsValid(relOid)) @@ -2234,6 +2241,8 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype) /* * Grab an exclusive lock on the target table, index, sequence or view, * which we will NOT release until end of transaction. + * + * Lock level used here should match ExecRenameStmt */ targetrelation = relation_open(myrelid, AccessExclusiveLock); diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 9cf8372e96..cadf3a1545 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -194,7 +194,17 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, RelationGetRelationName(rel)))); if (stmt->isconstraint && stmt->constrrel != NULL) - constrrelid = RangeVarGetRelid(stmt->constrrel, false); + { + /* + * We must take a lock on the target relation to protect against + * concurrent drop. It's not clear that AccessShareLock is strong + * enough, but we certainly need at least that much... otherwise, + * we might end up creating a pg_constraint entry referencing a + * nonexistent table. + */ + constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false, + false); + } /* permission checks */ if (!isInternal) @@ -1020,11 +1030,15 @@ ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid) * DropTrigger - drop an individual trigger by name */ void -DropTrigger(Oid relid, const char *trigname, DropBehavior behavior, +DropTrigger(RangeVar *relation, const char *trigname, DropBehavior behavior, bool missing_ok) { + Oid relid; ObjectAddress object; + /* lock level should match RemoveTriggerById */ + relid = RangeVarGetRelid(relation, ShareRowExclusiveLock, false, false); + object.classId = TriggerRelationId; object.objectId = get_trigger_oid(relid, trigname, missing_ok); object.objectSubId = 0; diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 9303967863..889737e26e 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -318,7 +318,16 @@ get_rel_oids(Oid relid, const RangeVar *vacrel) /* Process a specific relation */ Oid relid; - relid = RangeVarGetRelid(vacrel, false); + /* + * Since we don't take a lock here, the relation might be gone, + * or the RangeVar might no longer refer to the OID we look up + * here. In the former case, VACUUM will do nothing; in the + * latter case, it will process the OID we looked up here, rather + * than the new one. Neither is ideal, but there's little practical + * alternative, since we're going to commit this transaction and + * begin a new one between now and then. + */ + relid = RangeVarGetRelid(vacrel, NoLock, false, false); /* Make a relation list entry for this guy */ oldcontext = MemoryContextSwitchTo(vac_context); diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c index f2ccf0d745..3840c2f3ed 100644 --- a/src/backend/parser/parse_relation.c +++ b/src/backend/parser/parse_relation.c @@ -273,11 +273,17 @@ searchRangeTable(ParseState *pstate, RangeVar *relation) * If it's an unqualified name, check for possible CTE matches. A CTE * hides any real relation matches. If no CTE, look for a matching * relation. + * + * NB: It's not critical that RangeVarGetRelid return the correct answer + * here in the face of concurrent DDL. If it doesn't, the worst case + * scenario is a less-clear error message. Also, the tables involved in + * the query are already locked, which reduces the number of cases in + * which surprising behavior can occur. So we do the name lookup unlocked. */ if (!relation->schemaname) cte = scanNameSpaceForCTE(pstate, refname, &ctelevelsup); if (!cte) - relId = RangeVarGetRelid(relation, true); + relId = RangeVarGetRelid(relation, NoLock, true, false); /* Now look for RTEs matching either the relation/CTE or the alias */ for (levelsup = 0; diff --git a/src/backend/parser/parse_type.c b/src/backend/parser/parse_type.c index ac62cbc8c3..2122032ce2 100644 --- a/src/backend/parser/parse_type.c +++ b/src/backend/parser/parse_type.c @@ -108,8 +108,14 @@ LookupTypeName(ParseState *pstate, const TypeName *typeName, break; } - /* look up the field */ - relid = RangeVarGetRelid(rel, false); + /* + * Look up the field. + * + * XXX: As no lock is taken here, this might fail in the presence + * of concurrent DDL. But taking a lock would carry a performance + * penalty and would also require a permissions check. + */ + relid = RangeVarGetRelid(rel, NoLock, false, false); attnum = get_attnum(relid, field); if (attnum == InvalidAttrNumber) ereport(ERROR, diff --git a/src/backend/rewrite/rewriteDefine.c b/src/backend/rewrite/rewriteDefine.c index 60b988175b..17db70e74a 100644 --- a/src/backend/rewrite/rewriteDefine.c +++ b/src/backend/rewrite/rewriteDefine.c @@ -196,11 +196,15 @@ DefineRule(RuleStmt *stmt, const char *queryString) Node *whereClause; Oid relId; - /* Parse analysis ... */ + /* Parse analysis. */ transformRuleStmt(stmt, queryString, &actions, &whereClause); - /* ... find the relation ... */ - relId = RangeVarGetRelid(stmt->relation, false); + /* + * Find and lock the relation. Lock level should match + * DefineQueryRewrite. + */ + relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false, + false); /* ... and execute */ DefineQueryRewrite(stmt->rulename, @@ -242,6 +246,8 @@ DefineQueryRewrite(char *rulename, * grab ShareRowExclusiveLock to lock out insert/update/delete actions and * to ensure that we lock out current CREATE RULE statements; but because * of race conditions in access to catalog entries, we can't do that yet. + * + * Note that this lock level should match the one used in DefineRule. */ event_relation = heap_open(event_relid, AccessExclusiveLock); diff --git a/src/backend/rewrite/rewriteRemove.c b/src/backend/rewrite/rewriteRemove.c index b9dc7c5d9f..cec22ac6a8 100644 --- a/src/backend/rewrite/rewriteRemove.c +++ b/src/backend/rewrite/rewriteRemove.c @@ -19,6 +19,7 @@ #include "access/sysattr.h" #include "catalog/dependency.h" #include "catalog/indexing.h" +#include "catalog/namespace.h" #include "catalog/pg_rewrite.h" #include "miscadmin.h" #include "rewrite/rewriteRemove.h" @@ -37,13 +38,18 @@ * Delete a rule given its name. */ void -RemoveRewriteRule(Oid owningRel, const char *ruleName, DropBehavior behavior, - bool missing_ok) +RemoveRewriteRule(RangeVar *relation, const char *ruleName, + DropBehavior behavior, bool missing_ok) { HeapTuple tuple; Oid eventRelationOid; + Oid owningRel; ObjectAddress object; + /* should match RemoveRewriteRuleById */ + owningRel = RangeVarGetRelid(relation, ShareUpdateExclusiveLock, + false, false); + /* * Find the tuple for the target rule. */ diff --git a/src/backend/storage/ipc/sinval.c b/src/backend/storage/ipc/sinval.c index 9ab16b16ed..8499615f14 100644 --- a/src/backend/storage/ipc/sinval.c +++ b/src/backend/storage/ipc/sinval.c @@ -22,6 +22,9 @@ #include "utils/inval.h" +uint64 SharedInvalidMessageCounter; + + /* * Because backends sitting idle will not be reading sinval events, we * need a way to give an idle backend a swift kick in the rear and make @@ -90,6 +93,7 @@ ReceiveSharedInvalidMessages( { SharedInvalidationMessage *msg = &messages[nextmsg++]; + SharedInvalidMessageCounter++; invalFunction(msg); } @@ -106,6 +110,7 @@ ReceiveSharedInvalidMessages( { /* got a reset message */ elog(DEBUG4, "cache state reset"); + SharedInvalidMessageCounter++; resetFunction(); break; /* nothing more to do */ } @@ -118,6 +123,7 @@ ReceiveSharedInvalidMessages( { SharedInvalidationMessage *msg = &messages[nextmsg++]; + SharedInvalidMessageCounter++; invalFunction(msg); } diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index 859b3852db..3ac098b2a9 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -81,10 +81,11 @@ LockRelationOid(Oid relid, LOCKMODE lockmode) /* * Now that we have the lock, check for invalidation messages, so that we * will update or flush any stale relcache entry before we try to use it. - * We can skip this in the not-uncommon case that we already had the same - * type of lock being requested, since then no one else could have - * modified the relcache entry in an undesirable way. (In the case where - * our own xact modifies the rel, the relcache update happens via + * RangeVarGetRelid() specifically relies on us for this. We can skip + * this in the not-uncommon case that we already had the same type of lock + * being requested, since then no one else could have modified the + * relcache entry in an undesirable way. (In the case where our own xact + * modifies the rel, the relcache update happens via * CommandCounterIncrement, not here.) */ if (res != LOCKACQUIRE_ALREADY_HELD) diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 224e1f3761..0698cafbce 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -77,7 +77,15 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs) Oid relOid; HeapTuple tuple; - relOid = RangeVarGetRelid(rel, false); + /* + * XXX: This is unsafe in the presence of concurrent DDL, since it is + * called before acquiring any lock on the target relation. However, + * locking the target relation (especially using something like + * AccessExclusiveLock) before verifying that the user has permissions + * is not appealing either. + */ + relOid = RangeVarGetRelid(rel, NoLock, false, false); + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); if (!HeapTupleIsValid(tuple)) /* should not happen */ elog(ERROR, "cache lookup failed for relation %u", relOid); @@ -1111,20 +1119,17 @@ standard_ProcessUtility(Node *parsetree, case T_DropPropertyStmt: { DropPropertyStmt *stmt = (DropPropertyStmt *) parsetree; - Oid relId; - - relId = RangeVarGetRelid(stmt->relation, false); switch (stmt->removeType) { case OBJECT_RULE: /* RemoveRewriteRule checks permissions */ - RemoveRewriteRule(relId, stmt->property, + RemoveRewriteRule(stmt->relation, stmt->property, stmt->behavior, stmt->missing_ok); break; case OBJECT_TRIGGER: /* DropTrigger checks permissions */ - DropTrigger(relId, stmt->property, + DropTrigger(stmt->relation, stmt->property, stmt->behavior, stmt->missing_ok); break; default: diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c index 4a3e241c41..3fa95e2fd3 100644 --- a/src/backend/utils/adt/acl.c +++ b/src/backend/utils/adt/acl.c @@ -1939,7 +1939,8 @@ convert_table_name(text *tablename) relrv = makeRangeVarFromNameList(textToQualifiedNameList(tablename)); - return RangeVarGetRelid(relrv, false); + /* We might not even have permissions on this relation; don't lock it. */ + return RangeVarGetRelid(relrv, NoLock, false, false); } /* diff --git a/src/backend/utils/adt/regproc.c b/src/backend/utils/adt/regproc.c index 6716c0204c..0d42a39ec4 100644 --- a/src/backend/utils/adt/regproc.c +++ b/src/backend/utils/adt/regproc.c @@ -823,7 +823,9 @@ regclassin(PG_FUNCTION_ARGS) */ names = stringToQualifiedNameList(class_name_or_oid); - result = RangeVarGetRelid(makeRangeVarFromNameList(names), false); + /* We might not even have permissions on this relation; don't lock it. */ + result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false, + false); PG_RETURN_OID(result); } @@ -1294,7 +1296,9 @@ text_regclass(PG_FUNCTION_ARGS) RangeVar *rv; rv = makeRangeVarFromNameList(textToQualifiedNameList(relname)); - result = RangeVarGetRelid(rv, false); + + /* We might not even have permissions on this relation; don't lock it. */ + result = RangeVarGetRelid(rv, NoLock, false, false); PG_RETURN_OID(result); } diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 49dc9c88d5..3fd99e00e3 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -385,8 +385,9 @@ pg_get_viewdef_name(PG_FUNCTION_ARGS) RangeVar *viewrel; Oid viewoid; + /* Look up view name. Can't lock it - we might not have privileges. */ viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname)); - viewoid = RangeVarGetRelid(viewrel, false); + viewoid = RangeVarGetRelid(viewrel, NoLock, false, false); PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, 0))); } @@ -403,8 +404,10 @@ pg_get_viewdef_name_ext(PG_FUNCTION_ARGS) Oid viewoid; prettyFlags = pretty ? PRETTYFLAG_PAREN | PRETTYFLAG_INDENT : 0; + + /* Look up view name. Can't lock it - we might not have privileges. */ viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname)); - viewoid = RangeVarGetRelid(viewrel, false); + viewoid = RangeVarGetRelid(viewrel, NoLock, false, false); PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, prettyFlags))); } @@ -1567,9 +1570,9 @@ pg_get_serial_sequence(PG_FUNCTION_ARGS) SysScanDesc scan; HeapTuple tup; - /* Get the OID of the table */ + /* Look up table name. Can't lock it - we might not have privileges. */ tablerv = makeRangeVarFromNameList(textToQualifiedNameList(tablename)); - tableOid = RangeVarGetRelid(tablerv, false); + tableOid = RangeVarGetRelid(tablerv, NoLock, false, false); /* Get the number of the column */ column = text_to_cstring(columnname); diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index 7e1e194794..4bcbc20497 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -15,6 +15,7 @@ #define NAMESPACE_H #include "nodes/primnodes.h" +#include "storage/lock.h" /* @@ -47,7 +48,8 @@ typedef struct OverrideSearchPath } OverrideSearchPath; -extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK); +extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, + bool missing_ok, bool nowait); extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation); extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation); extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid); diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index ad97871d98..fe21298b64 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -112,7 +112,7 @@ extern Oid CreateTrigger(CreateTrigStmt *stmt, const char *queryString, Oid constraintOid, Oid indexOid, bool isInternal); -extern void DropTrigger(Oid relid, const char *trigname, +extern void DropTrigger(RangeVar *relation, const char *trigname, DropBehavior behavior, bool missing_ok); extern void RemoveTriggerById(Oid trigOid); extern Oid get_trigger_oid(Oid relid, const char *name, bool missing_ok); diff --git a/src/include/rewrite/rewriteRemove.h b/src/include/rewrite/rewriteRemove.h index 90df04591f..b9a63bad7b 100644 --- a/src/include/rewrite/rewriteRemove.h +++ b/src/include/rewrite/rewriteRemove.h @@ -17,7 +17,7 @@ #include "nodes/parsenodes.h" -extern void RemoveRewriteRule(Oid owningRel, const char *ruleName, +extern void RemoveRewriteRule(RangeVar *relation, const char *ruleName, DropBehavior behavior, bool missing_ok); extern void RemoveRewriteRuleById(Oid ruleOid); diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index e9ce0257ac..aba474d237 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -116,6 +116,10 @@ typedef union } SharedInvalidationMessage; +/* Counter of messages processed; don't worry about overflow. */ +extern uint64 SharedInvalidMessageCounter; + + extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n); extern void ReceiveSharedInvalidMessages( diff --git a/src/pl/plpgsql/src/pl_comp.c b/src/pl/plpgsql/src/pl_comp.c index afd20b4872..d22fa68eb3 100644 --- a/src/pl/plpgsql/src/pl_comp.c +++ b/src/pl/plpgsql/src/pl_comp.c @@ -1708,7 +1708,8 @@ plpgsql_parse_cwordtype(List *idents) relvar = makeRangeVar(strVal(linitial(idents)), strVal(lsecond(idents)), -1); - classOid = RangeVarGetRelid(relvar, true); + /* Can't lock relation - we might not have privileges. */ + classOid = RangeVarGetRelid(relvar, NoLock, true, false); if (!OidIsValid(classOid)) goto done; fldname = strVal(lthird(idents)); @@ -1804,16 +1805,11 @@ plpgsql_parse_cwordrowtype(List *idents) /* Avoid memory leaks in long-term function context */ oldCxt = MemoryContextSwitchTo(compile_tmp_cxt); - /* Lookup the relation */ + /* Look up relation name. Can't lock it - we might not have privileges. */ relvar = makeRangeVar(strVal(linitial(idents)), strVal(lsecond(idents)), -1); - classOid = RangeVarGetRelid(relvar, true); - if (!OidIsValid(classOid)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_TABLE), - errmsg("relation \"%s.%s\" does not exist", - strVal(linitial(idents)), strVal(lsecond(idents))))); + classOid = RangeVarGetRelid(relvar, NoLock, false, false); MemoryContextSwitchTo(oldCxt);