diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 55fdde4b24..a819b4197c 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1058,6 +1058,7 @@ index_create(Relation heapRelation, if (OidIsValid(parentIndexRelid)) { StoreSingleInheritance(indexRelationId, parentIndexRelid, 1); + LockRelationOid(parentIndexRelid, ShareUpdateExclusiveLock); SetRelationHasSubclass(parentIndexRelid, true); } diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 309389e20d..2caab88aa5 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -4355,7 +4355,10 @@ IndexSetParentIndex(Relation partitionIdx, Oid parentOid) /* set relhassubclass if an index partition has been added to the parent */ if (OidIsValid(parentOid)) + { + LockRelationOid(parentOid, ShareUpdateExclusiveLock); SetRelationHasSubclass(parentOid, true); + } /* set relispartition correctly on the partition */ update_relispartition(partRelid, OidIsValid(parentOid)); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 66cda26a25..8fcb188323 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -3483,8 +3483,15 @@ findAttrByName(const char *attributeName, const List *columns) * SetRelationHasSubclass * Set the value of the relation's relhassubclass field in pg_class. * - * NOTE: caller must be holding an appropriate lock on the relation. - * ShareUpdateExclusiveLock is sufficient. + * It's always safe to set this field to true, because all SQL commands are + * ready to see true and then find no children. On the other hand, commands + * generally assume zero children if this is false. + * + * Caller must hold any self-exclusive lock until end of transaction. If the + * new value is false, caller must have acquired that lock before reading the + * evidence that justified the false value. That way, it properly waits if + * another backend is simultaneously concluding no need to change the tuple + * (new and old values are true). * * NOTE: an important side-effect of this operation is that an SI invalidation * message is sent out to all backends --- including me --- causing plans @@ -3499,6 +3506,11 @@ SetRelationHasSubclass(Oid relationId, bool relhassubclass) HeapTuple tuple; Form_pg_class classtuple; + Assert(CheckRelationOidLockedByMe(relationId, + ShareUpdateExclusiveLock, false) || + CheckRelationOidLockedByMe(relationId, + ShareRowExclusiveLock, true)); + /* * Fetch a modifiable copy of the tuple, modify it, update pg_class. */ diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index fe3cda2f88..094522acb4 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -335,32 +335,22 @@ CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger) relation->rd_lockInfo.lockRelId.dbId, relation->rd_lockInfo.lockRelId.relId); - if (LockHeldByMe(&tag, lockmode)) - return true; + return LockHeldByMe(&tag, lockmode, orstronger); +} - if (orstronger) - { - LOCKMODE slockmode; +/* + * CheckRelationOidLockedByMe + * + * Like the above, but takes an OID as argument. + */ +bool +CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger) +{ + LOCKTAG tag; - for (slockmode = lockmode + 1; - slockmode <= MaxLockMode; - slockmode++) - { - if (LockHeldByMe(&tag, slockmode)) - { -#ifdef NOT_USED - /* Sometimes this might be useful for debugging purposes */ - elog(WARNING, "lock mode %s substituted for %s on relation %s", - GetLockmodeName(tag.locktag_lockmethodid, slockmode), - GetLockmodeName(tag.locktag_lockmethodid, lockmode), - RelationGetRelationName(relation)); -#endif - return true; - } - } - } + SetLocktagRelationOid(&tag, relid); - return false; + return LockHeldByMe(&tag, lockmode, orstronger); } /* diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index f68c595c8a..0400a50777 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -578,11 +578,17 @@ DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2) } /* - * LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode' - * by the current transaction + * LockHeldByMe -- test whether lock 'locktag' is held by the current + * transaction + * + * Returns true if current transaction holds a lock on 'tag' of mode + * 'lockmode'. If 'orstronger' is true, a stronger lockmode is also OK. + * ("Stronger" is defined as "numerically higher", which is a bit + * semantically dubious but is OK for the purposes we use this for.) */ bool -LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode) +LockHeldByMe(const LOCKTAG *locktag, + LOCKMODE lockmode, bool orstronger) { LOCALLOCKTAG localtag; LOCALLOCK *locallock; @@ -598,7 +604,23 @@ LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode) &localtag, HASH_FIND, NULL); - return (locallock && locallock->nLocks > 0); + if (locallock && locallock->nLocks > 0) + return true; + + if (orstronger) + { + LOCKMODE slockmode; + + for (slockmode = lockmode + 1; + slockmode <= MaxLockMode; + slockmode++) + { + if (LockHeldByMe(locktag, slockmode, false)) + return true; + } + } + + return false; } #ifdef USE_ASSERT_CHECKING diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h index 22b7856ef1..ce15125ac3 100644 --- a/src/include/storage/lmgr.h +++ b/src/include/storage/lmgr.h @@ -48,6 +48,8 @@ extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode); extern void UnlockRelation(Relation relation, LOCKMODE lockmode); extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger); +extern bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, + bool orstronger); extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode); extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode); diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 0017d4b868..cc1f6e78c3 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -567,7 +567,8 @@ extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks); extern void LockReleaseSession(LOCKMETHODID lockmethodid); extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks); extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks); -extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode); +extern bool LockHeldByMe(const LOCKTAG *locktag, + LOCKMODE lockmode, bool orstronger); #ifdef USE_ASSERT_CHECKING extern HTAB *GetLockMethodLocalHash(void); #endif