diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index f5f8864445..5ac1b3237d 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1010,6 +1010,7 @@ index_create(Relation heapRelation, if (OidIsValid(parentIndexRelid)) { StoreSingleInheritance(indexRelationId, parentIndexRelid, 1); + LockRelationOid(parentIndexRelid, ShareUpdateExclusiveLock); SetRelationHasSubclass(parentIndexRelid, true); } diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 4a7cfb3ce4..9d208717c6 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -3762,7 +3762,10 @@ IndexSetParentIndex(Relation partitionIdx, Oid parentOid) /* set relhassubclass if an index partition has been added to the parent */ if (OidIsValid(parentOid)) + { + LockRelationOid(parentOid, ShareUpdateExclusiveLock); SetRelationHasSubclass(parentOid, true); + } /* set relispartition correctly on the partition */ update_relispartition(partRelid, OidIsValid(parentOid)); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 7ab3a51136..0866a680d7 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -3017,8 +3017,15 @@ findAttrByName(const char *attributeName, List *schema) * SetRelationHasSubclass * Set the value of the relation's relhassubclass field in pg_class. * - * NOTE: caller must be holding an appropriate lock on the relation. - * ShareUpdateExclusiveLock is sufficient. + * It's always safe to set this field to true, because all SQL commands are + * ready to see true and then find no children. On the other hand, commands + * generally assume zero children if this is false. + * + * Caller must hold any self-exclusive lock until end of transaction. If the + * new value is false, caller must have acquired that lock before reading the + * evidence that justified the false value. That way, it properly waits if + * another backend is simultaneously concluding no need to change the tuple + * (new and old values are true). * * NOTE: an important side-effect of this operation is that an SI invalidation * message is sent out to all backends --- including me --- causing plans @@ -3033,6 +3040,11 @@ SetRelationHasSubclass(Oid relationId, bool relhassubclass) HeapTuple tuple; Form_pg_class classtuple; + Assert(CheckRelationOidLockedByMe(relationId, + ShareUpdateExclusiveLock, false) || + CheckRelationOidLockedByMe(relationId, + ShareRowExclusiveLock, true)); + /* * Fetch a modifiable copy of the tuple, modify it, update pg_class. */ diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index d4fa5f6ebc..79068795af 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -307,32 +307,26 @@ CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger) relation->rd_lockInfo.lockRelId.dbId, relation->rd_lockInfo.lockRelId.relId); - if (LockHeldByMe(&tag, lockmode)) - return true; + return (orstronger ? + LockOrStrongerHeldByMe(&tag, lockmode) : + LockHeldByMe(&tag, lockmode)); +} - if (orstronger) - { - LOCKMODE slockmode; +/* + * CheckRelationOidLockedByMe + * + * Like the above, but takes an OID as argument. + */ +bool +CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger) +{ + LOCKTAG tag; - for (slockmode = lockmode + 1; - slockmode <= MaxLockMode; - slockmode++) - { - if (LockHeldByMe(&tag, slockmode)) - { -#ifdef NOT_USED - /* Sometimes this might be useful for debugging purposes */ - elog(WARNING, "lock mode %s substituted for %s on relation %s", - GetLockmodeName(tag.locktag_lockmethodid, slockmode), - GetLockmodeName(tag.locktag_lockmethodid, lockmode), - RelationGetRelationName(relation)); -#endif - return true; - } - } - } + SetLocktagRelationOid(&tag, relid); - return false; + return (orstronger ? + LockOrStrongerHeldByMe(&tag, lockmode) : + LockHeldByMe(&tag, lockmode)); } /* diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 31933693d6..bc888d2108 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -579,11 +579,17 @@ DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2) } /* - * LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode' - * by the current transaction + * LockHeldByMeExtended -- test whether lock 'locktag' is held by the current + * transaction + * + * Returns true if current transaction holds a lock on 'tag' of mode + * 'lockmode'. If 'orstronger' is true, a stronger lockmode is also OK. + * ("Stronger" is defined as "numerically higher", which is a bit + * semantically dubious but is OK for the purposes we use this for.) */ -bool -LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode) +static bool +LockHeldByMeExtended(const LOCKTAG *locktag, + LOCKMODE lockmode, bool orstronger) { LOCALLOCKTAG localtag; LOCALLOCK *locallock; @@ -599,7 +605,35 @@ LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode) (void *) &localtag, HASH_FIND, NULL); - return (locallock && locallock->nLocks > 0); + if (locallock && locallock->nLocks > 0) + return true; + + if (orstronger) + { + LOCKMODE slockmode; + + for (slockmode = lockmode + 1; + slockmode <= MaxLockMode; + slockmode++) + { + if (LockHeldByMeExtended(locktag, slockmode, false)) + return true; + } + } + + return false; +} + +bool +LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode) +{ + return LockHeldByMeExtended(locktag, lockmode, false); +} + +bool +LockOrStrongerHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode) +{ + return LockHeldByMeExtended(locktag, lockmode, true); } #ifdef USE_ASSERT_CHECKING diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h index ec5ef2b878..7023d7fb78 100644 --- a/src/include/storage/lmgr.h +++ b/src/include/storage/lmgr.h @@ -47,6 +47,8 @@ extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode); extern void UnlockRelation(Relation relation, LOCKMODE lockmode); extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger); +extern bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, + bool orstronger); extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode); extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode); diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index ab42f6b080..6e10a6a424 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -558,6 +558,7 @@ extern void LockReleaseSession(LOCKMETHODID lockmethodid); extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks); extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks); extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode); +extern bool LockOrStrongerHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode); #ifdef USE_ASSERT_CHECKING extern HTAB *GetLockMethodLocalHash(void); #endif