Lock before setting relhassubclass on RELKIND_PARTITIONED_INDEX.

Commit 5b562644fec696977df4a82790064e8287927891 added a comment that
SetRelationHasSubclass() callers must hold this lock.  When commit
17f206fbc824d2b4b14480199ca9ff7dea417eda extended use of this column to
partitioned indexes, it didn't take the lock.  As the latter commit
message mentioned, we currently never reset a partitioned index to
relhassubclass=f.  That largely avoids harm from the lock omission.  The
cause for fixing this now is to unblock introducing a rule about locks
required to heap_update() a pg_class row.  This might cause more
deadlocks.  It gives minor user-visible benefits:

- If an ALTER INDEX SET TABLESPACE runs concurrently with ALTER TABLE
  ATTACH PARTITION or CREATE PARTITION OF, one transaction blocks
  instead of failing with "tuple concurrently updated".  (Many cases of
  DDL concurrency still fail that way.)

- Match ALTER INDEX ATTACH PARTITION in choosing to lock the index.

While not user-visible today, we'll need this if we ever make something
set the flag to false for a partitioned index, like ANALYZE does today
for tables.  Back-patch to v12 (all supported versions), the plan for
the commit relying on the new rule.  In back branches, add
LockOrStrongerHeldByMe() instead of adding a LockHeldByMe() parameter.

Reviewed (in an earlier version) by Robert Haas.

Discussion: https://postgr.es/m/20240611024525.9f.nmisch@google.com
This commit is contained in:
Noah Misch 2024-06-27 19:21:05 -07:00
parent c3437fb725
commit 1a6d65b64e
7 changed files with 77 additions and 30 deletions

View File

@ -1010,6 +1010,7 @@ index_create(Relation heapRelation,
if (OidIsValid(parentIndexRelid))
{
StoreSingleInheritance(indexRelationId, parentIndexRelid, 1);
LockRelationOid(parentIndexRelid, ShareUpdateExclusiveLock);
SetRelationHasSubclass(parentIndexRelid, true);
}

View File

@ -3762,7 +3762,10 @@ IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
/* set relhassubclass if an index partition has been added to the parent */
if (OidIsValid(parentOid))
{
LockRelationOid(parentOid, ShareUpdateExclusiveLock);
SetRelationHasSubclass(parentOid, true);
}
/* set relispartition correctly on the partition */
update_relispartition(partRelid, OidIsValid(parentOid));

View File

@ -3017,8 +3017,15 @@ findAttrByName(const char *attributeName, List *schema)
* SetRelationHasSubclass
* Set the value of the relation's relhassubclass field in pg_class.
*
* NOTE: caller must be holding an appropriate lock on the relation.
* ShareUpdateExclusiveLock is sufficient.
* It's always safe to set this field to true, because all SQL commands are
* ready to see true and then find no children. On the other hand, commands
* generally assume zero children if this is false.
*
* Caller must hold any self-exclusive lock until end of transaction. If the
* new value is false, caller must have acquired that lock before reading the
* evidence that justified the false value. That way, it properly waits if
* another backend is simultaneously concluding no need to change the tuple
* (new and old values are true).
*
* NOTE: an important side-effect of this operation is that an SI invalidation
* message is sent out to all backends --- including me --- causing plans
@ -3033,6 +3040,11 @@ SetRelationHasSubclass(Oid relationId, bool relhassubclass)
HeapTuple tuple;
Form_pg_class classtuple;
Assert(CheckRelationOidLockedByMe(relationId,
ShareUpdateExclusiveLock, false) ||
CheckRelationOidLockedByMe(relationId,
ShareRowExclusiveLock, true));
/*
* Fetch a modifiable copy of the tuple, modify it, update pg_class.
*/

View File

@ -307,32 +307,26 @@ CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
if (LockHeldByMe(&tag, lockmode))
return true;
if (orstronger)
{
LOCKMODE slockmode;
for (slockmode = lockmode + 1;
slockmode <= MaxLockMode;
slockmode++)
{
if (LockHeldByMe(&tag, slockmode))
{
#ifdef NOT_USED
/* Sometimes this might be useful for debugging purposes */
elog(WARNING, "lock mode %s substituted for %s on relation %s",
GetLockmodeName(tag.locktag_lockmethodid, slockmode),
GetLockmodeName(tag.locktag_lockmethodid, lockmode),
RelationGetRelationName(relation));
#endif
return true;
}
}
return (orstronger ?
LockOrStrongerHeldByMe(&tag, lockmode) :
LockHeldByMe(&tag, lockmode));
}
return false;
/*
* CheckRelationOidLockedByMe
*
* Like the above, but takes an OID as argument.
*/
bool
CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger)
{
LOCKTAG tag;
SetLocktagRelationOid(&tag, relid);
return (orstronger ?
LockOrStrongerHeldByMe(&tag, lockmode) :
LockHeldByMe(&tag, lockmode));
}
/*

View File

@ -579,11 +579,17 @@ DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
}
/*
* LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode'
* by the current transaction
* LockHeldByMeExtended -- test whether lock 'locktag' is held by the current
* transaction
*
* Returns true if current transaction holds a lock on 'tag' of mode
* 'lockmode'. If 'orstronger' is true, a stronger lockmode is also OK.
* ("Stronger" is defined as "numerically higher", which is a bit
* semantically dubious but is OK for the purposes we use this for.)
*/
bool
LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
static bool
LockHeldByMeExtended(const LOCKTAG *locktag,
LOCKMODE lockmode, bool orstronger)
{
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
@ -599,7 +605,35 @@ LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
(void *) &localtag,
HASH_FIND, NULL);
return (locallock && locallock->nLocks > 0);
if (locallock && locallock->nLocks > 0)
return true;
if (orstronger)
{
LOCKMODE slockmode;
for (slockmode = lockmode + 1;
slockmode <= MaxLockMode;
slockmode++)
{
if (LockHeldByMeExtended(locktag, slockmode, false))
return true;
}
}
return false;
}
bool
LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
{
return LockHeldByMeExtended(locktag, lockmode, false);
}
bool
LockOrStrongerHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
{
return LockHeldByMeExtended(locktag, lockmode, true);
}
#ifdef USE_ASSERT_CHECKING

View File

@ -47,6 +47,8 @@ extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode,
bool orstronger);
extern bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode,
bool orstronger);
extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode);
extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);

View File

@ -558,6 +558,7 @@ extern void LockReleaseSession(LOCKMETHODID lockmethodid);
extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
extern bool LockOrStrongerHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
#ifdef USE_ASSERT_CHECKING
extern HTAB *GetLockMethodLocalHash(void);
#endif