diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c index cf5d08576a..dc10b4a483 100644 --- a/src/backend/utils/adt/acl.c +++ b/src/backend/utils/adt/acl.c @@ -36,6 +36,7 @@ #include "common/hashfn.h" #include "foreign/foreign.h" #include "funcapi.h" +#include "lib/bloomfilter.h" #include "lib/qunique.h" #include "miscadmin.h" #include "utils/acl.h" @@ -78,6 +79,13 @@ static Oid cached_role[] = {InvalidOid, InvalidOid, InvalidOid}; static List *cached_roles[] = {NIL, NIL, NIL}; static uint32 cached_db_hash; +/* + * If the list of roles gathered by roles_is_member_of() grows larger than the + * below threshold, a Bloom filter is created to speed up list membership + * checks. This threshold is set arbitrarily high to avoid the overhead of + * creating the Bloom filter until it seems likely to provide a net benefit. + */ +#define ROLES_LIST_BLOOM_THRESHOLD 1024 static const char *getid(const char *s, char *n, Node *escontext); static void putid(char *p, const char *s); @@ -4918,6 +4926,53 @@ RoleMembershipCacheCallback(Datum arg, int cacheid, uint32 hashvalue) cached_role[ROLERECURSE_SETROLE] = InvalidOid; } +/* + * A helper function for roles_is_member_of() that provides an optimized + * implementation of list_append_unique_oid() via a Bloom filter. The caller + * (i.e., roles_is_member_of()) is responsible for freeing bf once it is done + * using this function. + */ +static inline List * +roles_list_append(List *roles_list, bloom_filter **bf, Oid role) +{ + unsigned char *roleptr = (unsigned char *) &role; + + /* + * If there is a previously-created Bloom filter, use it to try to + * determine whether the role is missing from the list. If it says yes, + * that's a hard fact and we can go ahead and add the role. If it says + * no, that's only probabilistic and we'd better search the list. Without + * a filter, we must always do an ordinary linear search through the + * existing list. + */ + if ((*bf && bloom_lacks_element(*bf, roleptr, sizeof(Oid))) || + !list_member_oid(roles_list, role)) + { + /* + * If the list is large, we take on the overhead of creating and + * populating a Bloom filter to speed up future calls to this + * function. + */ + if (*bf == NULL && + list_length(roles_list) > ROLES_LIST_BLOOM_THRESHOLD) + { + *bf = bloom_create(ROLES_LIST_BLOOM_THRESHOLD * 10, work_mem, 0); + foreach_oid(roleid, roles_list) + bloom_add_element(*bf, (unsigned char *) &roleid, sizeof(Oid)); + } + + /* + * Finally, add the role to the list and the Bloom filter, if it + * exists. + */ + roles_list = lappend_oid(roles_list, role); + if (*bf) + bloom_add_element(*bf, roleptr, sizeof(Oid)); + } + + return roles_list; +} + /* * Get a list of roles that the specified roleid is a member of * @@ -4946,6 +5001,7 @@ roles_is_member_of(Oid roleid, enum RoleRecurseType type, ListCell *l; List *new_cached_roles; MemoryContext oldctx; + bloom_filter *bf = NULL; Assert(OidIsValid(admin_of) == PointerIsValid(admin_role)); if (admin_role != NULL) @@ -5023,16 +5079,22 @@ roles_is_member_of(Oid roleid, enum RoleRecurseType type, * graph, we must test for having already seen this role. It is * legal for instance to have both A->B and A->C->B. */ - roles_list = list_append_unique_oid(roles_list, otherid); + roles_list = roles_list_append(roles_list, &bf, otherid); } ReleaseSysCacheList(memlist); /* implement pg_database_owner implicit membership */ if (memberid == dba && OidIsValid(dba)) - roles_list = list_append_unique_oid(roles_list, - ROLE_PG_DATABASE_OWNER); + roles_list = roles_list_append(roles_list, &bf, + ROLE_PG_DATABASE_OWNER); } + /* + * Free the Bloom filter created by roles_list_append(), if there is one. + */ + if (bf) + bloom_free(bf); + /* * Copy the completed list into TopMemoryContext so it will persist. */