mirror of https://github.com/postgres/postgres
Background worker processes
Background workers are postmaster subprocesses that run arbitrary user-specified code. They can request shared memory access as well as backend database connections; or they can just use plain libpq frontend database connections. Modules listed in shared_preload_libraries can register background workers in their _PG_init() function; this is early enough that it's not necessary to provide an extra GUC option, because the necessary extra resources can be allocated early on. Modules can install more than one bgworker, if necessary. Care is taken that these extra processes do not interfere with other postmaster tasks: only one such process is started on each ServerLoop iteration. This means a large number of them could be waiting to be started up and postmaster is still able to quickly service external connection requests. Also, shutdown sequence should not be impacted by a worker process that's reasonably well behaved (i.e. promptly responds to termination signals.) The current implementation lets worker processes specify their start time, i.e. at what point in the server startup process they are to be started: right after postmaster start (in which case they mustn't ask for shared memory access), when consistent state has been reached (useful during recovery in a HOT standby server), or when recovery has terminated (i.e. when normal backends are allowed). In case of a bgworker crash, actions to take depend on registration data: if shared memory was requested, then all other connections are taken down (as well as other bgworkers), just like it were a regular backend crashing. The bgworker itself is restarted, too, within a configurable timeframe (which can be configured to be never). More features to add to this framework can be imagined without much effort, and have been discussed, but this seems good enough as a useful unit already. An elementary sample module is supplied. Author: Álvaro Herrera This patch is loosely based on prior patches submitted by KaiGai Kohei, and unsubmitted code by Simon Riggs. Reviewed by: KaiGai Kohei, Markus Wanner, Andres Freund, Heikki Linnakangas, Simon Riggs, Amit Kapila
This commit is contained in:
parent
e31d524867
commit
da07a1e856
|
@ -50,7 +50,8 @@ SUBDIRS = \
|
|||
test_parser \
|
||||
tsearch2 \
|
||||
unaccent \
|
||||
vacuumlo
|
||||
vacuumlo \
|
||||
worker_spi
|
||||
|
||||
ifeq ($(with_openssl),yes)
|
||||
SUBDIRS += sslinfo
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
# contrib/worker_spi/Makefile
|
||||
|
||||
MODULES = worker_spi
|
||||
|
||||
ifdef USE_PGXS
|
||||
PG_CONFIG = pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS)
|
||||
else
|
||||
subdir = contrib/worker_spi
|
||||
top_builddir = ../..
|
||||
include $(top_builddir)/src/Makefile.global
|
||||
include $(top_srcdir)/contrib/contrib-global.mk
|
||||
endif
|
|
@ -0,0 +1,263 @@
|
|||
/* -------------------------------------------------------------------------
|
||||
*
|
||||
* worker_spi.c
|
||||
* Sample background worker code that demonstrates usage of a database
|
||||
* connection.
|
||||
*
|
||||
* This code connects to a database, create a schema and table, and summarizes
|
||||
* the numbers contained therein. To see it working, insert an initial value
|
||||
* with "total" type and some initial value; then insert some other rows with
|
||||
* "delta" type. Delta rows will be deleted by this worker and their values
|
||||
* aggregated into the total.
|
||||
*
|
||||
* Copyright (C) 2012, PostgreSQL Global Development Group
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/worker_spi/worker_spi.c
|
||||
*
|
||||
* -------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
/* These are always necessary for a bgworker */
|
||||
#include "miscadmin.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/latch.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/shmem.h"
|
||||
|
||||
/* these headers are used by this particular worker's code */
|
||||
#include "access/xact.h"
|
||||
#include "executor/spi.h"
|
||||
#include "fmgr.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/snapmgr.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
void _PG_init(void);
|
||||
|
||||
static bool got_sigterm = false;
|
||||
|
||||
|
||||
typedef struct worktable
|
||||
{
|
||||
const char *schema;
|
||||
const char *name;
|
||||
} worktable;
|
||||
|
||||
static void
|
||||
worker_spi_sigterm(SIGNAL_ARGS)
|
||||
{
|
||||
int save_errno = errno;
|
||||
|
||||
got_sigterm = true;
|
||||
if (MyProc)
|
||||
SetLatch(&MyProc->procLatch);
|
||||
|
||||
errno = save_errno;
|
||||
}
|
||||
|
||||
static void
|
||||
worker_spi_sighup(SIGNAL_ARGS)
|
||||
{
|
||||
elog(LOG, "got sighup!");
|
||||
if (MyProc)
|
||||
SetLatch(&MyProc->procLatch);
|
||||
}
|
||||
|
||||
static void
|
||||
initialize_worker_spi(worktable *table)
|
||||
{
|
||||
int ret;
|
||||
int ntup;
|
||||
bool isnull;
|
||||
StringInfoData buf;
|
||||
|
||||
StartTransactionCommand();
|
||||
SPI_connect();
|
||||
PushActiveSnapshot(GetTransactionSnapshot());
|
||||
|
||||
initStringInfo(&buf);
|
||||
appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
|
||||
table->schema);
|
||||
|
||||
ret = SPI_execute(buf.data, true, 0);
|
||||
if (ret != SPI_OK_SELECT)
|
||||
elog(FATAL, "SPI_execute failed: error code %d", ret);
|
||||
|
||||
if (SPI_processed != 1)
|
||||
elog(FATAL, "not a singleton result");
|
||||
|
||||
ntup = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
|
||||
SPI_tuptable->tupdesc,
|
||||
1, &isnull));
|
||||
if (isnull)
|
||||
elog(FATAL, "null result");
|
||||
|
||||
if (ntup == 0)
|
||||
{
|
||||
resetStringInfo(&buf);
|
||||
appendStringInfo(&buf,
|
||||
"CREATE SCHEMA \"%s\" "
|
||||
"CREATE TABLE \"%s\" ("
|
||||
" type text CHECK (type IN ('total', 'delta')), "
|
||||
" value integer)"
|
||||
"CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
|
||||
"WHERE type = 'total'",
|
||||
table->schema, table->name, table->name, table->name);
|
||||
|
||||
ret = SPI_execute(buf.data, false, 0);
|
||||
|
||||
if (ret != SPI_OK_UTILITY)
|
||||
elog(FATAL, "failed to create my schema");
|
||||
}
|
||||
|
||||
SPI_finish();
|
||||
PopActiveSnapshot();
|
||||
CommitTransactionCommand();
|
||||
}
|
||||
|
||||
static void
|
||||
worker_spi_main(void *main_arg)
|
||||
{
|
||||
worktable *table = (worktable *) main_arg;
|
||||
StringInfoData buf;
|
||||
|
||||
/* We're now ready to receive signals */
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
/* Connect to our database */
|
||||
BackgroundWorkerInitializeConnection("postgres", NULL);
|
||||
|
||||
elog(LOG, "%s initialized with %s.%s",
|
||||
MyBgworkerEntry->bgw_name, table->schema, table->name);
|
||||
initialize_worker_spi(table);
|
||||
|
||||
/*
|
||||
* Quote identifiers passed to us. Note that this must be done after
|
||||
* initialize_worker_spi, because that routine assumes the names are not
|
||||
* quoted.
|
||||
*
|
||||
* Note some memory might be leaked here.
|
||||
*/
|
||||
table->schema = quote_identifier(table->schema);
|
||||
table->name = quote_identifier(table->name);
|
||||
|
||||
initStringInfo(&buf);
|
||||
appendStringInfo(&buf,
|
||||
"WITH deleted AS (DELETE "
|
||||
"FROM %s.%s "
|
||||
"WHERE type = 'delta' RETURNING value), "
|
||||
"total AS (SELECT coalesce(sum(value), 0) as sum "
|
||||
"FROM deleted) "
|
||||
"UPDATE %s.%s "
|
||||
"SET value = %s.value + total.sum "
|
||||
"FROM total WHERE type = 'total' "
|
||||
"RETURNING %s.value",
|
||||
table->schema, table->name,
|
||||
table->schema, table->name,
|
||||
table->name,
|
||||
table->name);
|
||||
|
||||
while (!got_sigterm)
|
||||
{
|
||||
int ret;
|
||||
int rc;
|
||||
|
||||
/*
|
||||
* Background workers mustn't call usleep() or any direct equivalent:
|
||||
* instead, they may wait on their process latch, which sleeps as
|
||||
* necessary, but is awakened if postmaster dies. That way the
|
||||
* background process goes away immediately in an emergency.
|
||||
*/
|
||||
rc = WaitLatch(&MyProc->procLatch,
|
||||
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
|
||||
1000L);
|
||||
ResetLatch(&MyProc->procLatch);
|
||||
|
||||
/* emergency bailout if postmaster has died */
|
||||
if (rc & WL_POSTMASTER_DEATH)
|
||||
proc_exit(1);
|
||||
|
||||
StartTransactionCommand();
|
||||
SPI_connect();
|
||||
PushActiveSnapshot(GetTransactionSnapshot());
|
||||
|
||||
ret = SPI_execute(buf.data, false, 0);
|
||||
|
||||
if (ret != SPI_OK_UPDATE_RETURNING)
|
||||
elog(FATAL, "cannot select from table %s.%s: error code %d",
|
||||
table->schema, table->name, ret);
|
||||
|
||||
if (SPI_processed > 0)
|
||||
{
|
||||
bool isnull;
|
||||
int32 val;
|
||||
|
||||
val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
|
||||
SPI_tuptable->tupdesc,
|
||||
1, &isnull));
|
||||
if (!isnull)
|
||||
elog(LOG, "%s: count in %s.%s is now %d",
|
||||
MyBgworkerEntry->bgw_name,
|
||||
table->schema, table->name, val);
|
||||
}
|
||||
|
||||
SPI_finish();
|
||||
PopActiveSnapshot();
|
||||
CommitTransactionCommand();
|
||||
}
|
||||
|
||||
proc_exit(0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Entrypoint of this module.
|
||||
*
|
||||
* We register two worker processes here, to demonstrate how that can be done.
|
||||
*/
|
||||
void
|
||||
_PG_init(void)
|
||||
{
|
||||
BackgroundWorker worker;
|
||||
worktable *table;
|
||||
|
||||
/* register the worker processes. These values are common for both */
|
||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
|
||||
BGWORKER_BACKEND_DATABASE_CONNECTION;
|
||||
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
worker.bgw_main = worker_spi_main;
|
||||
worker.bgw_sighup = worker_spi_sighup;
|
||||
worker.bgw_sigterm = worker_spi_sigterm;
|
||||
|
||||
/*
|
||||
* These values are used for the first worker.
|
||||
*
|
||||
* Note these are palloc'd. The reason this works after starting a new
|
||||
* worker process is that if we only fork, they point to valid allocated
|
||||
* memory in the child process; and if we fork and then exec, the exec'd
|
||||
* process will run this code again, and so the memory is also valid there.
|
||||
*/
|
||||
table = palloc(sizeof(worktable));
|
||||
table->schema = pstrdup("schema1");
|
||||
table->name = pstrdup("counted");
|
||||
|
||||
worker.bgw_name = "SPI worker 1";
|
||||
worker.bgw_restart_time = BGW_NEVER_RESTART;
|
||||
worker.bgw_main_arg = (void *) table;
|
||||
RegisterBackgroundWorker(&worker);
|
||||
|
||||
/* Values for the second worker */
|
||||
table = palloc(sizeof(worktable));
|
||||
table->schema = pstrdup("our schema2");
|
||||
table->name = pstrdup("counted rows");
|
||||
|
||||
worker.bgw_name = "SPI worker 2";
|
||||
worker.bgw_restart_time = 2;
|
||||
worker.bgw_main_arg = (void *) table;
|
||||
RegisterBackgroundWorker(&worker);
|
||||
}
|
|
@ -0,0 +1,146 @@
|
|||
<!-- doc/src/sgml/bgworker.sgml -->
|
||||
|
||||
<chapter id="bgworker">
|
||||
<title>Background Worker Processes</title>
|
||||
|
||||
<indexterm zone="bgworker">
|
||||
<primary>Background workers</primary>
|
||||
</indexterm>
|
||||
|
||||
<para>
|
||||
PostgreSQL can be extended to run user-supplied code in separate processes.
|
||||
Such processes are started, stopped and monitored by <command>postgres</command>,
|
||||
which permits them to have a lifetime closely linked to the server's status.
|
||||
These processes have the option to attach to <productname>PostgreSQL</>'s
|
||||
shared memory area and to connect to databases internally; they can also run
|
||||
multiple transactions serially, just like a regular client-connected server
|
||||
process. Also, by linking to <application>libpq</> they can connect to the
|
||||
server and behave like a regular client application.
|
||||
</para>
|
||||
|
||||
<warning>
|
||||
<para>
|
||||
There are considerable robustness and security risks in using background
|
||||
worker processes because, being written in the <literal>C</> language,
|
||||
they have unrestricted access to data. Administrators wishing to enable
|
||||
modules that include background worker process should exercise extreme
|
||||
caution. Only carefully audited modules should be permitted to run
|
||||
background worker processes.
|
||||
</para>
|
||||
</warning>
|
||||
|
||||
<para>
|
||||
Only modules listed in <varname>shared_preload_libraries</> can run
|
||||
background workers. A module wishing to run a background worker needs
|
||||
to register it by calling
|
||||
<function>RegisterBackgroundWorker(<type>BackgroundWorker *worker</type>)</function>
|
||||
from its <function>_PG_init()</>.
|
||||
The structure <structname>BackgroundWorker</structname> is defined thus:
|
||||
<programlisting>
|
||||
typedef void (*bgworker_main_type)(void *main_arg);
|
||||
typedef void (*bgworker_sighdlr_type)(SIGNAL_ARGS);
|
||||
typedef struct BackgroundWorker
|
||||
{
|
||||
char *bgw_name;
|
||||
int bgw_flags;
|
||||
BgWorkerStartTime bgw_start_time;
|
||||
int bgw_restart_time; /* in seconds, or BGW_NEVER_RESTART */
|
||||
bgworker_main_type bgw_main;
|
||||
void *bgw_main_arg;
|
||||
bgworker_sighdlr_type bgw_sighup;
|
||||
bgworker_sighdlr_type bgw_sigterm;
|
||||
} BackgroundWorker;
|
||||
</programlisting>
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<structfield>bgw_name</> is a string to be used in log messages, process
|
||||
listings and similar contexts.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<structfield>bgw_flags</> is a bitwise-or'd bitmask indicating the
|
||||
capabilities that the module wants. Possible values are
|
||||
<literal>BGWORKER_SHMEM_ACCESS</literal> (requesting shared memory access)
|
||||
and <literal>BGWORKER_BACKEND_DATABASE_CONNECTION</literal> (requesting the
|
||||
ability to establish a database connection, through which it can later run
|
||||
transactions and queries).
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<structfield>bgw_start_time</structfield> is the server state during which
|
||||
<command>postgres</> should start the process; it can be one of
|
||||
<literal>BgWorkerStart_PostmasterStart</> (start as soon as
|
||||
<command>postgres</> itself has finished its own initialization; processes
|
||||
requesting this are not eligible for database connections),
|
||||
<literal>BgWorkerStart_ConsistentState</> (start as soon as a consistent state
|
||||
has been reached in a HOT standby, allowing processes to connect to
|
||||
databases and run read-only queries), and
|
||||
<literal>BgWorkerStart_RecoveryFinished</> (start as soon as the system has
|
||||
entered normal read-write state). Note the last two values are equivalent
|
||||
in a server that's not a HOT standby. Note that this setting only indicates
|
||||
when the processes are to be started; they do not stop when a different state
|
||||
is reached.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<structfield>bgw_restart_time</structfield> is the interval, in seconds, that
|
||||
<command>postgres</command> should wait before restarting the process, in
|
||||
case it crashes. It can be any positive value,
|
||||
or <literal>BGW_NEVER_RESTART</literal>, indicating not to restart the
|
||||
process in case of a crash.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<structfield>bgw_main</structfield> is a pointer to the function to run when
|
||||
the process is started. This function must take a single argument of type
|
||||
<type>void *</> and return <type>void</>.
|
||||
<structfield>bgw_main_arg</structfield> will be passed to it as its only
|
||||
argument. Note that the global variable <literal>MyBgworkerEntry</literal>
|
||||
points to a copy of the <structname>BackgroundWorker</structname> structure
|
||||
passed at registration time.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
<structfield>bgw_sighup</structfield> and <structfield>bgw_sigterm</> are
|
||||
pointers to functions that will be installed as signal handlers for the new
|
||||
process. If <structfield>bgw_sighup</> is NULL, then <literal>SIG_IGN</>
|
||||
is used; if <structfield>bgw_sigterm</> is NULL, a handler is installed that
|
||||
will terminate the process after logging a suitable message.
|
||||
</para>
|
||||
|
||||
<para>Once running, the process can connect to a database by calling
|
||||
<function>BackgroundWorkerInitializeConnection(<parameter>char *dbname</parameter>, <parameter>char *username</parameter>)</function>.
|
||||
This allows the process to run transactions and queries using the
|
||||
<literal>SPI</literal> interface. If <varname>dbname</> is NULL,
|
||||
the session is not connected to any particular database, but shared catalogs
|
||||
can be accessed. If <varname>username</> is NULL, the process will run as
|
||||
the superuser created during <command>initdb</>.
|
||||
BackgroundWorkerInitializeConnection can only be called once per background
|
||||
process, it is not possible to switch databases.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Signals are initially blocked when control reaches the
|
||||
<structfield>bgw_main</> function, and must be unblocked by it; this is to
|
||||
allow the process to further customize its signal handlers, if necessary.
|
||||
Signals can be unblocked in the new process by calling
|
||||
<function>BackgroundWorkerUnblockSignals</> and blocked by calling
|
||||
<function>BackgroundWorkerBlockSignals</>.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Background workers are expected to be continuously running; if they exit
|
||||
cleanly, <command>postgres</> will restart them immediately. Consider doing
|
||||
interruptible sleep when they have nothing to do; this can be achieved by
|
||||
calling <function>WaitLatch()</function>. Make sure the
|
||||
<literal>WL_POSTMASTER_DEATH</> flag is set when calling that function, and
|
||||
verify the return code for a prompt exit in the emergency case that
|
||||
<command>postgres</> itself has terminated.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
The <filename>worker_spi</> contrib module contains a working example,
|
||||
which demonstrates some useful techniques.
|
||||
</para>
|
||||
</chapter>
|
|
@ -50,6 +50,7 @@
|
|||
<!ENTITY wal SYSTEM "wal.sgml">
|
||||
|
||||
<!-- programmer's guide -->
|
||||
<!ENTITY bgworker SYSTEM "bgworker.sgml">
|
||||
<!ENTITY dfunc SYSTEM "dfunc.sgml">
|
||||
<!ENTITY ecpg SYSTEM "ecpg.sgml">
|
||||
<!ENTITY extend SYSTEM "extend.sgml">
|
||||
|
|
|
@ -218,6 +218,7 @@
|
|||
&plpython;
|
||||
|
||||
&spi;
|
||||
&bgworker;
|
||||
|
||||
</part>
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -140,7 +140,9 @@ ProcGlobalSemas(void)
|
|||
* So, now we grab enough semaphores to support the desired max number
|
||||
* of backends immediately at initialization --- if the sysadmin has set
|
||||
* MaxConnections or autovacuum_max_workers higher than his kernel will
|
||||
* support, he'll find out sooner rather than later.
|
||||
* support, he'll find out sooner rather than later. (The number of
|
||||
* background worker processes registered by loadable modules is also taken
|
||||
* into consideration.)
|
||||
*
|
||||
* Another reason for creating semaphores here is that the semaphore
|
||||
* implementation typically requires us to create semaphores in the
|
||||
|
@ -171,6 +173,7 @@ InitProcGlobal(void)
|
|||
ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
|
||||
ProcGlobal->freeProcs = NULL;
|
||||
ProcGlobal->autovacFreeProcs = NULL;
|
||||
ProcGlobal->bgworkerFreeProcs = NULL;
|
||||
ProcGlobal->startupProc = NULL;
|
||||
ProcGlobal->startupProcPid = 0;
|
||||
ProcGlobal->startupBufferPinWaitBufId = -1;
|
||||
|
@ -179,10 +182,11 @@ InitProcGlobal(void)
|
|||
|
||||
/*
|
||||
* Create and initialize all the PGPROC structures we'll need. There are
|
||||
* four separate consumers: (1) normal backends, (2) autovacuum workers
|
||||
* and the autovacuum launcher, (3) auxiliary processes, and (4) prepared
|
||||
* transactions. Each PGPROC structure is dedicated to exactly one of
|
||||
* these purposes, and they do not move between groups.
|
||||
* five separate consumers: (1) normal backends, (2) autovacuum workers
|
||||
* and the autovacuum launcher, (3) background workers, (4) auxiliary
|
||||
* processes, and (5) prepared transactions. Each PGPROC structure is
|
||||
* dedicated to exactly one of these purposes, and they do not move between
|
||||
* groups.
|
||||
*/
|
||||
procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC));
|
||||
ProcGlobal->allProcs = procs;
|
||||
|
@ -223,12 +227,12 @@ InitProcGlobal(void)
|
|||
procs[i].pgprocno = i;
|
||||
|
||||
/*
|
||||
* Newly created PGPROCs for normal backends or for autovacuum must be
|
||||
* queued up on the appropriate free list. Because there can only
|
||||
* ever be a small, fixed number of auxiliary processes, no free list
|
||||
* is used in that case; InitAuxiliaryProcess() instead uses a linear
|
||||
* search. PGPROCs for prepared transactions are added to a free list
|
||||
* by TwoPhaseShmemInit().
|
||||
* Newly created PGPROCs for normal backends, autovacuum and bgworkers
|
||||
* must be queued up on the appropriate free list. Because there can
|
||||
* only ever be a small, fixed number of auxiliary processes, no free
|
||||
* list is used in that case; InitAuxiliaryProcess() instead uses a
|
||||
* linear search. PGPROCs for prepared transactions are added to a
|
||||
* free list by TwoPhaseShmemInit().
|
||||
*/
|
||||
if (i < MaxConnections)
|
||||
{
|
||||
|
@ -236,12 +240,18 @@ InitProcGlobal(void)
|
|||
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
|
||||
ProcGlobal->freeProcs = &procs[i];
|
||||
}
|
||||
else if (i < MaxBackends)
|
||||
else if (i < MaxConnections + autovacuum_max_workers + 1)
|
||||
{
|
||||
/* PGPROC for AV launcher/worker, add to autovacFreeProcs list */
|
||||
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
|
||||
ProcGlobal->autovacFreeProcs = &procs[i];
|
||||
}
|
||||
else if (i < MaxBackends)
|
||||
{
|
||||
/* PGPROC for bgworker, add to bgworkerFreeProcs list */
|
||||
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs;
|
||||
ProcGlobal->bgworkerFreeProcs = &procs[i];
|
||||
}
|
||||
|
||||
/* Initialize myProcLocks[] shared memory queues. */
|
||||
for (j = 0; j < NUM_LOCK_PARTITIONS; j++)
|
||||
|
@ -299,6 +309,8 @@ InitProcess(void)
|
|||
|
||||
if (IsAnyAutoVacuumProcess())
|
||||
MyProc = procglobal->autovacFreeProcs;
|
||||
else if (IsBackgroundWorker)
|
||||
MyProc = procglobal->bgworkerFreeProcs;
|
||||
else
|
||||
MyProc = procglobal->freeProcs;
|
||||
|
||||
|
@ -306,6 +318,8 @@ InitProcess(void)
|
|||
{
|
||||
if (IsAnyAutoVacuumProcess())
|
||||
procglobal->autovacFreeProcs = (PGPROC *) MyProc->links.next;
|
||||
else if (IsBackgroundWorker)
|
||||
procglobal->bgworkerFreeProcs = (PGPROC *) MyProc->links.next;
|
||||
else
|
||||
procglobal->freeProcs = (PGPROC *) MyProc->links.next;
|
||||
SpinLockRelease(ProcStructLock);
|
||||
|
@ -782,6 +796,11 @@ ProcKill(int code, Datum arg)
|
|||
MyProc->links.next = (SHM_QUEUE *) procglobal->autovacFreeProcs;
|
||||
procglobal->autovacFreeProcs = MyProc;
|
||||
}
|
||||
else if (IsBackgroundWorker)
|
||||
{
|
||||
MyProc->links.next = (SHM_QUEUE *) procglobal->bgworkerFreeProcs;
|
||||
procglobal->bgworkerFreeProcs = MyProc;
|
||||
}
|
||||
else
|
||||
{
|
||||
MyProc->links.next = (SHM_QUEUE *) procglobal->freeProcs;
|
||||
|
|
|
@ -87,6 +87,7 @@ pid_t PostmasterPid = 0;
|
|||
bool IsPostmasterEnvironment = false;
|
||||
bool IsUnderPostmaster = false;
|
||||
bool IsBinaryUpgrade = false;
|
||||
bool IsBackgroundWorker = false;
|
||||
|
||||
bool ExitOnAnyError = false;
|
||||
|
||||
|
|
|
@ -498,10 +498,10 @@ void
|
|||
InitializeSessionUserIdStandalone(void)
|
||||
{
|
||||
/*
|
||||
* This function should only be called in single-user mode and in
|
||||
* autovacuum workers.
|
||||
* This function should only be called in single-user mode, in
|
||||
* autovacuum workers, and in background workers.
|
||||
*/
|
||||
AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess());
|
||||
AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsBackgroundWorker);
|
||||
|
||||
/* call only once */
|
||||
AssertState(!OidIsValid(AuthenticatedUserId));
|
||||
|
|
|
@ -627,6 +627,19 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
|
|||
errhint("You should immediately run CREATE USER \"%s\" SUPERUSER;.",
|
||||
username)));
|
||||
}
|
||||
else if (IsBackgroundWorker)
|
||||
{
|
||||
if (username == NULL)
|
||||
{
|
||||
InitializeSessionUserIdStandalone();
|
||||
am_superuser = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
InitializeSessionUserId(username);
|
||||
am_superuser = superuser();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* normal multiuser case */
|
||||
|
|
|
@ -52,6 +52,7 @@
|
|||
#include "parser/scansup.h"
|
||||
#include "pgstat.h"
|
||||
#include "postmaster/autovacuum.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/bgwriter.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "postmaster/syslogger.h"
|
||||
|
@ -108,7 +109,8 @@
|
|||
* removed, we still could not exceed INT_MAX/4 because some places compute
|
||||
* 4*MaxBackends without any overflow check. This is rechecked in
|
||||
* check_maxconnections, since MaxBackends is computed as MaxConnections
|
||||
* plus autovacuum_max_workers plus one (for the autovacuum launcher).
|
||||
* plus the number of bgworkers plus autovacuum_max_workers plus one (for the
|
||||
* autovacuum launcher).
|
||||
*/
|
||||
#define MAX_BACKENDS 0x7fffff
|
||||
|
||||
|
@ -8628,7 +8630,8 @@ show_tcp_keepalives_count(void)
|
|||
static bool
|
||||
check_maxconnections(int *newval, void **extra, GucSource source)
|
||||
{
|
||||
if (*newval + autovacuum_max_workers + 1 > MAX_BACKENDS)
|
||||
if (*newval + GetNumShmemAttachedBgworkers() + autovacuum_max_workers + 1 >
|
||||
MAX_BACKENDS)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
@ -8636,13 +8639,15 @@ check_maxconnections(int *newval, void **extra, GucSource source)
|
|||
static void
|
||||
assign_maxconnections(int newval, void *extra)
|
||||
{
|
||||
MaxBackends = newval + autovacuum_max_workers + 1;
|
||||
MaxBackends = newval + autovacuum_max_workers + 1 +
|
||||
GetNumShmemAttachedBgworkers();
|
||||
}
|
||||
|
||||
static bool
|
||||
check_autovacuum_max_workers(int *newval, void **extra, GucSource source)
|
||||
{
|
||||
if (MaxConnections + *newval + 1 > MAX_BACKENDS)
|
||||
if (MaxConnections + *newval + 1 + GetNumShmemAttachedBgworkers() >
|
||||
MAX_BACKENDS)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
@ -8650,7 +8655,7 @@ check_autovacuum_max_workers(int *newval, void **extra, GucSource source)
|
|||
static void
|
||||
assign_autovacuum_max_workers(int newval, void *extra)
|
||||
{
|
||||
MaxBackends = MaxConnections + newval + 1;
|
||||
MaxBackends = MaxConnections + newval + 1 + GetNumShmemAttachedBgworkers();
|
||||
}
|
||||
|
||||
static bool
|
||||
|
|
|
@ -131,6 +131,7 @@ do { \
|
|||
extern pid_t PostmasterPid;
|
||||
extern bool IsPostmasterEnvironment;
|
||||
extern PGDLLIMPORT bool IsUnderPostmaster;
|
||||
extern bool IsBackgroundWorker;
|
||||
extern bool IsBinaryUpgrade;
|
||||
|
||||
extern bool ExitOnAnyError;
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
/*--------------------------------------------------------------------
|
||||
* bgworker.h
|
||||
* POSTGRES pluggable background workers interface
|
||||
*
|
||||
* A background worker is a process able to run arbitrary, user-supplied code,
|
||||
* including normal transactions.
|
||||
*
|
||||
* Any external module loaded via shared_preload_libraries can register a
|
||||
* worker. Then, at the appropriate time, the worker process is forked from
|
||||
* the postmaster and runs the user-supplied "main" function. This code may
|
||||
* connect to a database and run transactions. Once started, it stays active
|
||||
* until shutdown or crash. The process should sleep during periods of
|
||||
* inactivity.
|
||||
*
|
||||
* If the fork() call fails in the postmaster, it will try again later. Note
|
||||
* that the failure can only be transient (fork failure due to high load,
|
||||
* memory pressure, too many processes, etc); more permanent problems, like
|
||||
* failure to connect to a database, are detected later in the worker and dealt
|
||||
* with just by having the worker exit normally. Postmaster will launch a new
|
||||
* worker again later.
|
||||
*
|
||||
* Note that there might be more than one worker in a database concurrently,
|
||||
* and the same module may request more than one worker running the same (or
|
||||
* different) code.
|
||||
*
|
||||
*
|
||||
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/include/postmaster/bgworker.h
|
||||
*--------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef BGWORKER_H
|
||||
#define BGWORKER_H
|
||||
|
||||
/*---------------------------------------------------------------------
|
||||
* External module API.
|
||||
*---------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
/*
|
||||
* Pass this flag to have your worker be able to connect to shared memory.
|
||||
*/
|
||||
#define BGWORKER_SHMEM_ACCESS 0x0001
|
||||
|
||||
/*
|
||||
* This flag means the bgworker requires a database connection. The connection
|
||||
* is not established automatically; the worker must establish it later.
|
||||
* It requires that BGWORKER_SHMEM_ACCESS was passed too.
|
||||
*/
|
||||
#define BGWORKER_BACKEND_DATABASE_CONNECTION 0x0002
|
||||
|
||||
|
||||
typedef void (*bgworker_main_type)(void *main_arg);
|
||||
typedef void (*bgworker_sighdlr_type)(SIGNAL_ARGS);
|
||||
|
||||
/*
|
||||
* Points in time at which a bgworker can request to be started
|
||||
*/
|
||||
typedef enum
|
||||
{
|
||||
BgWorkerStart_PostmasterStart,
|
||||
BgWorkerStart_ConsistentState,
|
||||
BgWorkerStart_RecoveryFinished
|
||||
} BgWorkerStartTime;
|
||||
|
||||
#define BGW_DEFAULT_RESTART_INTERVAL 60
|
||||
#define BGW_NEVER_RESTART -1
|
||||
|
||||
typedef struct BackgroundWorker
|
||||
{
|
||||
char *bgw_name;
|
||||
int bgw_flags;
|
||||
BgWorkerStartTime bgw_start_time;
|
||||
int bgw_restart_time; /* in seconds, or BGW_NEVER_RESTART */
|
||||
bgworker_main_type bgw_main;
|
||||
void *bgw_main_arg;
|
||||
bgworker_sighdlr_type bgw_sighup;
|
||||
bgworker_sighdlr_type bgw_sigterm;
|
||||
} BackgroundWorker;
|
||||
|
||||
/* Register a new bgworker */
|
||||
extern void RegisterBackgroundWorker(BackgroundWorker *worker);
|
||||
|
||||
/* This is valid in a running worker */
|
||||
extern BackgroundWorker *MyBgworkerEntry;
|
||||
|
||||
/*
|
||||
* Connect to the specified database, as the specified user. Only a worker
|
||||
* that passed BGWORKER_BACKEND_DATABASE_CONNECTION during registration may
|
||||
* call this.
|
||||
*
|
||||
* If username is NULL, bootstrapping superuser is used.
|
||||
* If dbname is NULL, connection is made to no specific database;
|
||||
* only shared catalogs can be accessed.
|
||||
*/
|
||||
extern void BackgroundWorkerInitializeConnection(char *dbname, char *username);
|
||||
|
||||
/* Block/unblock signals in a background worker process */
|
||||
extern void BackgroundWorkerBlockSignals(void);
|
||||
extern void BackgroundWorkerUnblockSignals(void);
|
||||
|
||||
#endif /* BGWORKER_H */
|
|
@ -51,6 +51,8 @@ extern void ClosePostmasterPorts(bool am_syslogger);
|
|||
|
||||
extern int MaxLivePostmasterChildren(void);
|
||||
|
||||
extern int GetNumShmemAttachedBgworkers(void);
|
||||
|
||||
#ifdef EXEC_BACKEND
|
||||
extern pid_t postmaster_forkexec(int argc, char *argv[]);
|
||||
extern void SubPostmasterMain(int argc, char *argv[]) __attribute__((noreturn));
|
||||
|
|
|
@ -189,6 +189,8 @@ typedef struct PROC_HDR
|
|||
PGPROC *freeProcs;
|
||||
/* Head of list of autovacuum's free PGPROC structures */
|
||||
PGPROC *autovacFreeProcs;
|
||||
/* Head of list of bgworker free PGPROC structures */
|
||||
PGPROC *bgworkerFreeProcs;
|
||||
/* WALWriter process's latch */
|
||||
Latch *walwriterLatch;
|
||||
/* Checkpointer process's latch */
|
||||
|
|
Loading…
Reference in New Issue