diff --git a/contrib/worker_spi/worker_spi.c b/contrib/worker_spi/worker_spi.c index 6da747b47b..344455cd57 100644 --- a/contrib/worker_spi/worker_spi.c +++ b/contrib/worker_spi/worker_spi.c @@ -1,16 +1,19 @@ /* ------------------------------------------------------------------------- * * worker_spi.c - * Sample background worker code that demonstrates usage of a database - * connection. + * Sample background worker code that demonstrates various coding + * patterns: establishing a database connection; starting and committing + * transactions; using GUC variables, and heeding SIGHUP to reread + * the configuration file; reporting to pg_stat_activity; using the + * process latch to sleep and exit in case of postmaster death. * - * This code connects to a database, create a schema and table, and summarizes + * This code connects to a database, creates a schema and table, and summarizes * the numbers contained therein. To see it working, insert an initial value * with "total" type and some initial value; then insert some other rows with * "delta" type. Delta rows will be deleted by this worker and their values * aggregated into the total. * - * Copyright (C) 2012, PostgreSQL Global Development Group + * Copyright (C) 2013, PostgreSQL Global Development Group * * IDENTIFICATION * contrib/worker_spi/worker_spi.c @@ -33,14 +36,22 @@ #include "executor/spi.h" #include "fmgr.h" #include "lib/stringinfo.h" +#include "pgstat.h" #include "utils/builtins.h" #include "utils/snapmgr.h" +#include "tcop/utility.h" PG_MODULE_MAGIC; void _PG_init(void); -static bool got_sigterm = false; +/* flags set by signal handlers */ +static volatile sig_atomic_t got_sighup = false; +static volatile sig_atomic_t got_sigterm = false; + +/* GUC variables */ +static int worker_spi_naptime = 10; +static int worker_spi_total_workers = 2; typedef struct worktable @@ -49,6 +60,11 @@ typedef struct worktable const char *name; } worktable; +/* + * Signal handler for SIGTERM + * Set a flag to let the main loop to terminate, and set our latch to wake + * it up. + */ static void worker_spi_sigterm(SIGNAL_ARGS) { @@ -61,14 +77,23 @@ worker_spi_sigterm(SIGNAL_ARGS) errno = save_errno; } +/* + * Signal handler for SIGHUP + * Set a flag to let the main loop to reread the config file, and set + * our latch to wake it up. + */ static void worker_spi_sighup(SIGNAL_ARGS) { - elog(LOG, "got sighup!"); + got_sighup = true; if (MyProc) SetLatch(&MyProc->procLatch); } +/* + * Initialize workspace for a worker process: create the schema if it doesn't + * already exist. + */ static void initialize_worker_spi(worktable *table) { @@ -77,10 +102,13 @@ initialize_worker_spi(worktable *table) bool isnull; StringInfoData buf; + SetCurrentStatementStartTimestamp(); StartTransactionCommand(); SPI_connect(); PushActiveSnapshot(GetTransactionSnapshot()); + pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema"); + /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */ initStringInfo(&buf); appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'", table->schema); @@ -110,6 +138,9 @@ initialize_worker_spi(worktable *table) "WHERE type = 'total'", table->schema, table->name, table->name, table->name); + /* set statement start time */ + SetCurrentStatementStartTimestamp(); + ret = SPI_execute(buf.data, false, 0); if (ret != SPI_OK_UTILITY) @@ -119,6 +150,7 @@ initialize_worker_spi(worktable *table) SPI_finish(); PopActiveSnapshot(); CommitTransactionCommand(); + pgstat_report_activity(STATE_IDLE, NULL); } static void @@ -163,6 +195,9 @@ worker_spi_main(void *main_arg) table->name, table->name); + /* + * Main loop: do this until the SIGTERM handler tells us to terminate + */ while (!got_sigterm) { int ret; @@ -176,17 +211,45 @@ worker_spi_main(void *main_arg) */ rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - 1000L); + worker_spi_naptime * 1000L); ResetLatch(&MyProc->procLatch); /* emergency bailout if postmaster has died */ if (rc & WL_POSTMASTER_DEATH) proc_exit(1); + /* + * In case of a SIGHUP, just reload the configuration. + */ + if (got_sighup) + { + got_sighup = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* + * Start a transaction on which we can run queries. Note that each + * StartTransactionCommand() call should be preceded by a + * SetCurrentStatementStartTimestamp() call, which sets both the time + * for the statement we're about the run, and also the transaction + * start time. Also, each other query sent to SPI should probably be + * preceded by SetCurrentStatementStartTimestamp(), so that statement + * start time is always up to date. + * + * The SPI_connect() call lets us run queries through the SPI manager, + * and the PushActiveSnapshot() call creates an "active" snapshot which + * is necessary for queries to have MVCC data to work on. + * + * The pgstat_report_activity() call makes our activity visible through + * the pgstat views. + */ + SetCurrentStatementStartTimestamp(); StartTransactionCommand(); SPI_connect(); PushActiveSnapshot(GetTransactionSnapshot()); + pgstat_report_activity(STATE_RUNNING, buf.data); + /* We can now execute queries via SPI */ ret = SPI_execute(buf.data, false, 0); if (ret != SPI_OK_UPDATE_RETURNING) @@ -207,9 +270,13 @@ worker_spi_main(void *main_arg) table->schema, table->name, val); } + /* + * And finish our transaction. + */ SPI_finish(); PopActiveSnapshot(); CommitTransactionCommand(); + pgstat_report_activity(STATE_IDLE, NULL); } proc_exit(0); @@ -218,46 +285,66 @@ worker_spi_main(void *main_arg) /* * Entrypoint of this module. * - * We register two worker processes here, to demonstrate how that can be done. + * We register more than one worker process here, to demonstrate how that can + * be done. */ void _PG_init(void) { BackgroundWorker worker; worktable *table; + unsigned int i; + char name[20]; - /* register the worker processes. These values are common for both */ + /* get the configuration */ + DefineCustomIntVariable("worker_spi.naptime", + "Duration between each check (in seconds).", + NULL, + &worker_spi_naptime, + 10, + 1, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + DefineCustomIntVariable("worker_spi.total_workers", + "Number of workers.", + NULL, + &worker_spi_total_workers, + 2, + 1, + 100, + PGC_POSTMASTER, + 0, + NULL, + NULL, + NULL); + + /* set up common data for all our workers */ worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_restart_time = BGW_NEVER_RESTART; worker.bgw_main = worker_spi_main; worker.bgw_sighup = worker_spi_sighup; worker.bgw_sigterm = worker_spi_sigterm; /* - * These values are used for the first worker. - * - * Note these are palloc'd. The reason this works after starting a new - * worker process is that if we only fork, they point to valid allocated - * memory in the child process; and if we fork and then exec, the exec'd - * process will run this code again, and so the memory is also valid there. + * Now fill in worker-specific data, and do the actual registrations. */ - table = palloc(sizeof(worktable)); - table->schema = pstrdup("schema1"); - table->name = pstrdup("counted"); + for (i = 1; i <= worker_spi_total_workers; i++) + { + sprintf(name, "worker %d", i); + worker.bgw_name = pstrdup(name); - worker.bgw_name = "SPI worker 1"; - worker.bgw_restart_time = BGW_NEVER_RESTART; - worker.bgw_main_arg = (void *) table; - RegisterBackgroundWorker(&worker); + table = palloc(sizeof(worktable)); + sprintf(name, "schema%d", i); + table->schema = pstrdup(name); + table->name = pstrdup("counted"); + worker.bgw_main_arg = (void *) table; - /* Values for the second worker */ - table = palloc(sizeof(worktable)); - table->schema = pstrdup("our schema2"); - table->name = pstrdup("counted rows"); - - worker.bgw_name = "SPI worker 2"; - worker.bgw_restart_time = 2; - worker.bgw_main_arg = (void *) table; - RegisterBackgroundWorker(&worker); + RegisterBackgroundWorker(&worker); + } }