mirror of https://github.com/postgres/postgres
injection_points: Add wait and wakeup of processes
This commit adds two features to the in-core module for injection points: - A new callback called "wait" that can be attached to an injection point to make it wait. - A new SQL function to update the shared state and broadcast the update using a condition variable. This function uses an input an injection point name. This offers the possibility to stop a process in flight and wake it up in a controlled manner, which is useful when implementing tests that aim to trigger scenarios for race conditions (some tests are planned for integration). The logic uses a set of counters with a condition variable to monitor and broadcast the changes. Up to 8 waits can be registered in a single run, which should be plenty enough. Waits can be monitored in pg_stat_activity, based on the injection point name which is registered in a custom wait event under the "Extension" category. The shared memory state used by the module is registered using the DSM registry, and is optional, so there is no need to load the module with shared_preload_libraries to be able to use these features. Author: Michael Paquier Reviewed-by: Andrey Borodin, Bertrand Drouvot Discussion: https://postgr.es/m/ZdLuxBk5hGpol91B@paquier.xyz
This commit is contained in:
parent
024c521117
commit
37b369dc67
|
@ -24,6 +24,16 @@ RETURNS void
|
|||
AS 'MODULE_PATHNAME', 'injection_points_run'
|
||||
LANGUAGE C STRICT PARALLEL UNSAFE;
|
||||
|
||||
--
|
||||
-- injection_points_wakeup()
|
||||
--
|
||||
-- Wakes up a waiting injection point.
|
||||
--
|
||||
CREATE FUNCTION injection_points_wakeup(IN point_name TEXT)
|
||||
RETURNS void
|
||||
AS 'MODULE_PATHNAME', 'injection_points_wakeup'
|
||||
LANGUAGE C STRICT PARALLEL UNSAFE;
|
||||
|
||||
--
|
||||
-- injection_points_detach()
|
||||
--
|
||||
|
|
|
@ -18,18 +18,75 @@
|
|||
#include "postgres.h"
|
||||
|
||||
#include "fmgr.h"
|
||||
#include "storage/condition_variable.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/dsm_registry.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/injection_point.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
/* Maximum number of waits usable in injection points at once */
|
||||
#define INJ_MAX_WAIT 8
|
||||
#define INJ_NAME_MAXLEN 64
|
||||
|
||||
/* Shared state information for injection points. */
|
||||
typedef struct InjectionPointSharedState
|
||||
{
|
||||
/* Protects access to other fields */
|
||||
slock_t lock;
|
||||
|
||||
/* Counters advancing when injection_points_wakeup() is called */
|
||||
uint32 wait_counts[INJ_MAX_WAIT];
|
||||
|
||||
/* Names of injection points attached to wait counters */
|
||||
char name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
|
||||
|
||||
/* Condition variable used for waits and wakeups */
|
||||
ConditionVariable wait_point;
|
||||
} InjectionPointSharedState;
|
||||
|
||||
/* Pointer to shared-memory state. */
|
||||
static InjectionPointSharedState *inj_state = NULL;
|
||||
|
||||
extern PGDLLEXPORT void injection_error(const char *name);
|
||||
extern PGDLLEXPORT void injection_notice(const char *name);
|
||||
extern PGDLLEXPORT void injection_wait(const char *name);
|
||||
|
||||
|
||||
/*
|
||||
* Callback for shared memory area initialization.
|
||||
*/
|
||||
static void
|
||||
injection_point_init_state(void *ptr)
|
||||
{
|
||||
InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
|
||||
|
||||
SpinLockInit(&state->lock);
|
||||
memset(state->wait_counts, 0, sizeof(state->wait_counts));
|
||||
memset(state->name, 0, sizeof(state->name));
|
||||
ConditionVariableInit(&state->wait_point);
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize shared memory area for this module.
|
||||
*/
|
||||
static void
|
||||
injection_init_shmem(void)
|
||||
{
|
||||
bool found;
|
||||
|
||||
if (inj_state != NULL)
|
||||
return;
|
||||
|
||||
inj_state = GetNamedDSMSegment("injection_points",
|
||||
sizeof(InjectionPointSharedState),
|
||||
injection_point_init_state,
|
||||
&found);
|
||||
}
|
||||
|
||||
/* Set of callbacks available to be attached to an injection point. */
|
||||
void
|
||||
injection_error(const char *name)
|
||||
|
@ -43,6 +100,66 @@ injection_notice(const char *name)
|
|||
elog(NOTICE, "notice triggered for injection point %s", name);
|
||||
}
|
||||
|
||||
/* Wait on a condition variable, awaken by injection_points_wakeup() */
|
||||
void
|
||||
injection_wait(const char *name)
|
||||
{
|
||||
uint32 old_wait_counts = 0;
|
||||
int index = -1;
|
||||
uint32 injection_wait_event = 0;
|
||||
|
||||
if (inj_state == NULL)
|
||||
injection_init_shmem();
|
||||
|
||||
/*
|
||||
* Use the injection point name for this custom wait event. Note that
|
||||
* this custom wait event name is not released, but we don't care much for
|
||||
* testing as this should be short-lived.
|
||||
*/
|
||||
injection_wait_event = WaitEventExtensionNew(name);
|
||||
|
||||
/*
|
||||
* Find a free slot to wait for, and register this injection point's name.
|
||||
*/
|
||||
SpinLockAcquire(&inj_state->lock);
|
||||
for (int i = 0; i < INJ_MAX_WAIT; i++)
|
||||
{
|
||||
if (inj_state->name[i][0] == '\0')
|
||||
{
|
||||
index = i;
|
||||
strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
|
||||
old_wait_counts = inj_state->wait_counts[i];
|
||||
break;
|
||||
}
|
||||
}
|
||||
SpinLockRelease(&inj_state->lock);
|
||||
|
||||
if (index < 0)
|
||||
elog(ERROR, "could not find free slot for wait of injection point %s ",
|
||||
name);
|
||||
|
||||
/* And sleep.. */
|
||||
ConditionVariablePrepareToSleep(&inj_state->wait_point);
|
||||
for (;;)
|
||||
{
|
||||
uint32 new_wait_counts;
|
||||
|
||||
SpinLockAcquire(&inj_state->lock);
|
||||
new_wait_counts = inj_state->wait_counts[index];
|
||||
SpinLockRelease(&inj_state->lock);
|
||||
|
||||
if (old_wait_counts != new_wait_counts)
|
||||
break;
|
||||
ConditionVariableSleep(&inj_state->wait_point, injection_wait_event);
|
||||
}
|
||||
ConditionVariableCancelSleep();
|
||||
|
||||
/* Remove this injection point from the waiters. */
|
||||
SpinLockAcquire(&inj_state->lock);
|
||||
inj_state->name[index][0] = '\0';
|
||||
SpinLockRelease(&inj_state->lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* SQL function for creating an injection point.
|
||||
*/
|
||||
|
@ -58,6 +175,8 @@ injection_points_attach(PG_FUNCTION_ARGS)
|
|||
function = "injection_error";
|
||||
else if (strcmp(action, "notice") == 0)
|
||||
function = "injection_notice";
|
||||
else if (strcmp(action, "wait") == 0)
|
||||
function = "injection_wait";
|
||||
else
|
||||
elog(ERROR, "incorrect action \"%s\" for injection point creation", action);
|
||||
|
||||
|
@ -80,6 +199,42 @@ injection_points_run(PG_FUNCTION_ARGS)
|
|||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/*
|
||||
* SQL function for waking up an injection point waiting in injection_wait().
|
||||
*/
|
||||
PG_FUNCTION_INFO_V1(injection_points_wakeup);
|
||||
Datum
|
||||
injection_points_wakeup(PG_FUNCTION_ARGS)
|
||||
{
|
||||
char *name = text_to_cstring(PG_GETARG_TEXT_PP(0));
|
||||
int index = -1;
|
||||
|
||||
if (inj_state == NULL)
|
||||
injection_init_shmem();
|
||||
|
||||
/* First bump the wait counter for the injection point to wake up */
|
||||
SpinLockAcquire(&inj_state->lock);
|
||||
for (int i = 0; i < INJ_MAX_WAIT; i++)
|
||||
{
|
||||
if (strcmp(name, inj_state->name[i]) == 0)
|
||||
{
|
||||
index = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (index < 0)
|
||||
{
|
||||
SpinLockRelease(&inj_state->lock);
|
||||
elog(ERROR, "could not find injection point %s to wake up", name);
|
||||
}
|
||||
inj_state->wait_counts[index]++;
|
||||
SpinLockRelease(&inj_state->lock);
|
||||
|
||||
/* And broadcast the change to the waiters */
|
||||
ConditionVariableBroadcast(&inj_state->wait_point);
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/*
|
||||
* SQL function for dropping an injection point.
|
||||
*/
|
||||
|
|
|
@ -1210,6 +1210,7 @@ InitializeDSMForeignScan_function
|
|||
InitializeWorkerForeignScan_function
|
||||
InjectionPointCacheEntry
|
||||
InjectionPointEntry
|
||||
InjectionPointSharedState
|
||||
InlineCodeBlock
|
||||
InsertStmt
|
||||
Instrumentation
|
||||
|
|
Loading…
Reference in New Issue