From 86db52a5062a77c6ee533586be2cef8672a20c7d Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 15 Jul 2024 10:21:16 +0300 Subject: [PATCH] Use atomics to avoid locking in InjectionPointRun() This allows using injection points without having a PGPROC, like early at backend startup, or in the postmaster. The injection points facility is new in v17, so backpatch there. Reviewed-by: Michael Paquier Discussion: https://www.postgresql.org/message-id/4317a7f7-8d24-435e-9e49-29b72a3dc418@iki.fi --- src/backend/utils/misc/injection_point.c | 408 ++++++++++++++++------- src/tools/pgindent/typedefs.list | 1 + 2 files changed, 283 insertions(+), 126 deletions(-) diff --git a/src/backend/utils/misc/injection_point.c b/src/backend/utils/misc/injection_point.c index 48f29e9b60..84ad5e470d 100644 --- a/src/backend/utils/misc/injection_point.c +++ b/src/backend/utils/misc/injection_point.c @@ -21,7 +21,6 @@ #include "fmgr.h" #include "miscadmin.h" -#include "port/pg_bitutils.h" #include "storage/fd.h" #include "storage/lwlock.h" #include "storage/shmem.h" @@ -31,22 +30,35 @@ #ifdef USE_INJECTION_POINTS -/* - * Hash table for storing injection points. - * - * InjectionPointHash is used to find an injection point by name. - */ -static HTAB *InjectionPointHash; /* find points from names */ - /* Field sizes */ #define INJ_NAME_MAXLEN 64 #define INJ_LIB_MAXLEN 128 #define INJ_FUNC_MAXLEN 128 #define INJ_PRIVATE_MAXLEN 1024 -/* Single injection point stored in InjectionPointHash */ +/* Single injection point stored in shared memory */ typedef struct InjectionPointEntry { + /* + * Because injection points need to be usable without LWLocks, we use a + * generation counter on each entry to allow safe, lock-free reading. + * + * To read an entry, first read the current 'generation' value. If it's + * even, then the slot is currently unused, and odd means it's in use. 
+ * When reading the other fields, beware that they may change while + * reading them, if the entry is released and reused! After reading the + * other fields, read 'generation' again: if its value hasn't changed, you + * can be certain that the other fields you read are valid. Otherwise, + * the slot was concurrently recycled, and you should ignore it. + * + * When adding an entry, you must store all the other fields first, and + * then update the generation number, with an appropriate memory barrier + * in between. In addition to that protocol, you must also hold + * InjectionPointLock, to prevent two backends from modifying the array at + * the same time. + */ + pg_atomic_uint64 generation; + char name[INJ_NAME_MAXLEN]; /* hash key */ char library[INJ_LIB_MAXLEN]; /* library */ char function[INJ_FUNC_MAXLEN]; /* function */ @@ -58,8 +70,22 @@ typedef struct InjectionPointEntry char private_data[INJ_PRIVATE_MAXLEN]; } InjectionPointEntry; -#define INJECTION_POINT_HASH_INIT_SIZE 16 -#define INJECTION_POINT_HASH_MAX_SIZE 128 +#define MAX_INJECTION_POINTS 128 + +/* + * Shared memory array of active injection points. + * + * 'max_inuse' is the highest index currently in use, plus one. It's just an + * optimization to avoid scanning through the whole array, in the common case + * that there are no injection points, or only a few. + */ +typedef struct InjectionPointsCtl +{ + pg_atomic_uint32 max_inuse; + InjectionPointEntry entries[MAX_INJECTION_POINTS]; +} InjectionPointsCtl; + +static InjectionPointsCtl *ActiveInjectionPoints; /* * Backend local cache of injection callbacks already loaded, stored in @@ -70,6 +96,14 @@ typedef struct InjectionPointCacheEntry char name[INJ_NAME_MAXLEN]; char private_data[INJ_PRIVATE_MAXLEN]; InjectionPointCallback callback; + + /* + * Shmem slot and copy of its generation number when this cache entry was + * created. They can be used to validate if the cached entry is still + * valid. 
+ */ + int slot_idx; + uint64 generation; } InjectionPointCacheEntry; static HTAB *InjectionPointCache = NULL; @@ -79,8 +113,10 @@ static HTAB *InjectionPointCache = NULL; * * Add an injection point to the local cache. */ -static void +static InjectionPointCacheEntry * injection_point_cache_add(const char *name, + int slot_idx, + uint64 generation, InjectionPointCallback callback, const void *private_data) { @@ -97,7 +133,7 @@ injection_point_cache_add(const char *name, hash_ctl.hcxt = TopMemoryContext; InjectionPointCache = hash_create("InjectionPoint cache hash", - INJECTION_POINT_HASH_MAX_SIZE, + MAX_INJECTION_POINTS, &hash_ctl, HASH_ELEM | HASH_STRINGS | HASH_CONTEXT); } @@ -107,9 +143,12 @@ injection_point_cache_add(const char *name, Assert(!found); strlcpy(entry->name, name, sizeof(entry->name)); + entry->slot_idx = slot_idx; + entry->generation = generation; entry->callback = callback; - if (private_data != NULL) - memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN); + memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN); + + return entry; } /* @@ -122,11 +161,10 @@ injection_point_cache_add(const char *name, static void injection_point_cache_remove(const char *name) { - /* leave if no cache */ - if (InjectionPointCache == NULL) - return; + bool found PG_USED_FOR_ASSERTS_ONLY; - (void) hash_search(InjectionPointCache, name, HASH_REMOVE, NULL); + (void) hash_search(InjectionPointCache, name, HASH_REMOVE, &found); + Assert(found); } /* @@ -134,29 +172,32 @@ injection_point_cache_remove(const char *name) * * Load an injection point into the local cache. 
*/ -static void -injection_point_cache_load(InjectionPointEntry *entry_by_name) +static InjectionPointCacheEntry * +injection_point_cache_load(InjectionPointEntry *entry, int slot_idx, uint64 generation) { char path[MAXPGPATH]; void *injection_callback_local; snprintf(path, MAXPGPATH, "%s/%s%s", pkglib_path, - entry_by_name->library, DLSUFFIX); + entry->library, DLSUFFIX); if (!pg_file_exists(path)) elog(ERROR, "could not find library \"%s\" for injection point \"%s\"", - path, entry_by_name->name); + path, entry->name); injection_callback_local = (void *) - load_external_function(path, entry_by_name->function, false, NULL); + load_external_function(path, entry->function, false, NULL); if (injection_callback_local == NULL) elog(ERROR, "could not find function \"%s\" in library \"%s\" for injection point \"%s\"", - entry_by_name->function, path, entry_by_name->name); + entry->function, path, entry->name); - /* add it to the local cache when found */ - injection_point_cache_add(entry_by_name->name, injection_callback_local, - entry_by_name->private_data); + /* add it to the local cache */ + return injection_point_cache_add(entry->name, + slot_idx, + generation, + injection_callback_local, + entry->private_data); } /* @@ -193,8 +234,7 @@ InjectionPointShmemSize(void) #ifdef USE_INJECTION_POINTS Size sz = 0; - sz = add_size(sz, hash_estimate_size(INJECTION_POINT_HASH_MAX_SIZE, - sizeof(InjectionPointEntry))); + sz = add_size(sz, sizeof(InjectionPointsCtl)); return sz; #else return 0; @@ -208,16 +248,20 @@ void InjectionPointShmemInit(void) { #ifdef USE_INJECTION_POINTS - HASHCTL info; + bool found; - /* key is a NULL-terminated string */ - info.keysize = sizeof(char[INJ_NAME_MAXLEN]); - info.entrysize = sizeof(InjectionPointEntry); - InjectionPointHash = ShmemInitHash("InjectionPoint hash", - INJECTION_POINT_HASH_INIT_SIZE, - INJECTION_POINT_HASH_MAX_SIZE, - &info, - HASH_ELEM | HASH_FIXED_SIZE | HASH_STRINGS); + ActiveInjectionPoints = ShmemInitStruct("InjectionPoint 
hash", + sizeof(InjectionPointsCtl), + &found); + if (!IsUnderPostmaster) + { + Assert(!found); + pg_atomic_init_u32(&ActiveInjectionPoints->max_inuse, 0); + for (int i = 0; i < MAX_INJECTION_POINTS; i++) + pg_atomic_init_u64(&ActiveInjectionPoints->entries[i].generation, 0); + } + else + Assert(found); #endif } @@ -232,8 +276,10 @@ InjectionPointAttach(const char *name, int private_data_size) { #ifdef USE_INJECTION_POINTS - InjectionPointEntry *entry_by_name; - bool found; + InjectionPointEntry *entry; + uint64 generation; + uint32 max_inuse; + int free_idx; if (strlen(name) >= INJ_NAME_MAXLEN) elog(ERROR, "injection point name %s too long (maximum of %u)", @@ -253,21 +299,51 @@ InjectionPointAttach(const char *name, * exist. For testing purposes this should be fine. */ LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE); - entry_by_name = (InjectionPointEntry *) - hash_search(InjectionPointHash, name, - HASH_ENTER, &found); - if (found) - elog(ERROR, "injection point \"%s\" already defined", name); + max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse); + free_idx = -1; + + for (int idx = 0; idx < max_inuse; idx++) + { + entry = &ActiveInjectionPoints->entries[idx]; + generation = pg_atomic_read_u64(&entry->generation); + if (generation % 2 == 0) + { + /* + * Found a free slot where we can add the new entry, but keep + * going so that we will find out if the entry already exists. 
+ * The name left in a free slot is stale, so do not compare it. + */ + if (free_idx == -1) + free_idx = idx; + } + else if (strcmp(entry->name, name) == 0) + elog(ERROR, "injection point \"%s\" already defined", name); + } + if (free_idx == -1) + { + if (max_inuse == MAX_INJECTION_POINTS) + elog(ERROR, "too many injection points"); + free_idx = max_inuse; + } + entry = &ActiveInjectionPoints->entries[free_idx]; + generation = pg_atomic_read_u64(&entry->generation); + Assert(generation % 2 == 0); /* Save the entry */ - strlcpy(entry_by_name->name, name, sizeof(entry_by_name->name)); - entry_by_name->name[INJ_NAME_MAXLEN - 1] = '\0'; - strlcpy(entry_by_name->library, library, sizeof(entry_by_name->library)); - entry_by_name->library[INJ_LIB_MAXLEN - 1] = '\0'; - strlcpy(entry_by_name->function, function, sizeof(entry_by_name->function)); - entry_by_name->function[INJ_FUNC_MAXLEN - 1] = '\0'; + strlcpy(entry->name, name, sizeof(entry->name)); + entry->name[INJ_NAME_MAXLEN - 1] = '\0'; + strlcpy(entry->library, library, sizeof(entry->library)); + entry->library[INJ_LIB_MAXLEN - 1] = '\0'; + strlcpy(entry->function, function, sizeof(entry->function)); + entry->function[INJ_FUNC_MAXLEN - 1] = '\0'; if (private_data != NULL) - memcpy(entry_by_name->private_data, private_data, private_data_size); + memcpy(entry->private_data, private_data, private_data_size); + + pg_write_barrier(); + pg_atomic_write_u64(&entry->generation, generation + 1); + + if (free_idx + 1 > max_inuse) + pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, free_idx + 1); LWLockRelease(InjectionPointLock); @@ -285,22 +361,165 @@ bool InjectionPointDetach(const char *name) { #ifdef USE_INJECTION_POINTS - bool found; + bool found = false; + int idx; + int max_inuse; LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE); - hash_search(InjectionPointHash, name, HASH_REMOVE, &found); + + /* Find it in the shmem array, and mark the slot as unused */ + max_inuse = (int) pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse); + for (idx = max_inuse - 1; idx >= 0; --idx) + { + 
InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx]; + uint64 generation; + + generation = pg_atomic_read_u64(&entry->generation); + if (generation % 2 == 0) + continue; /* empty slot */ + + if (strcmp(entry->name, name) == 0) + { + Assert(!found); + found = true; + pg_atomic_write_u64(&entry->generation, generation + 1); + break; + } + } + + /* If we just removed the highest-numbered entry, update 'max_inuse' */ + if (found && idx == max_inuse - 1) + { + for (; idx >= 0; --idx) + { + InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx]; + uint64 generation; + + generation = pg_atomic_read_u64(&entry->generation); + if (generation % 2 != 0) + break; + } + pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, idx + 1); + } LWLockRelease(InjectionPointLock); - if (!found) - return false; - - return true; + return found; #else elog(ERROR, "Injection points are not supported by this build"); return true; /* silence compiler */ #endif } +#ifdef USE_INJECTION_POINTS +/* + * Common workhorse of InjectionPointRun() and InjectionPointLoad() + * + * Checks if an injection point exists in shared memory, and update + * the local cache entry accordingly. + */ +static InjectionPointCacheEntry * +InjectionPointCacheRefresh(const char *name) +{ + uint32 max_inuse; + int namelen; + InjectionPointEntry local_copy; + InjectionPointCacheEntry *cached; + + /* + * First read the number of in-use slots. More entries can be added or + * existing ones can be removed while we're reading them. If the entry + * we're looking for is concurrently added or removed, we might or might + * not see it. That's OK. + */ + max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse); + if (max_inuse == 0) + { + if (InjectionPointCache) + { + hash_destroy(InjectionPointCache); + InjectionPointCache = NULL; + } + return NULL; + } + + /* + * If we have this entry in the local cache already, check if the cached + * entry is still valid. 
+ */ + cached = injection_point_cache_get(name); + if (cached) + { + int idx = cached->slot_idx; + InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx]; + + if (pg_atomic_read_u64(&entry->generation) == cached->generation) + { + /* still good */ + return cached; + } + injection_point_cache_remove(name); + cached = NULL; + } + + /* + * Search the shared memory array. + * + * It's possible that the entry we're looking for is concurrently detached + * or attached. Or detached *and* re-attached, to the same slot or a + * different slot. Detach and re-attach is not an atomic operation, so + * it's OK for us to return the old value, NULL, or the new value in such + * cases. + */ + namelen = strlen(name); + for (int idx = 0; idx < max_inuse; idx++) + { + InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx]; + uint64 generation; + + /* + * Read the generation number so that we can detect concurrent + * modifications. The read barrier ensures that the generation number + * is loaded before any of the other fields. + */ + generation = pg_atomic_read_u64(&entry->generation); + if (generation % 2 == 0) + continue; /* empty slot */ + pg_read_barrier(); + + /* Is this the injection point we're looking for? */ + if (memcmp(entry->name, name, namelen + 1) != 0) + continue; + + /* + * The entry can change at any time, if the injection point is + * concurrently detached. Copy it to local memory, and re-check the + * generation. If the generation hasn't changed, we know our local + * copy is coherent. + */ + memcpy(&local_copy, entry, sizeof(InjectionPointEntry)); + + pg_read_barrier(); + if (pg_atomic_read_u64(&entry->generation) != generation) + { + /* + * The entry was concurrently detached. + * + * Continue the search, because if the generation number changed, + * we cannot trust the result of the name comparison we did above. 
+ * It's theoretically possible that it falsely matched a mixed-up + * state of the old and new name, if the slot was recycled with a + * different name. + */ + continue; + } + + /* Success! Load it into the cache and return it */ + return injection_point_cache_load(&local_copy, idx, generation); + } + return NULL; +} +#endif + /* * Load an injection point into the local cache. * @@ -312,36 +531,7 @@ void InjectionPointLoad(const char *name) { #ifdef USE_INJECTION_POINTS - InjectionPointEntry *entry_by_name; - bool found; - - LWLockAcquire(InjectionPointLock, LW_SHARED); - entry_by_name = (InjectionPointEntry *) - hash_search(InjectionPointHash, name, - HASH_FIND, &found); - - /* - * If not found, do nothing and remove it from the local cache if it - * existed there. - */ - if (!found) - { - injection_point_cache_remove(name); - LWLockRelease(InjectionPointLock); - return; - } - - /* Check first the local cache, and leave if this entry exists. */ - if (injection_point_cache_get(name) != NULL) - { - LWLockRelease(InjectionPointLock); - return; - } - - /* Nothing? Then load it and leave */ - injection_point_cache_load(entry_by_name); - - LWLockRelease(InjectionPointLock); + InjectionPointCacheRefresh(name); #else elog(ERROR, "Injection points are not supported by this build"); #endif @@ -349,50 +539,16 @@ InjectionPointLoad(const char *name) /* * Execute an injection point, if defined. - * - * Check first the shared hash table, and adapt the local cache depending - * on that as it could be possible that an entry to run has been removed. */ void InjectionPointRun(const char *name) { #ifdef USE_INJECTION_POINTS - InjectionPointEntry *entry_by_name; - bool found; InjectionPointCacheEntry *cache_entry; - LWLockAcquire(InjectionPointLock, LW_SHARED); - entry_by_name = (InjectionPointEntry *) - hash_search(InjectionPointHash, name, - HASH_FIND, &found); - - /* - * If not found, do nothing and remove it from the local cache if it - * existed there. 
- */ - if (!found) - { - injection_point_cache_remove(name); - LWLockRelease(InjectionPointLock); - return; - } - - /* - * Check if the callback exists in the local cache, to avoid unnecessary - * external loads. - */ - if (injection_point_cache_get(name) == NULL) - { - /* not found in local cache, so load and register it */ - injection_point_cache_load(entry_by_name); - } - - /* Now loaded, so get it. */ - cache_entry = injection_point_cache_get(name); - - LWLockRelease(InjectionPointLock); - - cache_entry->callback(name, cache_entry->private_data); + cache_entry = InjectionPointCacheRefresh(name); + if (cache_entry) + cache_entry->callback(name, cache_entry->private_data); #else elog(ERROR, "Injection points are not supported by this build"); #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 635e6d6e21..b4d7f9217c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1239,6 +1239,7 @@ InjectionPointCallback InjectionPointCondition InjectionPointConditionType InjectionPointEntry +InjectionPointsCtl InjectionPointSharedState InlineCodeBlock InsertStmt