diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index 74007856f3..8fabe6899f 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -986,93 +986,35 @@ check_functional_grouping(Oid relid, List *grouping_columns, List **constraintDeps) { - bool result = false; - Relation pg_constraint; - HeapTuple tuple; - SysScanDesc scan; - ScanKeyData skey[1]; + Bitmapset *pkattnos; + Bitmapset *groupbyattnos; + Oid constraintOid; + ListCell *gl; - /* Scan pg_constraint for constraints of the target rel */ - pg_constraint = heap_open(ConstraintRelationId, AccessShareLock); + /* If the rel has no PK, then we can't prove functional dependency */ + pkattnos = get_primary_key_attnos(relid, false, &constraintOid); + if (pkattnos == NULL) + return false; - ScanKeyInit(&skey[0], - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(relid)); - - scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true, - NULL, 1, skey); - - while (HeapTupleIsValid(tuple = systable_getnext(scan))) + /* Identify all the rel's columns that appear in grouping_columns */ + groupbyattnos = NULL; + foreach(gl, grouping_columns) { - Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple); - Datum adatum; - bool isNull; - ArrayType *arr; - int16 *attnums; - int numkeys; - int i; - bool found_col; + Var *gvar = (Var *) lfirst(gl); - /* Only PK constraints are of interest for now, see comment above */ - if (con->contype != CONSTRAINT_PRIMARY) - continue; - /* Constraint must be non-deferrable */ - if (con->condeferrable) - continue; - - /* Extract the conkey array, ie, attnums of PK's columns */ - adatum = heap_getattr(tuple, Anum_pg_constraint_conkey, - RelationGetDescr(pg_constraint), &isNull); - if (isNull) - elog(ERROR, "null conkey for constraint %u", - HeapTupleGetOid(tuple)); - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - numkeys = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - numkeys < 0 || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "conkey is not a 1-D smallint array"); - attnums = (int16 *) ARR_DATA_PTR(arr); - - found_col = false; - for (i = 0; i < numkeys; i++) - { - AttrNumber attnum = attnums[i]; - ListCell *gl; - - found_col = false; - foreach(gl, grouping_columns) - { - Var *gvar = (Var *) lfirst(gl); - - if (IsA(gvar, Var) && - gvar->varno == varno && - gvar->varlevelsup == varlevelsup && - gvar->varattno == attnum) - { - found_col = true; - break; - } - } - if (!found_col) - break; - } - - if (found_col) - { - /* The PK is a subset of grouping_columns, so we win */ - *constraintDeps = lappend_oid(*constraintDeps, - HeapTupleGetOid(tuple)); - result = true; - break; - } + if (IsA(gvar, Var) && + gvar->varno == varno && + gvar->varlevelsup == varlevelsup) + groupbyattnos = bms_add_member(groupbyattnos, + gvar->varattno - FirstLowInvalidHeapAttributeNumber); } - systable_endscan(scan); + if (bms_is_subset(pkattnos, groupbyattnos)) + { + /* The PK is a subset of grouping_columns, so we win */ + *constraintDeps = lappend_oid(*constraintDeps, constraintOid); + return true; + } - heap_close(pg_constraint, AccessShareLock); - - return result; + return false; }