diff --git a/contrib/Makefile b/contrib/Makefile index d230451a12..fcd7c1e033 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -50,7 +50,8 @@ SUBDIRS = \ test_parser \ tsearch2 \ unaccent \ - vacuumlo + vacuumlo \ + worker_spi ifeq ($(with_openssl),yes) SUBDIRS += sslinfo diff --git a/contrib/worker_spi/Makefile b/contrib/worker_spi/Makefile new file mode 100644 index 0000000000..edf4105a11 --- /dev/null +++ b/contrib/worker_spi/Makefile @@ -0,0 +1,14 @@ +# contrib/worker_spi/Makefile + +MODULES = worker_spi + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/worker_spi +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/worker_spi/worker_spi.c b/contrib/worker_spi/worker_spi.c new file mode 100644 index 0000000000..6da747b47b --- /dev/null +++ b/contrib/worker_spi/worker_spi.c @@ -0,0 +1,263 @@ +/* ------------------------------------------------------------------------- + * + * worker_spi.c + * Sample background worker code that demonstrates usage of a database + * connection. + * + * This code connects to a database, create a schema and table, and summarizes + * the numbers contained therein. To see it working, insert an initial value + * with "total" type and some initial value; then insert some other rows with + * "delta" type. Delta rows will be deleted by this worker and their values + * aggregated into the total. + * + * Copyright (C) 2012, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/worker_spi/worker_spi.c + * + * ------------------------------------------------------------------------- + */ +#include "postgres.h" + +/* These are always necessary for a bgworker */ +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/shmem.h" + +/* these headers are used by this particular worker's code */ +#include "access/xact.h" +#include "executor/spi.h" +#include "fmgr.h" +#include "lib/stringinfo.h" +#include "utils/builtins.h" +#include "utils/snapmgr.h" + +PG_MODULE_MAGIC; + +void _PG_init(void); + +static bool got_sigterm = false; + + +typedef struct worktable +{ + const char *schema; + const char *name; +} worktable; + +static void +worker_spi_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sigterm = true; + if (MyProc) + SetLatch(&MyProc->procLatch); + + errno = save_errno; +} + +static void +worker_spi_sighup(SIGNAL_ARGS) +{ + elog(LOG, "got sighup!"); + if (MyProc) + SetLatch(&MyProc->procLatch); +} + +static void +initialize_worker_spi(worktable *table) +{ + int ret; + int ntup; + bool isnull; + StringInfoData buf; + + StartTransactionCommand(); + SPI_connect(); + PushActiveSnapshot(GetTransactionSnapshot()); + + initStringInfo(&buf); + appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'", + table->schema); + + ret = SPI_execute(buf.data, true, 0); + if (ret != SPI_OK_SELECT) + elog(FATAL, "SPI_execute failed: error code %d", ret); + + if (SPI_processed != 1) + elog(FATAL, "not a singleton result"); + + ntup = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], + SPI_tuptable->tupdesc, + 1, &isnull)); + if (isnull) + elog(FATAL, "null result"); + + if (ntup == 0) + { + resetStringInfo(&buf); + appendStringInfo(&buf, + "CREATE SCHEMA \"%s\" " + "CREATE TABLE \"%s\" (" + " type text CHECK (type IN ('total', 'delta')), " + " value integer)" + "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) " + "WHERE type = 'total'", + table->schema, table->name, table->name, table->name); + + ret = SPI_execute(buf.data, false, 0); + + if (ret != SPI_OK_UTILITY) + elog(FATAL, "failed to create my schema"); + } + + SPI_finish(); + PopActiveSnapshot(); + CommitTransactionCommand(); +} + +static void +worker_spi_main(void *main_arg) +{ + worktable *table = (worktable *) main_arg; + StringInfoData buf; + + /* We're now ready to receive signals */ + BackgroundWorkerUnblockSignals(); + + /* Connect to our database */ + BackgroundWorkerInitializeConnection("postgres", NULL); + + elog(LOG, "%s initialized with %s.%s", + MyBgworkerEntry->bgw_name, table->schema, table->name); + initialize_worker_spi(table); + + /* + * Quote identifiers passed to us. Note that this must be done after + * initialize_worker_spi, because that routine assumes the names are not + * quoted. + * + * Note some memory might be leaked here. + */ + table->schema = quote_identifier(table->schema); + table->name = quote_identifier(table->name); + + initStringInfo(&buf); + appendStringInfo(&buf, + "WITH deleted AS (DELETE " + "FROM %s.%s " + "WHERE type = 'delta' RETURNING value), " + "total AS (SELECT coalesce(sum(value), 0) as sum " + "FROM deleted) " + "UPDATE %s.%s " + "SET value = %s.value + total.sum " + "FROM total WHERE type = 'total' " + "RETURNING %s.value", + table->schema, table->name, + table->schema, table->name, + table->name, + table->name); + + while (!got_sigterm) + { + int ret; + int rc; + + /* + * Background workers mustn't call usleep() or any direct equivalent: + * instead, they may wait on their process latch, which sleeps as + * necessary, but is awakened if postmaster dies. That way the + * background process goes away immediately in an emergency. + */ + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 1000L); + ResetLatch(&MyProc->procLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + StartTransactionCommand(); + SPI_connect(); + PushActiveSnapshot(GetTransactionSnapshot()); + + ret = SPI_execute(buf.data, false, 0); + + if (ret != SPI_OK_UPDATE_RETURNING) + elog(FATAL, "cannot select from table %s.%s: error code %d", + table->schema, table->name, ret); + + if (SPI_processed > 0) + { + bool isnull; + int32 val; + + val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], + SPI_tuptable->tupdesc, + 1, &isnull)); + if (!isnull) + elog(LOG, "%s: count in %s.%s is now %d", + MyBgworkerEntry->bgw_name, + table->schema, table->name, val); + } + + SPI_finish(); + PopActiveSnapshot(); + CommitTransactionCommand(); + } + + proc_exit(0); +} + +/* + * Entrypoint of this module. + * + * We register two worker processes here, to demonstrate how that can be done. + */ +void +_PG_init(void) +{ + BackgroundWorker worker; + worktable *table; + + /* register the worker processes. These values are common for both */ + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_main = worker_spi_main; + worker.bgw_sighup = worker_spi_sighup; + worker.bgw_sigterm = worker_spi_sigterm; + + /* + * These values are used for the first worker. + * + * Note these are palloc'd. The reason this works after starting a new + * worker process is that if we only fork, they point to valid allocated + * memory in the child process; and if we fork and then exec, the exec'd + * process will run this code again, and so the memory is also valid there. + */ + table = palloc(sizeof(worktable)); + table->schema = pstrdup("schema1"); + table->name = pstrdup("counted"); + + worker.bgw_name = "SPI worker 1"; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main_arg = (void *) table; + RegisterBackgroundWorker(&worker); + + /* Values for the second worker */ + table = palloc(sizeof(worktable)); + table->schema = pstrdup("our schema2"); + table->name = pstrdup("counted rows"); + + worker.bgw_name = "SPI worker 2"; + worker.bgw_restart_time = 2; + worker.bgw_main_arg = (void *) table; + RegisterBackgroundWorker(&worker); +} diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml new file mode 100644 index 0000000000..912c7deb8f --- /dev/null +++ b/doc/src/sgml/bgworker.sgml @@ -0,0 +1,146 @@ + + + + Background Worker Processes + + + Background workers + + + + PostgreSQL can be extended to run user-supplied code in separate processes. + Such processes are started, stopped and monitored by postgres, + which permits them to have a lifetime closely linked to the server's status. + These processes have the option to attach to PostgreSQL's + shared memory area and to connect to databases internally; they can also run + multiple transactions serially, just like a regular client-connected server + process. Also, by linking to libpq they can connect to the + server and behave like a regular client application. + + + + + There are considerable robustness and security risks in using background + worker processes because, being written in the C language, + they have unrestricted access to data. Administrators wishing to enable + modules that include background worker process should exercise extreme + caution. Only carefully audited modules should be permitted to run + background worker processes. + + + + + Only modules listed in shared_preload_libraries can run + background workers. A module wishing to run a background worker needs + to register it by calling + RegisterBackgroundWorker(BackgroundWorker *worker) + from its _PG_init(). + The structure BackgroundWorker is defined thus: + +typedef void (*bgworker_main_type)(void *main_arg); +typedef void (*bgworker_sighdlr_type)(SIGNAL_ARGS); +typedef struct BackgroundWorker +{ + char *bgw_name; + int bgw_flags; + BgWorkerStartTime bgw_start_time; + int bgw_restart_time; /* in seconds, or BGW_NEVER_RESTART */ + bgworker_main_type bgw_main; + void *bgw_main_arg; + bgworker_sighdlr_type bgw_sighup; + bgworker_sighdlr_type bgw_sigterm; +} BackgroundWorker; + + + + + bgw_name is a string to be used in log messages, process + listings and similar contexts. + + + + bgw_flags is a bitwise-or'd bitmask indicating the + capabilities that the module wants. Possible values are + BGWORKER_SHMEM_ACCESS (requesting shared memory access) + and BGWORKER_BACKEND_DATABASE_CONNECTION (requesting the + ability to establish a database connection, through which it can later run + transactions and queries). + + + + bgw_start_time is the server state during which + postgres should start the process; it can be one of + BgWorkerStart_PostmasterStart (start as soon as + postgres itself has finished its own initialization; processes + requesting this are not eligible for database connections), + BgWorkerStart_ConsistentState (start as soon as a consistent state + has been reached in a HOT standby, allowing processes to connect to + databases and run read-only queries), and + BgWorkerStart_RecoveryFinished (start as soon as the system has + entered normal read-write state). Note the last two values are equivalent + in a server that's not a HOT standby. Note that this setting only indicates + when the processes are to be started; they do not stop when a different state + is reached. + + + + bgw_restart_time is the interval, in seconds, that + postgres should wait before restarting the process, in + case it crashes. It can be any positive value, + or BGW_NEVER_RESTART, indicating not to restart the + process in case of a crash. + + + + bgw_main is a pointer to the function to run when + the process is started. This function must take a single argument of type + void * and return void. + bgw_main_arg will be passed to it as its only + argument. Note that the global variable MyBgworkerEntry + points to a copy of the BackgroundWorker structure + passed at registration time. + + + + bgw_sighup and bgw_sigterm are + pointers to functions that will be installed as signal handlers for the new + process. If bgw_sighup is NULL, then SIG_IGN + is used; if bgw_sigterm is NULL, a handler is installed that + will terminate the process after logging a suitable message. + + + Once running, the process can connect to a database by calling + BackgroundWorkerInitializeConnection(char *dbname, char *username). + This allows the process to run transactions and queries using the + SPI interface. If dbname is NULL, + the session is not connected to any particular database, but shared catalogs + can be accessed. If username is NULL, the process will run as + the superuser created during initdb. + BackgroundWorkerInitializeConnection can only be called once per background + process, it is not possible to switch databases. + + + + Signals are initially blocked when control reaches the + bgw_main function, and must be unblocked by it; this is to + allow the process to further customize its signal handlers, if necessary. + Signals can be unblocked in the new process by calling + BackgroundWorkerUnblockSignals and blocked by calling + BackgroundWorkerBlockSignals. + + + + Background workers are expected to be continuously running; if they exit + cleanly, postgres will restart them immediately. Consider doing + interruptible sleep when they have nothing to do; this can be achieved by + calling WaitLatch(). Make sure the + WL_POSTMASTER_DEATH flag is set when calling that function, and + verify the return code for a prompt exit in the emergency case that + postgres itself has terminated. + + + + The worker_spi contrib module contains a working example, + which demonstrates some useful techniques. + + diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml index db4cc3a3fb..368f9321c8 100644 --- a/doc/src/sgml/filelist.sgml +++ b/doc/src/sgml/filelist.sgml @@ -50,6 +50,7 @@ + diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml index 4ef1fc1a6e..15e4ef641e 100644 --- a/doc/src/sgml/postgres.sgml +++ b/doc/src/sgml/postgres.sgml @@ -218,6 +218,7 @@ &plpython; &spi; + &bgworker; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 6f93d93fa3..a492c60b46 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -103,6 +103,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" +#include "postmaster/bgworker.h" #include "postmaster/fork_process.h" #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" @@ -125,6 +126,19 @@ #endif +/* + * Possible types of a backend. Beyond being the possible bkend_type values in + * struct bkend, these are OR-able request flag bits for SignalSomeChildren() + * and CountChildren(). + */ +#define BACKEND_TYPE_NORMAL 0x0001 /* normal backend */ +#define BACKEND_TYPE_AUTOVAC 0x0002 /* autovacuum worker process */ +#define BACKEND_TYPE_WALSND 0x0004 /* walsender process */ +#define BACKEND_TYPE_BGWORKER 0x0008 /* bgworker process */ +#define BACKEND_TYPE_ALL 0x000F /* OR of all the above */ + +#define BACKEND_TYPE_WORKER (BACKEND_TYPE_AUTOVAC | BACKEND_TYPE_BGWORKER) + /* * List of active backends (or child processes anyway; we don't actually * know whether a given child has become a backend or is still in the @@ -132,19 +146,28 @@ * children we have and send them appropriate signals when necessary. * * "Special" children such as the startup, bgwriter and autovacuum launcher - * tasks are not in this list. Autovacuum worker and walsender processes are - * in it. Also, "dead_end" children are in it: these are children launched just - * for the purpose of sending a friendly rejection message to a would-be - * client. We must track them because they are attached to shared memory, - * but we know they will never become live backends. dead_end children are - * not assigned a PMChildSlot. + * tasks are not in this list. Autovacuum worker and walsender are in it. + * Also, "dead_end" children are in it: these are children launched just for + * the purpose of sending a friendly rejection message to a would-be client. + * We must track them because they are attached to shared memory, but we know + * they will never become live backends. dead_end children are not assigned a + * PMChildSlot. + * + * Background workers that request shared memory access during registration are + * in this list, too. */ typedef struct bkend { pid_t pid; /* process id of backend */ long cancel_key; /* cancel key for cancels for this backend */ int child_slot; /* PMChildSlot for this backend, if any */ - bool is_autovacuum; /* is it an autovacuum process? */ + + /* + * Flavor of backend or auxiliary process. Note that BACKEND_TYPE_WALSND + * backends initially announce themselves as BACKEND_TYPE_NORMAL, so if + * bkend_type is normal, you should check for a recent transition. + */ + int bkend_type; bool dead_end; /* is it going to send an error and quit? */ dlist_node elem; /* list link in BackendList */ } Backend; @@ -155,6 +178,33 @@ static dlist_head BackendList = DLIST_STATIC_INIT(BackendList); static Backend *ShmemBackendArray; #endif + +/* + * List of background workers. + * + * A worker that requests a database connection during registration will have + * rw_backend set, and will be present in BackendList. Note: do not rely on + * rw_backend being non-NULL for shmem-connected workers! + */ +typedef struct RegisteredBgWorker +{ + BackgroundWorker rw_worker; /* its registry entry */ + Backend *rw_backend; /* its BackendList entry, or NULL */ + pid_t rw_pid; /* 0 if not running */ + int rw_child_slot; + TimestampTz rw_crashed_at; /* if not 0, time it last crashed */ +#ifdef EXEC_BACKEND + int rw_cookie; +#endif + slist_node rw_lnode; /* list link */ +} RegisteredBgWorker; + +static slist_head BackgroundWorkerList = SLIST_STATIC_INIT(BackgroundWorkerList); + +BackgroundWorker *MyBgworkerEntry = NULL; + + + /* The socket number we are listening for connections on */ int PostPortNumber; /* The directory names for Unix socket(s) */ @@ -306,6 +356,10 @@ static volatile sig_atomic_t start_autovac_launcher = false; /* the launcher needs to be signalled to communicate some condition */ static volatile bool avlauncher_needs_signal = false; +/* set when there's a worker that needs to be started up */ +static volatile bool StartWorkerNeeded = true; +static volatile bool HaveCrashedWorker = false; + /* * State for assigning random salts and cancel keys. * Also, the global MyCancelKey passes the cancel key assigned to a given @@ -341,8 +395,11 @@ static void reaper(SIGNAL_ARGS); static void sigusr1_handler(SIGNAL_ARGS); static void startup_die(SIGNAL_ARGS); static void dummy_handler(SIGNAL_ARGS); +static int GetNumRegisteredBackgroundWorkers(int flags); static void StartupPacketTimeoutHandler(void); static void CleanupBackend(int pid, int exitstatus); +static bool CleanupBackgroundWorker(int pid, int exitstatus); +static void do_start_bgworker(void); static void HandleChildCrash(int pid, int exitstatus, const char *procname); static void LogChildExit(int lev, const char *procname, int pid, int exitstatus); @@ -361,19 +418,13 @@ static long PostmasterRandom(void); static void RandomSalt(char *md5Salt); static void signal_child(pid_t pid, int signal); static bool SignalSomeChildren(int signal, int targets); +static bool SignalUnconnectedWorkers(int signal); #define SignalChildren(sig) SignalSomeChildren(sig, BACKEND_TYPE_ALL) -/* - * Possible types of a backend. These are OR-able request flag bits - * for SignalSomeChildren() and CountChildren(). - */ -#define BACKEND_TYPE_NORMAL 0x0001 /* normal backend */ -#define BACKEND_TYPE_AUTOVAC 0x0002 /* autovacuum worker process */ -#define BACKEND_TYPE_WALSND 0x0004 /* walsender process */ -#define BACKEND_TYPE_ALL 0x0007 /* OR of all the above */ - static int CountChildren(int target); +static int CountUnconnectedWorkers(void); +static void StartOneBackgroundWorker(void); static bool CreateOptsFile(int argc, char *argv[], char *fullprogname); static pid_t StartChildProcess(AuxProcType type); static void StartAutovacuumWorker(void); @@ -473,6 +524,8 @@ static bool save_backend_variables(BackendParameters *param, Port *port, static void ShmemBackendArrayAdd(Backend *bn); static void ShmemBackendArrayRemove(Backend *bn); + +static BackgroundWorker *find_bgworker_entry(int cookie); #endif /* EXEC_BACKEND */ #define StartupDataBase() StartChildProcess(StartupProcess) @@ -843,6 +896,17 @@ PostmasterMain(int argc, char *argv[]) */ process_shared_preload_libraries(); + /* + * If loadable modules have added background workers, MaxBackends needs to + * be updated. Do so now by forcing a no-op update of max_connections. + * XXX This is a pretty ugly way to do it, but it doesn't seem worth + * introducing a new entry point in guc.c to do it in a cleaner fashion. + */ + if (GetNumShmemAttachedBgworkers() > 0) + SetConfigOption("max_connections", + GetConfigOption("max_connections", false, false), + PGC_POSTMASTER, PGC_S_OVERRIDE); + /* * Establish input sockets. */ @@ -1087,7 +1151,8 @@ PostmasterMain(int argc, char *argv[]) * handling setup of child processes. See tcop/postgres.c, * bootstrap/bootstrap.c, postmaster/bgwriter.c, postmaster/walwriter.c, * postmaster/autovacuum.c, postmaster/pgarch.c, postmaster/pgstat.c, - * postmaster/syslogger.c and postmaster/checkpointer.c. + * postmaster/syslogger.c, postmaster/bgworker.c and + * postmaster/checkpointer.c. */ pqinitmask(); PG_SETMASK(&BlockSig); @@ -1177,6 +1242,9 @@ PostmasterMain(int argc, char *argv[]) Assert(StartupPID != 0); pmState = PM_STARTUP; + /* Some workers may be scheduled to start now */ + StartOneBackgroundWorker(); + status = ServerLoop(); /* @@ -1341,6 +1409,90 @@ checkDataDir(void) FreeFile(fp); } +/* + * Determine how long should we let ServerLoop sleep. + * + * In normal conditions we wait at most one minute, to ensure that the other + * background tasks handled by ServerLoop get done even when no requests are + * arriving. However, if there are background workers waiting to be started, + * we don't actually sleep so that they are quickly serviced. + */ +static void +DetermineSleepTime(struct timeval *timeout) +{ + TimestampTz next_wakeup = 0; + + /* + * Normal case: either there are no background workers at all, or we're in + * a shutdown sequence (during which we ignore bgworkers altogether). + */ + if (Shutdown > NoShutdown || + (!StartWorkerNeeded && !HaveCrashedWorker)) + { + timeout->tv_sec = 60; + timeout->tv_usec = 0; + return; + } + + if (StartWorkerNeeded) + { + timeout->tv_sec = 0; + timeout->tv_usec = 0; + return; + } + + if (HaveCrashedWorker) + { + slist_iter siter; + + /* + * When there are crashed bgworkers, we sleep just long enough that + * they are restarted when they request to be. Scan the list to + * determine the minimum of all wakeup times according to most recent + * crash time and requested restart interval. + */ + slist_foreach(siter, &BackgroundWorkerList) + { + RegisteredBgWorker *rw; + TimestampTz this_wakeup; + + rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur); + + if (rw->rw_crashed_at == 0) + continue; + + if (rw->rw_worker.bgw_restart_time == BGW_NEVER_RESTART) + continue; + + this_wakeup = TimestampTzPlusMilliseconds(rw->rw_crashed_at, + 1000L * rw->rw_worker.bgw_restart_time); + if (next_wakeup == 0 || this_wakeup < next_wakeup) + next_wakeup = this_wakeup; + } + } + + if (next_wakeup != 0) + { + int microsecs; + + TimestampDifference(GetCurrentTimestamp(), next_wakeup, + &timeout->tv_sec, µsecs); + timeout->tv_usec = microsecs; + + /* Ensure we don't exceed one minute */ + if (timeout->tv_sec > 60) + { + timeout->tv_sec = 60; + timeout->tv_usec = 0; + } + } + else + { + timeout->tv_sec = 60; + timeout->tv_usec = 0; + } +} + /* * Main idle loop of postmaster */ @@ -1364,9 +1516,6 @@ ServerLoop(void) /* * Wait for a connection request to arrive. * - * We wait at most one minute, to ensure that the other background - * tasks handled below get done even when no requests are arriving. - * * If we are in PM_WAIT_DEAD_END state, then we don't want to accept * any new connections, so we don't call select() at all; just sleep * for a little bit with signals unblocked. @@ -1385,8 +1534,7 @@ ServerLoop(void) /* must set timeout each time; some OSes change it! */ struct timeval timeout; - timeout.tv_sec = 60; - timeout.tv_usec = 0; + DetermineSleepTime(&timeout); selres = select(nSockets, &rmask, NULL, NULL, &timeout); } @@ -1498,6 +1646,10 @@ ServerLoop(void) kill(AutoVacPID, SIGUSR2); } + /* Get other worker processes running, if needed */ + if (StartWorkerNeeded || HaveCrashedWorker) + StartOneBackgroundWorker(); + /* * Touch Unix socket and lock files every 58 minutes, to ensure that * they are not removed by overzealous /tmp-cleaning tasks. We assume @@ -1513,7 +1665,6 @@ ServerLoop(void) } } - /* * Initialise the masks for select() for the ports we are listening on. * Return the number of sockets to listen on. @@ -1867,7 +2018,7 @@ processCancelRequest(Port *port, void *pkt) Backend *bp; #ifndef EXEC_BACKEND - dlist_iter iter; + dlist_iter iter; #else int i; #endif @@ -2205,8 +2356,11 @@ pmdie(SIGNAL_ARGS) if (pmState == PM_RUN || pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY || pmState == PM_STARTUP) { - /* autovacuum workers are told to shut down immediately */ - SignalSomeChildren(SIGTERM, BACKEND_TYPE_AUTOVAC); + /* autovac workers are told to shut down immediately */ + /* and bgworkers too; does this need tweaking? */ + SignalSomeChildren(SIGTERM, + BACKEND_TYPE_AUTOVAC | BACKEND_TYPE_BGWORKER); + SignalUnconnectedWorkers(SIGTERM); /* and the autovac launcher too */ if (AutoVacPID != 0) signal_child(AutoVacPID, SIGTERM); @@ -2258,12 +2412,14 @@ pmdie(SIGNAL_ARGS) signal_child(BgWriterPID, SIGTERM); if (WalReceiverPID != 0) signal_child(WalReceiverPID, SIGTERM); + SignalUnconnectedWorkers(SIGTERM); if (pmState == PM_RECOVERY) { /* - * Only startup, bgwriter, walreceiver, and/or checkpointer - * should be active in this state; we just signaled the first - * three, and we don't want to kill checkpointer yet. + * Only startup, bgwriter, walreceiver, unconnected bgworkers, + * and/or checkpointer should be active in this state; we just + * signaled the first four, and we don't want to kill + * checkpointer yet. */ pmState = PM_WAIT_BACKENDS; } @@ -2275,9 +2431,10 @@ pmdie(SIGNAL_ARGS) { ereport(LOG, (errmsg("aborting any active transactions"))); - /* shut down all backends and autovac workers */ + /* shut down all backends and workers */ SignalSomeChildren(SIGTERM, - BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC); + BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC | + BACKEND_TYPE_BGWORKER); /* and the autovac launcher too */ if (AutoVacPID != 0) signal_child(AutoVacPID, SIGTERM); @@ -2321,6 +2478,7 @@ pmdie(SIGNAL_ARGS) signal_child(PgArchPID, SIGQUIT); if (PgStatPID != 0) signal_child(PgStatPID, SIGQUIT); + SignalUnconnectedWorkers(SIGQUIT); ExitPostmaster(0); break; } @@ -2449,6 +2607,9 @@ reaper(SIGNAL_ARGS) if (PgStatPID == 0) PgStatPID = pgstat_start(); + /* some workers may be scheduled to start now */ + StartOneBackgroundWorker(); + /* at this point we are really open for business */ ereport(LOG, (errmsg("database system is ready to accept connections"))); @@ -2615,6 +2776,14 @@ reaper(SIGNAL_ARGS) continue; } + /* Was it one of our background workers? */ + if (CleanupBackgroundWorker(pid, exitstatus)) + { + /* have it be restarted */ + HaveCrashedWorker = true; + continue; + } + /* * Else do standard backend child cleanup. */ @@ -2633,11 +2802,100 @@ reaper(SIGNAL_ARGS) errno = save_errno; } +/* + * Scan the bgworkers list and see if the given PID (which has just stopped + * or crashed) is in it. Handle its shutdown if so, and return true. If not a + * bgworker, return false. + * + * This is heavily based on CleanupBackend. One important difference is that + * we don't know yet that the dying process is a bgworker, so we must be silent + * until we're sure it is. + */ +static bool +CleanupBackgroundWorker(int pid, + int exitstatus) /* child's exit status */ +{ + char namebuf[MAXPGPATH]; + slist_iter iter; + + slist_foreach(iter, &BackgroundWorkerList) + { + RegisteredBgWorker *rw; + + rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur); + + if (rw->rw_pid != pid) + continue; + +#ifdef WIN32 + /* see CleanupBackend */ + if (exitstatus == ERROR_WAIT_NO_CHILDREN) + exitstatus = 0; +#endif + + snprintf(namebuf, MAXPGPATH, "%s: %s", _("worker process"), + rw->rw_worker.bgw_name); + + /* Delay restarting any bgworker that exits with a nonzero status. */ + if (!EXIT_STATUS_0(exitstatus)) + rw->rw_crashed_at = GetCurrentTimestamp(); + else + rw->rw_crashed_at = 0; + + /* + * Additionally, for shared-memory-connected workers, just like a + * backend, any exit status other than 0 or 1 is considered a crash + * and causes a system-wide restart. + */ + if (rw->rw_worker.bgw_flags & BGWORKER_SHMEM_ACCESS) + { + if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus)) + { + rw->rw_crashed_at = GetCurrentTimestamp(); + HandleChildCrash(pid, exitstatus, namebuf); + return true; + } + } + + if (!ReleasePostmasterChildSlot(rw->rw_child_slot)) + { + /* + * Uh-oh, the child failed to clean itself up. Treat as a crash + * after all. + */ + rw->rw_crashed_at = GetCurrentTimestamp(); + HandleChildCrash(pid, exitstatus, namebuf); + return true; + } + + /* Get it out of the BackendList and clear out remaining data */ + if (rw->rw_backend) + { + Assert(rw->rw_worker.bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION); + dlist_delete(&rw->rw_backend->elem); +#ifdef EXEC_BACKEND + ShmemBackendArrayRemove(rw->rw_backend); +#endif + free(rw->rw_backend); + rw->rw_backend = NULL; + } + rw->rw_pid = 0; + rw->rw_child_slot = 0; + + LogChildExit(LOG, namebuf, pid, exitstatus); + + return true; + } + + return false; +} /* * CleanupBackend -- cleanup after terminated backend. * * Remove all local state associated with backend. + * + * If you change this, see also CleanupBackgroundWorker. */ static void CleanupBackend(int pid, @@ -2705,7 +2963,7 @@ CleanupBackend(int pid, /* * HandleChildCrash -- cleanup after failed backend, bgwriter, checkpointer, - * walwriter or autovacuum. + * walwriter, autovacuum, or background worker. * * The objectives here are to clean up our local state about the child * process, and to signal all other remaining children to quickdie. @@ -2714,6 +2972,7 @@ static void HandleChildCrash(int pid, int exitstatus, const char *procname) { dlist_mutable_iter iter; + slist_iter siter; Backend *bp; /* @@ -2727,6 +2986,56 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) (errmsg("terminating any other active server processes"))); } + /* Process background workers. */ + slist_foreach(siter, &BackgroundWorkerList) + { + RegisteredBgWorker *rw; + + rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur); + if (rw->rw_pid == 0) + continue; /* not running */ + if (rw->rw_pid == pid) + { + /* + * Found entry for freshly-dead worker, so remove it. + */ + (void) ReleasePostmasterChildSlot(rw->rw_child_slot); + if (rw->rw_backend) + { + dlist_delete(&rw->rw_backend->elem); +#ifdef EXEC_BACKEND + ShmemBackendArrayRemove(rw->rw_backend); +#endif + free(rw->rw_backend); + rw->rw_backend = NULL; + } + rw->rw_pid = 0; + rw->rw_child_slot = 0; + /* don't reset crashed_at */ + /* Keep looping so we can signal remaining workers */ + } + else + { + /* + * This worker is still alive. Unless we did so already, tell it + * to commit hara-kiri. + * + * SIGQUIT is the special signal that says exit without proc_exit + * and let the user know what's going on. But if SendStop is set + * (-s on command line), then we send SIGSTOP instead, so that we + * can get core dumps from all backends by hand. + */ + if (!FatalError) + { + ereport(DEBUG2, + (errmsg_internal("sending %s to process %d", + (SendStop ? "SIGSTOP" : "SIGQUIT"), + (int) rw->rw_pid))); + signal_child(rw->rw_pid, (SendStop ? SIGSTOP : SIGQUIT)); + } + } + } + /* Process regular backends */ dlist_foreach_modify(iter, &BackendList) { @@ -2761,7 +3070,13 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) * * We could exclude dead_end children here, but at least in the * SIGSTOP case it seems better to include them. + * + * Background workers were already processed above; ignore them + * here. */ + if (bp->bkend_type == BACKEND_TYPE_BGWORKER) + continue; + if (!FatalError) { ereport(DEBUG2, @@ -3005,16 +3320,17 @@ PostmasterStateMachine(void) { /* * PM_WAIT_BACKENDS state ends when we have no regular backends - * (including autovac workers) and no walwriter, autovac launcher or - * bgwriter. If we are doing crash recovery then we expect the - * checkpointer to exit as well, otherwise not. The archiver, stats, - * and syslogger processes are disregarded since they are not - * connected to shared memory; we also disregard dead_end children - * here. Walsenders are also disregarded, they will be terminated - * later after writing the checkpoint record, like the archiver - * process. + * (including autovac workers), no bgworkers (including unconnected + * ones), and no walwriter, autovac launcher or bgwriter. If we are + * doing crash recovery then we expect the checkpointer to exit as + * well, otherwise not. The archiver, stats, and syslogger processes + * are disregarded since they are not connected to shared memory; we + * also disregard dead_end children here. Walsenders are also + * disregarded, they will be terminated later after writing the + * checkpoint record, like the archiver process. */ - if (CountChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC) == 0 && + if (CountChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_WORKER) == 0 && + CountUnconnectedWorkers() == 0 && StartupPID == 0 && WalReceiverPID == 0 && BgWriterPID == 0 && @@ -3226,6 +3542,39 @@ signal_child(pid_t pid, int signal) #endif } +/* + * Send a signal to bgworkers that did not request backend connections + * + * The reason this is interesting is that workers that did request connections + * are considered by SignalChildren; this function complements that one. + */ +static bool +SignalUnconnectedWorkers(int signal) +{ + slist_iter iter; + bool signaled = false; + + slist_foreach(iter, &BackgroundWorkerList) + { + RegisteredBgWorker *rw; + + rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur); + + if (rw->rw_pid == 0) + continue; + /* ignore connected workers */ + if (rw->rw_backend != NULL) + continue; + + ereport(DEBUG4, + (errmsg_internal("sending signal %d to process %d", + signal, (int) rw->rw_pid))); + signal_child(rw->rw_pid, signal); + signaled = true; + } + return signaled; +} + /* * Send a signal to the targeted children (but NOT special children; * dead_end children are never signaled, either). @@ -3233,7 +3582,7 @@ signal_child(pid_t pid, int signal) static bool SignalSomeChildren(int signal, int target) { - dlist_iter iter; + dlist_iter iter; bool signaled = false; dlist_foreach(iter, &BackendList) @@ -3249,15 +3598,15 @@ SignalSomeChildren(int signal, int target) */ if (target != BACKEND_TYPE_ALL) { - int child; + /* + * Assign bkend_type for any recently announced WAL Sender + * processes. + */ + if (bp->bkend_type == BACKEND_TYPE_NORMAL && + IsPostmasterChildWalSender(bp->child_slot)) + bp->bkend_type = BACKEND_TYPE_WALSND; - if (bp->is_autovacuum) - child = BACKEND_TYPE_AUTOVAC; - else if (IsPostmasterChildWalSender(bp->child_slot)) - child = BACKEND_TYPE_WALSND; - else - child = BACKEND_TYPE_NORMAL; - if (!(target & child)) + if (!(target & bp->bkend_type)) continue; } @@ -3375,7 +3724,7 @@ BackendStartup(Port *port) * of backends. */ bn->pid = pid; - bn->is_autovacuum = false; + bn->bkend_type = BACKEND_TYPE_NORMAL; /* Can change later to WALSND */ dlist_push_head(&BackendList, &bn->elem); #ifdef EXEC_BACKEND @@ -3744,7 +4093,10 @@ internal_forkexec(int argc, char *argv[], Port *port) fp = AllocateFile(tmpfilename, PG_BINARY_W); if (!fp) { - /* As in OpenTemporaryFile, try to make the temp-file directory */ + /* + * As in OpenTemporaryFileInTablespace, try to make the temp-file + * directory + */ mkdir(PG_TEMP_FILES_DIR, S_IRWXU); fp = AllocateFile(tmpfilename, PG_BINARY_W); @@ -4078,7 +4430,8 @@ SubPostmasterMain(int argc, char *argv[]) if (strcmp(argv[1], "--forkbackend") == 0 || strcmp(argv[1], "--forkavlauncher") == 0 || strcmp(argv[1], "--forkavworker") == 0 || - strcmp(argv[1], "--forkboot") == 0) + strcmp(argv[1], "--forkboot") == 0 || + strncmp(argv[1], "--forkbgworker=", 15) == 0) PGSharedMemoryReAttach(); /* autovacuum needs this set before calling InitProcess */ @@ -4213,6 +4566,26 @@ SubPostmasterMain(int argc, char *argv[]) AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */ } + if (strncmp(argv[1], "--forkbgworker=", 15) == 0) + { + int cookie; + + /* Close the postmaster's sockets */ + ClosePostmasterPorts(false); + + /* Restore basic shared memory pointers */ + InitShmemAccess(UsedShmemSegAddr); + + /* Need a PGPROC to run CreateSharedMemoryAndSemaphores */ + InitProcess(); + + /* Attach process to shared data structures */ + CreateSharedMemoryAndSemaphores(false, 0); + + cookie = atoi(argv[1] + 15); + MyBgworkerEntry = find_bgworker_entry(cookie); + do_start_bgworker(); + } if (strcmp(argv[1], "--forkarch") == 0) { /* Close the postmaster's sockets */ @@ -4312,6 +4685,9 @@ sigusr1_handler(SIGNAL_ARGS) (errmsg("database system is ready to accept read only connections"))); pmState = PM_HOT_STANDBY; + + /* Some workers may be scheduled to start now */ + StartOneBackgroundWorker(); } if (CheckPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER) && @@ -4481,6 +4857,33 @@ PostmasterRandom(void) return random(); } +/* + * Count up number of worker processes that did not request backend connections + * See SignalUnconnectedWorkers for why this is interesting. + */ +static int +CountUnconnectedWorkers(void) +{ + slist_iter iter; + int cnt = 0; + + slist_foreach(iter, &BackgroundWorkerList) + { + RegisteredBgWorker *rw; + + rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur); + + if (rw->rw_pid == 0) + continue; + /* ignore connected workers */ + if (rw->rw_backend != NULL) + continue; + + cnt++; + } + return cnt; +} + /* * Count up number of child processes of specified types (dead_end chidren * are always excluded). @@ -4488,7 +4891,7 @@ PostmasterRandom(void) static int CountChildren(int target) { - dlist_iter iter; + dlist_iter iter; int cnt = 0; dlist_foreach(iter, &BackendList) @@ -4504,15 +4907,15 @@ CountChildren(int target) */ if (target != BACKEND_TYPE_ALL) { - int child; + /* + * Assign bkend_type for any recently announced WAL Sender + * processes. + */ + if (bp->bkend_type == BACKEND_TYPE_NORMAL && + IsPostmasterChildWalSender(bp->child_slot)) + bp->bkend_type = BACKEND_TYPE_WALSND; - if (bp->is_autovacuum) - child = BACKEND_TYPE_AUTOVAC; - else if (IsPostmasterChildWalSender(bp->child_slot)) - child = BACKEND_TYPE_WALSND; - else - child = BACKEND_TYPE_NORMAL; - if (!(target & child)) + if (!(target & bp->bkend_type)) continue; } @@ -4671,7 +5074,7 @@ StartAutovacuumWorker(void) bn->pid = StartAutoVacWorker(); if (bn->pid > 0) { - bn->is_autovacuum = true; + bn->bkend_type = BACKEND_TYPE_AUTOVAC; dlist_push_head(&BackendList, &bn->elem); #ifdef EXEC_BACKEND ShmemBackendArrayAdd(bn); @@ -4746,18 +5149,642 @@ CreateOptsFile(int argc, char *argv[], char *fullprogname) * * This reports the number of entries needed in per-child-process arrays * (the PMChildFlags array, and if EXEC_BACKEND the ShmemBackendArray). - * These arrays include regular backends, autovac workers and walsenders, - * but not special children nor dead_end children. This allows the arrays - * to have a fixed maximum size, to wit the same too-many-children limit - * enforced by canAcceptConnections(). The exact value isn't too critical - * as long as it's more than MaxBackends. + * These arrays include regular backends, autovac workers, walsenders + * and background workers, but not special children nor dead_end children. + * This allows the arrays to have a fixed maximum size, to wit the same + * too-many-children limit enforced by canAcceptConnections(). The exact value + * isn't too critical as long as it's more than MaxBackends. */ int MaxLivePostmasterChildren(void) { - return 2 * MaxBackends; + return 2 * (MaxConnections + autovacuum_max_workers + 1 + + GetNumRegisteredBackgroundWorkers(0)); } +/* + * Register a new background worker. + * + * This can only be called in the _PG_init function of a module library + * that's loaded by shared_preload_libraries; otherwise it has no effect. + */ +void +RegisterBackgroundWorker(BackgroundWorker *worker) +{ + RegisteredBgWorker *rw; + int namelen = strlen(worker->bgw_name); + +#ifdef EXEC_BACKEND + + /* + * Use 1 here, not 0, to avoid confusing a possible bogus cookie read by + * atoi() in SubPostmasterMain. + */ + static int BackgroundWorkerCookie = 1; +#endif + + if (!IsUnderPostmaster) + ereport(LOG, + (errmsg("registering background worker: %s", worker->bgw_name))); + + if (!process_shared_preload_libraries_in_progress) + { + if (!IsUnderPostmaster) + ereport(LOG, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("background worker \"%s\": must be registered in shared_preload_libraries", + worker->bgw_name))); + return; + } + + /* sanity check for flags */ + if (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) + { + if (!(worker->bgw_flags & BGWORKER_SHMEM_ACCESS)) + { + if (!IsUnderPostmaster) + ereport(LOG, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("background worker \"%s\": must attach to shared memory in order to request a database connection", + worker->bgw_name))); + return; + } + + if (worker->bgw_start_time == BgWorkerStart_PostmasterStart) + { + if (!IsUnderPostmaster) + ereport(LOG, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("background worker \"%s\": cannot request database access if starting at postmaster start", + worker->bgw_name))); + return; + } + + /* XXX other checks? */ + } + + if ((worker->bgw_restart_time < 0 && + worker->bgw_restart_time != BGW_NEVER_RESTART) || + (worker->bgw_restart_time > USECS_PER_DAY / 1000)) + { + if (!IsUnderPostmaster) + ereport(LOG, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("background worker \"%s\": invalid restart interval", + worker->bgw_name))); + return; + } + + /* + * Copy the registration data into the registered workers list. + */ + rw = malloc(sizeof(RegisteredBgWorker) + namelen + 1); + if (rw == NULL) + { + ereport(LOG, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + return; + } + + rw->rw_worker = *worker; + rw->rw_worker.bgw_name = ((char *) rw) + sizeof(RegisteredBgWorker); + strlcpy(rw->rw_worker.bgw_name, worker->bgw_name, namelen + 1); + + rw->rw_backend = NULL; + rw->rw_pid = 0; + rw->rw_child_slot = 0; + rw->rw_crashed_at = 0; +#ifdef EXEC_BACKEND + rw->rw_cookie = BackgroundWorkerCookie++; +#endif + + slist_push_head(&BackgroundWorkerList, &rw->rw_lnode); +} + +/* + * Connect background worker to a database. + */ +void +BackgroundWorkerInitializeConnection(char *dbname, char *username) +{ + BackgroundWorker *worker = MyBgworkerEntry; + + /* XXX is this the right errcode? */ + if (!(worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION)) + ereport(FATAL, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("database connection requirement not indicated during registration"))); + + InitPostgres(dbname, InvalidOid, username, NULL); + + /* it had better not gotten out of "init" mode yet */ + if (!IsInitProcessingMode()) + ereport(ERROR, + (errmsg("invalid processing mode in bgworker"))); + SetProcessingMode(NormalProcessing); +} + +/* + * Block/unblock signals in a background worker + */ +void +BackgroundWorkerBlockSignals(void) +{ + PG_SETMASK(&BlockSig); +} + +void +BackgroundWorkerUnblockSignals(void) +{ + PG_SETMASK(&UnBlockSig); +} + +#ifdef EXEC_BACKEND +static BackgroundWorker * +find_bgworker_entry(int cookie) +{ + slist_iter iter; + + slist_foreach(iter, &BackgroundWorkerList) + { + RegisteredBgWorker *rw; + + rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur); + if (rw->rw_cookie == cookie) + return &rw->rw_worker; + } + + return NULL; +} +#endif + +static void +bgworker_quickdie(SIGNAL_ARGS) +{ + sigaddset(&BlockSig, SIGQUIT); /* prevent nested calls */ + PG_SETMASK(&BlockSig); + + /* + * We DO NOT want to run proc_exit() callbacks -- we're here because + * shared memory may be corrupted, so we don't want to try to clean up our + * transaction. Just nail the windows shut and get out of town. Now that + * there's an atexit callback to prevent third-party code from breaking + * things by calling exit() directly, we have to reset the callbacks + * explicitly to make this work as intended. + */ + on_exit_reset(); + + /* + * Note we do exit(0) here, not exit(2) like quickdie. The reason is that + * we don't want to be seen this worker as independently crashed, because + * then postmaster would delay restarting it again afterwards. If some + * idiot DBA manually sends SIGQUIT to a random bgworker, the "dead man + * switch" will ensure that postmaster sees this as a crash. + */ + exit(0); +} + +/* + * Standard SIGTERM handler for background workers + */ +static void +bgworker_die(SIGNAL_ARGS) +{ + PG_SETMASK(&BlockSig); + + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating background worker \"%s\" due to administrator command", + MyBgworkerEntry->bgw_name))); +} + +static void +do_start_bgworker(void) +{ + sigjmp_buf local_sigjmp_buf; + char buf[MAXPGPATH]; + BackgroundWorker *worker = MyBgworkerEntry; + + if (worker == NULL) + elog(FATAL, "unable to find bgworker entry"); + + /* we are a postmaster subprocess now */ + IsUnderPostmaster = true; + IsBackgroundWorker = true; + + /* reset MyProcPid */ + MyProcPid = getpid(); + + /* record Start Time for logging */ + MyStartTime = time(NULL); + + /* Identify myself via ps */ + snprintf(buf, MAXPGPATH, "bgworker: %s", worker->bgw_name); + init_ps_display(buf, "", "", ""); + + SetProcessingMode(InitProcessing); + + /* Apply PostAuthDelay */ + if (PostAuthDelay > 0) + pg_usleep(PostAuthDelay * 1000000L); + + /* + * If possible, make this process a group leader, so that the postmaster + * can signal any child processes too. + */ +#ifdef HAVE_SETSID + if (setsid() < 0) + elog(FATAL, "setsid() failed: %m"); +#endif + + /* + * Set up signal handlers. + */ + if (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) + { + /* + * SIGINT is used to signal canceling the current action + */ + pqsignal(SIGINT, StatementCancelHandler); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGFPE, FloatExceptionHandler); + + /* XXX Any other handlers needed here? */ + } + else + { + pqsignal(SIGINT, SIG_IGN); + pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGFPE, SIG_IGN); + } + + /* SIGTERM and SIGHUP are configurable */ + if (worker->bgw_sigterm) + pqsignal(SIGTERM, worker->bgw_sigterm); + else + pqsignal(SIGTERM, bgworker_die); + + if (worker->bgw_sighup) + pqsignal(SIGHUP, worker->bgw_sighup); + else + pqsignal(SIGHUP, SIG_IGN); + + pqsignal(SIGQUIT, bgworker_quickdie); + InitializeTimeouts(); /* establishes SIGALRM handler */ + + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR2, SIG_IGN); + pqsignal(SIGCHLD, SIG_DFL); + + /* + * If an exception is encountered, processing resumes here. + * + * See notes in postgres.c about the design of this coding. + */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* Since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Prevent interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* + * Do we need more cleanup here? For shmem-connected bgworkers, we + * will call InitProcess below, which will install ProcKill as exit + * callback. That will take care of releasing locks, etc. + */ + + /* and go away */ + proc_exit(1); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + /* Early initialization */ + BaseInit(); + + /* + * If necessary, create a per-backend PGPROC struct in shared memory, + * except in the EXEC_BACKEND case where this was done in + * SubPostmasterMain. We must do this before we can use LWLocks (and in + * the EXEC_BACKEND case we already had to do some stuff with LWLocks). + */ +#ifndef EXEC_BACKEND + if (worker->bgw_flags & BGWORKER_SHMEM_ACCESS) + InitProcess(); +#endif + + /* + * Note that in normal processes, we would call InitPostgres here. For a + * worker, however, we don't know what database to connect to, yet; so we + * need to wait until the user code does it via + * BackgroundWorkerInitializeConnection(). + */ + + /* + * Now invoke the user-defined worker code + */ + worker->bgw_main(worker->bgw_main_arg); + + /* ... and if it returns, we're done */ + proc_exit(0); +} + +/* + * Return the number of background workers registered that have at least + * one of the passed flag bits set. + */ +static int +GetNumRegisteredBackgroundWorkers(int flags) +{ + slist_iter iter; + int count = 0; + + slist_foreach(iter, &BackgroundWorkerList) + { + RegisteredBgWorker *rw; + + rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur); + + if (flags != 0 && + !(rw->rw_worker.bgw_flags & flags)) + continue; + + count++; + } + + return count; +} + +/* + * Return the number of bgworkers that need to have PGPROC entries. + */ +int +GetNumShmemAttachedBgworkers(void) +{ + return GetNumRegisteredBackgroundWorkers(BGWORKER_SHMEM_ACCESS); +} + +#ifdef EXEC_BACKEND +static pid_t +bgworker_forkexec(int cookie) +{ + char *av[10]; + int ac = 0; + char forkav[MAXPGPATH]; + + snprintf(forkav, MAXPGPATH, "--forkbgworker=%d", cookie); + + av[ac++] = "postgres"; + av[ac++] = forkav; + av[ac++] = NULL; /* filled in by postmaster_forkexec */ + av[ac] = NULL; + + Assert(ac < lengthof(av)); + + return postmaster_forkexec(ac, av); +} +#endif + +/* + * Start a new bgworker. + * Starting time conditions must have been checked already. + * + * This code is heavily based on autovacuum.c, q.v. + */ +static void +start_bgworker(RegisteredBgWorker *rw) +{ + pid_t worker_pid; + + ereport(LOG, + (errmsg("starting background worker process \"%s\"", + rw->rw_worker.bgw_name))); + +#ifdef EXEC_BACKEND + switch ((worker_pid = bgworker_forkexec(rw->rw_cookie))) +#else + switch ((worker_pid = fork_process())) +#endif + { + case -1: + ereport(LOG, + (errmsg("could not fork worker process: %m"))); + return; + +#ifndef EXEC_BACKEND + case 0: + /* in postmaster child ... */ + /* Close the postmaster's sockets */ + ClosePostmasterPorts(false); + + /* Lose the postmaster's on-exit routines */ + on_exit_reset(); + + /* Do NOT release postmaster's working memory context */ + + MyBgworkerEntry = &rw->rw_worker; + do_start_bgworker(); + break; +#endif + default: + rw->rw_pid = worker_pid; + if (rw->rw_backend) + rw->rw_backend->pid = rw->rw_pid; + } +} + +/* + * Does the current postmaster state require starting a worker with the + * specified start_time? + */ +static bool +bgworker_should_start_now(BgWorkerStartTime start_time) +{ + switch (pmState) + { + case PM_NO_CHILDREN: + case PM_WAIT_DEAD_END: + case PM_SHUTDOWN_2: + case PM_SHUTDOWN: + case PM_WAIT_BACKENDS: + case PM_WAIT_READONLY: + case PM_WAIT_BACKUP: + break; + + case PM_RUN: + if (start_time == BgWorkerStart_RecoveryFinished) + return true; + /* fall through */ + + case PM_HOT_STANDBY: + if (start_time == BgWorkerStart_ConsistentState) + return true; + /* fall through */ + + case PM_RECOVERY: + case PM_STARTUP: + case PM_INIT: + if (start_time == BgWorkerStart_PostmasterStart) + return true; + /* fall through */ + + } + + return false; +} + +/* + * Allocate the Backend struct for a connected background worker, but don't + * add it to the list of backends just yet. + * + * Some info from the Backend is copied into the passed rw. + */ +static bool +assign_backendlist_entry(RegisteredBgWorker *rw) +{ + Backend *bn = malloc(sizeof(Backend)); + + if (bn == NULL) + { + ereport(LOG, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + + /* + * The worker didn't really crash, but setting this nonzero makes + * postmaster wait a bit before attempting to start it again; if it + * tried again right away, most likely it'd find itself under the same + * memory pressure. + */ + rw->rw_crashed_at = GetCurrentTimestamp(); + return false; + } + + /* + * Compute the cancel key that will be assigned to this session. We + * probably don't need cancel keys for background workers, but we'd better + * have something random in the field to prevent unfriendly people from + * sending cancels to them. + */ + MyCancelKey = PostmasterRandom(); + bn->cancel_key = MyCancelKey; + + bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot(); + bn->bkend_type = BACKEND_TYPE_BGWORKER; + bn->dead_end = false; + + rw->rw_backend = bn; + rw->rw_child_slot = bn->child_slot; + + return true; +} + +/* + * If the time is right, start one background worker. + * + * As a side effect, the bgworker control variables are set or reset whenever + * there are more workers to start after this one, and whenever the overall + * system state requires it. + */ +static void +StartOneBackgroundWorker(void) +{ + slist_iter iter; + TimestampTz now = 0; + + if (FatalError) + { + StartWorkerNeeded = false; + HaveCrashedWorker = false; + return; /* not yet */ + } + + HaveCrashedWorker = false; + + slist_foreach(iter, &BackgroundWorkerList) + { + RegisteredBgWorker *rw; + + rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur); + + /* already running? */ + if (rw->rw_pid != 0) + continue; + + /* + * If this worker has crashed previously, maybe it needs to be + * restarted (unless on registration it specified it doesn't want to + * be restarted at all). Check how long ago did a crash last happen. + * If the last crash is too recent, don't start it right away; let it + * be restarted once enough time has passed. + */ + if (rw->rw_crashed_at != 0) + { + if (rw->rw_worker.bgw_restart_time == BGW_NEVER_RESTART) + continue; + + if (now == 0) + now = GetCurrentTimestamp(); + + if (!TimestampDifferenceExceeds(rw->rw_crashed_at, now, + rw->rw_worker.bgw_restart_time * 1000)) + { + HaveCrashedWorker = true; + continue; + } + } + + if (bgworker_should_start_now(rw->rw_worker.bgw_start_time)) + { + /* reset crash time before calling assign_backendlist_entry */ + rw->rw_crashed_at = 0; + + /* + * If necessary, allocate and assign the Backend element. Note we + * must do this before forking, so that we can handle out of + * memory properly. + * + * If not connected, we don't need a Backend element, but we still + * need a PMChildSlot. + */ + if (rw->rw_worker.bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION) + { + if (!assign_backendlist_entry(rw)) + return; + } + else + rw->rw_child_slot = MyPMChildSlot = AssignPostmasterChildSlot(); + + start_bgworker(rw); /* sets rw->rw_pid */ + + if (rw->rw_backend) + { + dlist_push_head(&BackendList, &rw->rw_backend->elem); +#ifdef EXEC_BACKEND + ShmemBackendArrayAdd(rw->rw_backend); +#endif + } + + /* + * Have ServerLoop call us again. Note that there might not + * actually *be* another runnable worker, but we don't care all + * that much; we will find out the next time we run. + */ + StartWorkerNeeded = true; + return; + } + } + + /* no runnable worker found */ + StartWorkerNeeded = false; +} #ifdef EXEC_BACKEND diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 41af7924c0..a56cb825fc 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -140,7 +140,9 @@ ProcGlobalSemas(void) * So, now we grab enough semaphores to support the desired max number * of backends immediately at initialization --- if the sysadmin has set * MaxConnections or autovacuum_max_workers higher than his kernel will - * support, he'll find out sooner rather than later. + * support, he'll find out sooner rather than later. (The number of + * background worker processes registered by loadable modules is also taken + * into consideration.) * * Another reason for creating semaphores here is that the semaphore * implementation typically requires us to create semaphores in the @@ -171,6 +173,7 @@ InitProcGlobal(void) ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY; ProcGlobal->freeProcs = NULL; ProcGlobal->autovacFreeProcs = NULL; + ProcGlobal->bgworkerFreeProcs = NULL; ProcGlobal->startupProc = NULL; ProcGlobal->startupProcPid = 0; ProcGlobal->startupBufferPinWaitBufId = -1; @@ -179,10 +182,11 @@ InitProcGlobal(void) /* * Create and initialize all the PGPROC structures we'll need. There are - * four separate consumers: (1) normal backends, (2) autovacuum workers - * and the autovacuum launcher, (3) auxiliary processes, and (4) prepared - * transactions. Each PGPROC structure is dedicated to exactly one of - * these purposes, and they do not move between groups. + * five separate consumers: (1) normal backends, (2) autovacuum workers + * and the autovacuum launcher, (3) background workers, (4) auxiliary + * processes, and (5) prepared transactions. Each PGPROC structure is + * dedicated to exactly one of these purposes, and they do not move between + * groups. */ procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC)); ProcGlobal->allProcs = procs; @@ -223,12 +227,12 @@ InitProcGlobal(void) procs[i].pgprocno = i; /* - * Newly created PGPROCs for normal backends or for autovacuum must be - * queued up on the appropriate free list. Because there can only - * ever be a small, fixed number of auxiliary processes, no free list - * is used in that case; InitAuxiliaryProcess() instead uses a linear - * search. PGPROCs for prepared transactions are added to a free list - * by TwoPhaseShmemInit(). + * Newly created PGPROCs for normal backends, autovacuum and bgworkers + * must be queued up on the appropriate free list. Because there can + * only ever be a small, fixed number of auxiliary processes, no free + * list is used in that case; InitAuxiliaryProcess() instead uses a + * linear search. PGPROCs for prepared transactions are added to a + * free list by TwoPhaseShmemInit(). */ if (i < MaxConnections) { @@ -236,12 +240,18 @@ InitProcGlobal(void) procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; ProcGlobal->freeProcs = &procs[i]; } - else if (i < MaxBackends) + else if (i < MaxConnections + autovacuum_max_workers + 1) { /* PGPROC for AV launcher/worker, add to autovacFreeProcs list */ procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; ProcGlobal->autovacFreeProcs = &procs[i]; } + else if (i < MaxBackends) + { + /* PGPROC for bgworker, add to bgworkerFreeProcs list */ + procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs; + ProcGlobal->bgworkerFreeProcs = &procs[i]; + } /* Initialize myProcLocks[] shared memory queues. */ for (j = 0; j < NUM_LOCK_PARTITIONS; j++) @@ -299,6 +309,8 @@ InitProcess(void) if (IsAnyAutoVacuumProcess()) MyProc = procglobal->autovacFreeProcs; + else if (IsBackgroundWorker) + MyProc = procglobal->bgworkerFreeProcs; else MyProc = procglobal->freeProcs; @@ -306,6 +318,8 @@ InitProcess(void) { if (IsAnyAutoVacuumProcess()) procglobal->autovacFreeProcs = (PGPROC *) MyProc->links.next; + else if (IsBackgroundWorker) + procglobal->bgworkerFreeProcs = (PGPROC *) MyProc->links.next; else procglobal->freeProcs = (PGPROC *) MyProc->links.next; SpinLockRelease(ProcStructLock); @@ -782,6 +796,11 @@ ProcKill(int code, Datum arg) MyProc->links.next = (SHM_QUEUE *) procglobal->autovacFreeProcs; procglobal->autovacFreeProcs = MyProc; } + else if (IsBackgroundWorker) + { + MyProc->links.next = (SHM_QUEUE *) procglobal->bgworkerFreeProcs; + procglobal->bgworkerFreeProcs = MyProc; + } else { MyProc->links.next = (SHM_QUEUE *) procglobal->freeProcs; diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 4b66bd3e35..8dd2b4b123 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -87,6 +87,7 @@ pid_t PostmasterPid = 0; bool IsPostmasterEnvironment = false; bool IsUnderPostmaster = false; bool IsBinaryUpgrade = false; +bool IsBackgroundWorker = false; bool ExitOnAnyError = false; diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 5288aa77ee..c518c2133c 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -498,10 +498,10 @@ void InitializeSessionUserIdStandalone(void) { /* - * This function should only be called in single-user mode and in - * autovacuum workers. + * This function should only be called in single-user mode, in + * autovacuum workers, and in background workers. */ - AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess()); + AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsBackgroundWorker); /* call only once */ AssertState(!OidIsValid(AuthenticatedUserId)); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 2eb456df45..b87ec6c482 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -627,6 +627,19 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, errhint("You should immediately run CREATE USER \"%s\" SUPERUSER;.", username))); } + else if (IsBackgroundWorker) + { + if (username == NULL) + { + InitializeSessionUserIdStandalone(); + am_superuser = true; + } + else + { + InitializeSessionUserId(username); + am_superuser = superuser(); + } + } else { /* normal multiuser case */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 81cf136937..2cf34cea5a 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -52,6 +52,7 @@ #include "parser/scansup.h" #include "pgstat.h" #include "postmaster/autovacuum.h" +#include "postmaster/bgworker.h" #include "postmaster/bgwriter.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" @@ -108,7 +109,8 @@ * removed, we still could not exceed INT_MAX/4 because some places compute * 4*MaxBackends without any overflow check. This is rechecked in * check_maxconnections, since MaxBackends is computed as MaxConnections - * plus autovacuum_max_workers plus one (for the autovacuum launcher). + * plus the number of bgworkers plus autovacuum_max_workers plus one (for the + * autovacuum launcher). */ #define MAX_BACKENDS 0x7fffff @@ -8628,7 +8630,8 @@ show_tcp_keepalives_count(void) static bool check_maxconnections(int *newval, void **extra, GucSource source) { - if (*newval + autovacuum_max_workers + 1 > MAX_BACKENDS) + if (*newval + GetNumShmemAttachedBgworkers() + autovacuum_max_workers + 1 > + MAX_BACKENDS) return false; return true; } @@ -8636,13 +8639,15 @@ check_maxconnections(int *newval, void **extra, GucSource source) static void assign_maxconnections(int newval, void *extra) { - MaxBackends = newval + autovacuum_max_workers + 1; + MaxBackends = newval + autovacuum_max_workers + 1 + + GetNumShmemAttachedBgworkers(); } static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource source) { - if (MaxConnections + *newval + 1 > MAX_BACKENDS) + if (MaxConnections + *newval + 1 + GetNumShmemAttachedBgworkers() > + MAX_BACKENDS) return false; return true; } @@ -8650,7 +8655,7 @@ check_autovacuum_max_workers(int *newval, void **extra, GucSource source) static void assign_autovacuum_max_workers(int newval, void *extra) { - MaxBackends = MaxConnections + newval + 1; + MaxBackends = MaxConnections + newval + 1 + GetNumShmemAttachedBgworkers(); } static bool diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 3ea349315c..b41227fafb 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -131,6 +131,7 @@ do { \ extern pid_t PostmasterPid; extern bool IsPostmasterEnvironment; extern PGDLLIMPORT bool IsUnderPostmaster; +extern bool IsBackgroundWorker; extern bool IsBinaryUpgrade; extern bool ExitOnAnyError; diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h new file mode 100644 index 0000000000..737728ba3d --- /dev/null +++ b/src/include/postmaster/bgworker.h @@ -0,0 +1,104 @@ +/*-------------------------------------------------------------------- + * bgworker.h + * POSTGRES pluggable background workers interface + * + * A background worker is a process able to run arbitrary, user-supplied code, + * including normal transactions. + * + * Any external module loaded via shared_preload_libraries can register a + * worker. Then, at the appropriate time, the worker process is forked from + * the postmaster and runs the user-supplied "main" function. This code may + * connect to a database and run transactions. Once started, it stays active + * until shutdown or crash. The process should sleep during periods of + * inactivity. + * + * If the fork() call fails in the postmaster, it will try again later. Note + * that the failure can only be transient (fork failure due to high load, + * memory pressure, too many processes, etc); more permanent problems, like + * failure to connect to a database, are detected later in the worker and dealt + * with just by having the worker exit normally. Postmaster will launch a new + * worker again later. + * + * Note that there might be more than one worker in a database concurrently, + * and the same module may request more than one worker running the same (or + * different) code. + * + * + * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/postmaster/bgworker.h + *-------------------------------------------------------------------- + */ +#ifndef BGWORKER_H +#define BGWORKER_H + +/*--------------------------------------------------------------------- + * External module API. + *--------------------------------------------------------------------- + */ + +/* + * Pass this flag to have your worker be able to connect to shared memory. + */ +#define BGWORKER_SHMEM_ACCESS 0x0001 + +/* + * This flag means the bgworker requires a database connection. The connection + * is not established automatically; the worker must establish it later. + * It requires that BGWORKER_SHMEM_ACCESS was passed too. + */ +#define BGWORKER_BACKEND_DATABASE_CONNECTION 0x0002 + + +typedef void (*bgworker_main_type)(void *main_arg); +typedef void (*bgworker_sighdlr_type)(SIGNAL_ARGS); + +/* + * Points in time at which a bgworker can request to be started + */ +typedef enum +{ + BgWorkerStart_PostmasterStart, + BgWorkerStart_ConsistentState, + BgWorkerStart_RecoveryFinished +} BgWorkerStartTime; + +#define BGW_DEFAULT_RESTART_INTERVAL 60 +#define BGW_NEVER_RESTART -1 + +typedef struct BackgroundWorker +{ + char *bgw_name; + int bgw_flags; + BgWorkerStartTime bgw_start_time; + int bgw_restart_time; /* in seconds, or BGW_NEVER_RESTART */ + bgworker_main_type bgw_main; + void *bgw_main_arg; + bgworker_sighdlr_type bgw_sighup; + bgworker_sighdlr_type bgw_sigterm; +} BackgroundWorker; + +/* Register a new bgworker */ +extern void RegisterBackgroundWorker(BackgroundWorker *worker); + +/* This is valid in a running worker */ +extern BackgroundWorker *MyBgworkerEntry; + +/* + * Connect to the specified database, as the specified user. Only a worker + * that passed BGWORKER_BACKEND_DATABASE_CONNECTION during registration may + * call this. + * + * If username is NULL, bootstrapping superuser is used. + * If dbname is NULL, connection is made to no specific database; + * only shared catalogs can be accessed. + */ +extern void BackgroundWorkerInitializeConnection(char *dbname, char *username); + +/* Block/unblock signals in a background worker process */ +extern void BackgroundWorkerBlockSignals(void); +extern void BackgroundWorkerUnblockSignals(void); + +#endif /* BGWORKER_H */ diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 0fe7ec26db..44221cc545 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -51,6 +51,8 @@ extern void ClosePostmasterPorts(bool am_syslogger); extern int MaxLivePostmasterChildren(void); +extern int GetNumShmemAttachedBgworkers(void); + #ifdef EXEC_BACKEND extern pid_t postmaster_forkexec(int argc, char *argv[]); extern void SubPostmasterMain(int argc, char *argv[]) __attribute__((noreturn)); diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 686ac48657..7600007b09 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -189,6 +189,8 @@ typedef struct PROC_HDR PGPROC *freeProcs; /* Head of list of autovacuum's free PGPROC structures */ PGPROC *autovacFreeProcs; + /* Head of list of bgworker free PGPROC structures */ + PGPROC *bgworkerFreeProcs; /* WALWriter process's latch */ Latch *walwriterLatch; /* Checkpointer process's latch */