MODULES = worker_spi
+EXTENSION = worker_spi
+DATA = worker_spi--1.0.sql
+
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
--- /dev/null
+/* contrib/worker_spi/worker_spi--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION worker_spi" to load this file. \quit
+
+CREATE FUNCTION worker_spi_launch(pg_catalog.int4)
+RETURNS pg_catalog.bool STRICT
+AS 'MODULE_PATHNAME'
+LANGUAGE C;
#include "tcop/utility.h"
PG_MODULE_MAGIC;
+PG_FUNCTION_INFO_V1(worker_spi_launch);
void _PG_init(void);
+void worker_spi_main(Datum);
+Datum worker_spi_launch(PG_FUNCTION_ARGS);
/* flags set by signal handlers */
static volatile sig_atomic_t got_sighup = false;
pgstat_report_activity(STATE_IDLE, NULL);
}
-static void
-worker_spi_main(void *main_arg)
+void
+worker_spi_main(Datum main_arg)
{
- worktable *table = (worktable *) main_arg;
+ int index = DatumGetInt32(main_arg);
+ worktable *table;
StringInfoData buf;
+ char name[20];
+
+ table = palloc(sizeof(worktable));
+ sprintf(name, "schema%d", index);
+ table->schema = pstrdup(name);
+ table->name = pstrdup("counted");
+
+ /* Establish signal handlers before unblocking signals. */
+ pqsignal(SIGHUP, worker_spi_sighup);
+ pqsignal(SIGTERM, worker_spi_sigterm);
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
pgstat_report_activity(STATE_IDLE, NULL);
}
- proc_exit(0);
+ proc_exit(1);
}
/*
_PG_init(void)
{
BackgroundWorker worker;
- worktable *table;
unsigned int i;
- char name[20];
/* get the configuration */
DefineCustomIntVariable("worker_spi.naptime",
NULL,
NULL,
NULL);
+
+ if (!process_shared_preload_libraries_in_progress)
+ return;
+
DefineCustomIntVariable("worker_spi.total_workers",
"Number of workers.",
NULL,
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = worker_spi_main;
- worker.bgw_sighup = worker_spi_sighup;
- worker.bgw_sigterm = worker_spi_sigterm;
+ worker.bgw_sighup = NULL;
+ worker.bgw_sigterm = NULL;
/*
* Now fill in worker-specific data, and do the actual registrations.
*/
for (i = 1; i <= worker_spi_total_workers; i++)
{
- sprintf(name, "worker %d", i);
- worker.bgw_name = pstrdup(name);
-
- table = palloc(sizeof(worktable));
- sprintf(name, "schema%d", i);
- table->schema = pstrdup(name);
- table->name = pstrdup("counted");
- worker.bgw_main_arg = (void *) table;
+ snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
+ worker.bgw_main_arg = Int32GetDatum(i);
RegisterBackgroundWorker(&worker);
}
}
+
+/*
+ * Dynamically launch an SPI worker.
+ */
+Datum
+worker_spi_launch(PG_FUNCTION_ARGS)
+{
+ int32 i = PG_GETARG_INT32(0);
+ BackgroundWorker worker;
+
+ worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ BGWORKER_BACKEND_DATABASE_CONNECTION;
+ worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ worker.bgw_main = NULL; /* new worker might not have library loaded */
+ sprintf(worker.bgw_library_name, "worker_spi");
+ sprintf(worker.bgw_function_name, "worker_spi_main");
+ worker.bgw_sighup = NULL; /* new worker might not have library loaded */
+ worker.bgw_sigterm = NULL; /* new worker might not have library loaded */
+ snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
+ worker.bgw_main_arg = Int32GetDatum(i);
+
+ PG_RETURN_BOOL(RegisterDynamicBackgroundWorker(&worker));
+}
--- /dev/null
+# worker_spi extension
+comment = 'Sample background worker'
+default_version = '1.0'
+module_pathname = '$libdir/worker_spi'
+relocatable = true
</warning>
<para>
- Only modules listed in <varname>shared_preload_libraries</> can run
- background workers. A module wishing to run a background worker needs
- to register it by calling
+ Background workers can be initialized at the time that
+ <productname>PostgreSQL</> is started including the module name in
+ <varname>shared_preload_libraries</>. A module wishing to run a background
+ worker can register it by calling
<function>RegisterBackgroundWorker(<type>BackgroundWorker *worker</type>)</function>
- from its <function>_PG_init()</>.
+ from its <function>_PG_init()</>. Background workers can also be started
+ after the system is up and running by calling the function
+ <function>RegisterDynamicBackgroundWorker</function>(<type>BackgroundWorker
+ *worker</type>). Unlike <function>RegisterBackgroundWorker</>, which can
+ only be called from within the postmaster,
+ <function>RegisterDynamicBackgroundWorker</function> must be called from
+ a regular backend.
+ </para>
+
+ <para>
The structure <structname>BackgroundWorker</structname> is defined thus:
<programlisting>
typedef void (*bgworker_main_type)(void *main_arg);
typedef void (*bgworker_sighdlr_type)(SIGNAL_ARGS);
typedef struct BackgroundWorker
{
- char *bgw_name;
+ char bgw_name[BGW_MAXLEN];
int bgw_flags;
BgWorkerStartTime bgw_start_time;
int bgw_restart_time; /* in seconds, or BGW_NEVER_RESTART */
- bgworker_main_type bgw_main;
- void *bgw_main_arg;
+ bgworker_main_type bgw_main;
+ char bgw_library_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
+ char bgw_function_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
+ Datum bgw_main_arg;
bgworker_sighdlr_type bgw_sighup;
bgworker_sighdlr_type bgw_sigterm;
} BackgroundWorker;
<structfield>bgw_main_arg</structfield> will be passed to it as its only
argument. Note that the global variable <literal>MyBgworkerEntry</literal>
points to a copy of the <structname>BackgroundWorker</structname> structure
- passed at registration time.
+ passed at registration time. <structfield>bgw_main</structfield> may be
+ NULL; in that case, <structfield>bgw_library_name</structfield> and
+ <structfield>bgw_function_name</structfield> will be used to determine
+ the entrypoint. This is useful for background workers launched after
+ postmaster startup, where the postmaster does not have the requisite
+ library loaded.
+ </para>
+
+ <para>
+ <structfield>bgw_library_name</structfield> is the name of a library in
+ which the initial entrypoint for the background worker should be sought.
+ It is ignored unless <structfield>bgw_main</structfield> is NULL.
+ But if <structfield>bgw_main</structfield> is NULL, then the named library
+ will be dynamically loaded by the worker process and
+ <structfield>bgw_function_name</structfield> will be used to identify
+ the function to be called.
+ </para>
+
+ <para>
+ <structfield>bgw_function_name</structfield> is the name of a function in
+ a dynamically loaded library which should be used as the initial entrypoint
+ for a new background worker. It is ignored unless
+ <structfield>bgw_main</structfield> is NULL.
</para>
<para>
pointers to functions that will be installed as signal handlers for the new
process. If <structfield>bgw_sighup</> is NULL, then <literal>SIG_IGN</>
is used; if <structfield>bgw_sigterm</> is NULL, a handler is installed that
- will terminate the process after logging a suitable message.
+ will terminate the process after logging a suitable message. These
+ fields should not be used if <structfield>bgw_main</> is NULL; instead,
+ the worker process should set its own signal handlers before calling
+ <function>BackgroundWorkerUnblockSignals()</function>.
</para>
<para>Once running, the process can connect to a database by calling
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = autovacuum.o bgwriter.o fork_process.o pgarch.o pgstat.o postmaster.o \
- startup.o syslogger.o walwriter.o checkpointer.o
+OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
+ pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
include $(top_srcdir)/src/backend/common.mk
--- /dev/null
+/*--------------------------------------------------------------------
+ * bgworker.c
+ * POSTGRES pluggable background workers implementation
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/bgworker.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "postmaster/bgworker_internals.h"
+#include "storage/barrier.h"
+#include "storage/lwlock.h"
+#include "storage/pmsignal.h"
+#include "storage/shmem.h"
+#include "utils/ascii.h"
+
+/*
+ * The postmaster's list of registered background workers, in private memory.
+ */
+slist_head BackgroundWorkerList = SLIST_STATIC_INIT(BackgroundWorkerList);
+
+/*
+ * BackgroundWorkerSlots exist in shared memory and can be accessed (via
+ * the BackgroundWorkerArray) by both the postmaster and by regular backends.
+ * However, the postmaster cannot take locks, even spinlocks, because this
+ * might allow it to crash or become wedged if shared memory gets corrupted.
+ * Such an outcome is intolerable. Therefore, we need a lockless protocol
+ * for coordinating access to this data.
+ *
+ * The 'in_use' flag is used to hand off responsibility for the slot between
+ * the postmaster and the rest of the system. When 'in_use' is false,
+ * the postmaster will ignore the slot entirely, except for the 'in_use' flag
+ * itself, which it may read. In this state, regular backends may modify the
+ * slot. Once a backend sets 'in_use' to true, the slot becomes the
+ * responsibility of the postmaster. Regular backends may no longer modify it,
+ * but the postmaster may examine it. Thus, a backend initializing a slot
+ * must fully initialize the slot - and insert a write memory barrier - before
+ * marking it as in use.
+ *
+ * In addition to coordinating with the postmaster, backends modifying this
+ * data structure must coordinate with each other. Since they can take locks,
+ * this is straightforward: any backend wishing to manipulate a slot must
+ * take BackgroundWorkerLock in exclusive mode. Backends wishing to read
+ * data that might get concurrently modified by other backends should take
+ * this lock in shared mode. No matter what, backends reading this data
+ * structure must be able to tolerate concurrent modifications by the
+ * postmaster.
+ */
+typedef struct BackgroundWorkerSlot
+{
+ bool in_use;
+ BackgroundWorker worker;
+} BackgroundWorkerSlot;
+
+typedef struct BackgroundWorkerArray
+{
+ int total_slots;
+ BackgroundWorkerSlot slot[FLEXIBLE_ARRAY_MEMBER];
+} BackgroundWorkerArray;
+
+BackgroundWorkerArray *BackgroundWorkerData;
+
+/*
+ * Calculate shared memory needed.
+ */
+Size
+BackgroundWorkerShmemSize(void)
+{
+ Size size;
+
+ /* Array of workers is variably sized. */
+ size = offsetof(BackgroundWorkerArray, slot);
+ size = add_size(size, mul_size(max_worker_processes,
+ sizeof(BackgroundWorkerSlot)));
+
+ return size;
+}
+
+/*
+ * Initialize shared memory.
+ */
+void
+BackgroundWorkerShmemInit(void)
+{
+ bool found;
+
+ BackgroundWorkerData = ShmemInitStruct("Background Worker Data",
+ BackgroundWorkerShmemSize(),
+ &found);
+ if (!IsUnderPostmaster)
+ {
+ slist_iter siter;
+ int slotno = 0;
+
+ BackgroundWorkerData->total_slots = max_worker_processes;
+
+ /*
+ * Copy contents of worker list into shared memory. Record the
+ * shared memory slot assigned to each worker. This ensures
+ * a 1-to-1 correspondence betwen the postmaster's private list and
+ * the array in shared memory.
+ */
+ slist_foreach(siter, &BackgroundWorkerList)
+ {
+ BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno];
+ RegisteredBgWorker *rw;
+
+ rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur);
+ Assert(slotno < max_worker_processes);
+ slot->in_use = true;
+ rw->rw_shmem_slot = slotno;
+ memcpy(&slot->worker, &rw->rw_worker, sizeof(BackgroundWorker));
+ ++slotno;
+ }
+
+ /*
+ * Mark any remaining slots as not in use.
+ */
+ while (slotno < max_worker_processes)
+ {
+ BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno];
+
+ slot->in_use = false;
+ ++slotno;
+ }
+ }
+ else
+ Assert(found);
+}
+
+static RegisteredBgWorker *
+FindRegisteredWorkerBySlotNumber(int slotno)
+{
+ slist_iter siter;
+
+ /*
+ * Copy contents of worker list into shared memory. Record the
+ * shared memory slot assigned to each worker. This ensures
+ * a 1-to-1 correspondence betwen the postmaster's private list and
+ * the array in shared memory.
+ */
+ slist_foreach(siter, &BackgroundWorkerList)
+ {
+ RegisteredBgWorker *rw;
+
+ rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur);
+ if (rw->rw_shmem_slot == slotno)
+ return rw;
+ }
+
+ return NULL;
+}
+
+/*
+ * Notice changes to shared_memory made by other backends. This code
+ * runs in the postmaster, so we must be very careful not to assume that
+ * shared memory contents are sane. Otherwise, a rogue backend could take
+ * out the postmaster.
+ */
+void
+BackgroundWorkerStateChange(void)
+{
+ int slotno;
+
+ /*
+ * The total number of slots stored in shared memory should match our
+ * notion of max_worker_processes. If it does not, something is very
+ * wrong. Further down, we always refer to this value as
+ * max_worker_processes, in case shared memory gets corrupted while
+ * we're looping.
+ */
+ if (max_worker_processes != BackgroundWorkerData->total_slots)
+ {
+ elog(LOG,
+ "inconsistent background worker state (max_worker_processes=%d, total_slots=%d",
+ max_worker_processes,
+ BackgroundWorkerData->total_slots);
+ return;
+ }
+
+ /*
+ * Iterate through slots, looking for newly-registered workers or
+ * workers who must die.
+ */
+ for (slotno = 0; slotno < max_worker_processes; ++slotno)
+ {
+ BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno];
+ RegisteredBgWorker *rw;
+
+ if (!slot->in_use)
+ continue;
+
+ /*
+ * Make sure we don't see the in_use flag before the updated slot
+ * contents.
+ */
+ pg_read_barrier();
+
+ /*
+ * See whether we already know about this worker. If not, we need
+ * to update our backend-private BackgroundWorkerList to match shared
+ * memory.
+ */
+ rw = FindRegisteredWorkerBySlotNumber(slotno);
+ if (rw != NULL)
+ continue;
+
+ /*
+ * Copy the registration data into the registered workers list.
+ */
+ rw = malloc(sizeof(RegisteredBgWorker));
+ if (rw == NULL)
+ {
+ ereport(LOG,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ return;
+ }
+
+ /*
+ * Copy strings in a paranoid way. If shared memory is corrupted,
+ * the source data might not even be NUL-terminated.
+ */
+ ascii_safe_strlcpy(rw->rw_worker.bgw_name,
+ slot->worker.bgw_name, BGW_MAXLEN);
+ ascii_safe_strlcpy(rw->rw_worker.bgw_library_name,
+ slot->worker.bgw_library_name, BGW_MAXLEN);
+ ascii_safe_strlcpy(rw->rw_worker.bgw_function_name,
+ slot->worker.bgw_function_name, BGW_MAXLEN);
+
+ /*
+ * Copy remaining fields.
+ *
+ * flags, start_time, and restart_time are examined by the
+ * postmaster, but nothing too bad will happen if they are
+ * corrupted. The remaining fields will only be examined by the
+ * child process. It might crash, but we won't.
+ */
+ rw->rw_worker.bgw_flags = slot->worker.bgw_flags;
+ rw->rw_worker.bgw_start_time = slot->worker.bgw_start_time;
+ rw->rw_worker.bgw_restart_time = slot->worker.bgw_restart_time;
+ rw->rw_worker.bgw_main = slot->worker.bgw_main;
+ rw->rw_worker.bgw_main_arg = slot->worker.bgw_main_arg;
+ rw->rw_worker.bgw_sighup = slot->worker.bgw_sighup;
+ rw->rw_worker.bgw_sigterm = slot->worker.bgw_sigterm;
+
+ /* Initialize postmaster bookkeeping. */
+ rw->rw_backend = NULL;
+ rw->rw_pid = 0;
+ rw->rw_child_slot = 0;
+ rw->rw_crashed_at = 0;
+ rw->rw_shmem_slot = slotno;
+
+ /* Log it! */
+ ereport(LOG,
+ (errmsg("registering background worker: %s",
+ rw->rw_worker.bgw_name)));
+
+ slist_push_head(&BackgroundWorkerList, &rw->rw_lnode);
+ }
+}
+
+/*
+ * Forget about a background worker that's no longer needed.
+ *
+ * At present, this only happens when a background worker marked
+ * BGW_NEVER_RESTART exits. This function should only be invoked in
+ * the postmaster.
+ */
+void
+ForgetBackgroundWorker(RegisteredBgWorker *rw)
+{
+ BackgroundWorkerSlot *slot;
+
+ Assert(rw->rw_shmem_slot < max_worker_processes);
+ slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot];
+ slot->in_use = false;
+
+ ereport(LOG,
+ (errmsg("unregistering background worker: %s",
+ rw->rw_worker.bgw_name)));
+
+ slist_delete(&BackgroundWorkerList, &rw->rw_lnode);
+ free(rw);
+}
+
+#ifdef EXEC_BACKEND
+/*
+ * In EXEC_BACKEND mode, workers use this to retrieve their details from
+ * shared memory.
+ */
+BackgroundWorker *
+BackgroundWorkerEntry(int slotno)
+{
+ BackgroundWorkerSlot *slot;
+
+ Assert(slotno < BackgroundWorkerData->total_slots);
+ slot = &BackgroundWorkerData->slot[slotno];
+ Assert(slot->in_use);
+ return &slot->worker; /* can't become free while we're still here */
+}
+#endif
+
+/*
+ * Complain about the BackgroundWorker definition using error level elevel.
+ * Return true if it looks ok, false if not (unless elevel >= ERROR, in
+ * which case we won't return at all in the not-OK case).
+ */
+static bool
+SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel)
+{
+ /* sanity check for flags */
+ if (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION)
+ {
+ if (!(worker->bgw_flags & BGWORKER_SHMEM_ACCESS))
+ {
+ ereport(elevel,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("background worker \"%s\": must attach to shared memory in order to request a database connection",
+ worker->bgw_name)));
+ return false;
+ }
+
+ if (worker->bgw_start_time == BgWorkerStart_PostmasterStart)
+ {
+ ereport(elevel,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("background worker \"%s\": cannot request database access if starting at postmaster start",
+ worker->bgw_name)));
+ return false;
+ }
+
+ /* XXX other checks? */
+ }
+
+ if ((worker->bgw_restart_time < 0 &&
+ worker->bgw_restart_time != BGW_NEVER_RESTART) ||
+ (worker->bgw_restart_time > USECS_PER_DAY / 1000))
+ {
+ ereport(elevel,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("background worker \"%s\": invalid restart interval",
+ worker->bgw_name)));
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * Register a new background worker while processing shared_preload_libraries.
+ *
+ * This can only be called in the _PG_init function of a module library
+ * that's loaded by shared_preload_libraries; otherwise it has no effect.
+ */
+void
+RegisterBackgroundWorker(BackgroundWorker *worker)
+{
+ RegisteredBgWorker *rw;
+ static int numworkers = 0;
+
+ if (!IsUnderPostmaster)
+ ereport(LOG,
+ (errmsg("registering background worker: %s", worker->bgw_name)));
+
+ if (!process_shared_preload_libraries_in_progress)
+ {
+ if (!IsUnderPostmaster)
+ ereport(LOG,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("background worker \"%s\": must be registered in shared_preload_libraries",
+ worker->bgw_name)));
+ return;
+ }
+
+ if (!SanityCheckBackgroundWorker(worker, LOG))
+ return;
+
+ /*
+ * Enforce maximum number of workers. Note this is overly restrictive: we
+ * could allow more non-shmem-connected workers, because these don't count
+ * towards the MAX_BACKENDS limit elsewhere. For now, it doesn't seem
+ * important to relax this restriction.
+ */
+ if (++numworkers > max_worker_processes)
+ {
+ ereport(LOG,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("too many background workers"),
+ errdetail_plural("Up to %d background worker can be registered with the current settings.",
+ "Up to %d background workers can be registered with the current settings.",
+ max_worker_processes,
+ max_worker_processes),
+ errhint("Consider increasing the configuration parameter \"max_worker_processes\".")));
+ return;
+ }
+
+ /*
+ * Copy the registration data into the registered workers list.
+ */
+ rw = malloc(sizeof(RegisteredBgWorker));
+ if (rw == NULL)
+ {
+ ereport(LOG,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ return;
+ }
+
+ rw->rw_worker = *worker;
+ rw->rw_backend = NULL;
+ rw->rw_pid = 0;
+ rw->rw_child_slot = 0;
+ rw->rw_crashed_at = 0;
+
+ slist_push_head(&BackgroundWorkerList, &rw->rw_lnode);
+}
+
+/*
+ * Register a new background worker from a regular backend.
+ *
+ * Returns true on success and false on failure. Failure typically indicates
+ * that no background worker slots are currently available.
+ */
+bool
+RegisterDynamicBackgroundWorker(BackgroundWorker *worker)
+{
+ int slotno;
+ bool success = false;
+
+ /*
+ * We can't register dynamic background workers from the postmaster.
+ * If this is a standalone backend, we're the only process and can't
+ * start any more. In a multi-process environement, it might be
+ * theoretically possible, but we don't currently support it due to
+ * locking considerations; see comments on the BackgroundWorkerSlot
+ * data structure.
+ */
+ if (!IsUnderPostmaster)
+ return false;
+
+ if (!SanityCheckBackgroundWorker(worker, ERROR))
+ return false;
+
+ LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE);
+
+ /*
+ * Look for an unused slot. If we find one, grab it.
+ */
+ for (slotno = 0; slotno < BackgroundWorkerData->total_slots; ++slotno)
+ {
+ BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno];
+
+ if (!slot->in_use)
+ {
+ memcpy(&slot->worker, worker, sizeof(BackgroundWorker));
+
+ /*
+ * Make sure postmaster doesn't see the slot as in use before
+ * it sees the new contents.
+ */
+ pg_write_barrier();
+
+ slot->in_use = true;
+ success = true;
+ break;
+ }
+ }
+
+ LWLockRelease(BackgroundWorkerLock);
+
+ /* If we found a slot, tell the postmaster to notice the change. */
+ if (success)
+ SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);
+
+ return success;
+}
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
-#include "postmaster/bgworker.h"
+#include "postmaster/bgworker_internals.h"
#include "postmaster/fork_process.h"
#include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
+#include "utils/dynamic_loader.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
static Backend *ShmemBackendArray;
#endif
-
-/*
- * List of background workers.
- *
- * A worker that requests a database connection during registration will have
- * rw_backend set, and will be present in BackendList. Note: do not rely on
- * rw_backend being non-NULL for shmem-connected workers!
- */
-typedef struct RegisteredBgWorker
-{
- BackgroundWorker rw_worker; /* its registry entry */
- Backend *rw_backend; /* its BackendList entry, or NULL */
- pid_t rw_pid; /* 0 if not running */
- int rw_child_slot;
- TimestampTz rw_crashed_at; /* if not 0, time it last crashed */
-#ifdef EXEC_BACKEND
- int rw_cookie;
-#endif
- slist_node rw_lnode; /* list link */
-} RegisteredBgWorker;
-
-static slist_head BackgroundWorkerList = SLIST_STATIC_INIT(BackgroundWorkerList);
-
BackgroundWorker *MyBgworkerEntry = NULL;
static void ShmemBackendArrayAdd(Backend *bn);
static void ShmemBackendArrayRemove(Backend *bn);
-
-static BackgroundWorker *find_bgworker_entry(int cookie);
#endif /* EXEC_BACKEND */
#define StartupDataBase() StartChildProcess(StartupProcess)
if (HaveCrashedWorker)
{
- slist_iter siter;
+ slist_mutable_iter siter;
/*
* When there are crashed bgworkers, we sleep just long enough that
* determine the minimum of all wakeup times according to most recent
* crash time and requested restart interval.
*/
- slist_foreach(siter, &BackgroundWorkerList)
+ slist_foreach_modify(siter, &BackgroundWorkerList)
{
RegisteredBgWorker *rw;
TimestampTz this_wakeup;
continue;
if (rw->rw_worker.bgw_restart_time == BGW_NEVER_RESTART)
+ {
+ ForgetBackgroundWorker(rw);
continue;
+ }
this_wakeup = TimestampTzPlusMilliseconds(rw->rw_crashed_at,
1000L * rw->rw_worker.bgw_restart_time);
}
if (strncmp(argv[1], "--forkbgworker=", 15) == 0)
{
- int cookie;
+ int shmem_slot;
/* Close the postmaster's sockets */
ClosePostmasterPorts(false);
/* Attach process to shared data structures */
CreateSharedMemoryAndSemaphores(false, 0);
- cookie = atoi(argv[1] + 15);
- MyBgworkerEntry = find_bgworker_entry(cookie);
+ shmem_slot = atoi(argv[1] + 15);
+ MyBgworkerEntry = BackgroundWorkerEntry(shmem_slot);
do_start_bgworker();
}
if (strcmp(argv[1], "--forkarch") == 0)
sigusr1_handler(SIGNAL_ARGS)
{
int save_errno = errno;
+ bool start_bgworker = false;
PG_SETMASK(&BlockSig);
+ /* Process background worker state change. */
+ if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
+ {
+ BackgroundWorkerStateChange();
+ start_bgworker = true;
+ }
+
/*
* RECOVERY_STARTED and BEGIN_HOT_STANDBY signals are ignored in
* unexpected states. If the startup process quickly starts up, completes
(errmsg("database system is ready to accept read only connections")));
pmState = PM_HOT_STANDBY;
-
/* Some workers may be scheduled to start now */
- StartOneBackgroundWorker();
+ start_bgworker = true;
}
+ if (start_bgworker)
+ StartOneBackgroundWorker();
+
if (CheckPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER) &&
PgArchPID != 0)
{
max_worker_processes);
}
-/*
- * Register a new background worker.
- *
- * This can only be called in the _PG_init function of a module library
- * that's loaded by shared_preload_libraries; otherwise it has no effect.
- */
-void
-RegisterBackgroundWorker(BackgroundWorker *worker)
-{
- RegisteredBgWorker *rw;
- int namelen = strlen(worker->bgw_name);
- static int numworkers = 0;
-
-#ifdef EXEC_BACKEND
-
- /*
- * Use 1 here, not 0, to avoid confusing a possible bogus cookie read by
- * atoi() in SubPostmasterMain.
- */
- static int BackgroundWorkerCookie = 1;
-#endif
-
- if (!IsUnderPostmaster)
- ereport(LOG,
- (errmsg("registering background worker: %s", worker->bgw_name)));
-
- if (!process_shared_preload_libraries_in_progress)
- {
- if (!IsUnderPostmaster)
- ereport(LOG,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("background worker \"%s\": must be registered in shared_preload_libraries",
- worker->bgw_name)));
- return;
- }
-
- /* sanity check for flags */
- if (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION)
- {
- if (!(worker->bgw_flags & BGWORKER_SHMEM_ACCESS))
- {
- if (!IsUnderPostmaster)
- ereport(LOG,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("background worker \"%s\": must attach to shared memory in order to request a database connection",
- worker->bgw_name)));
- return;
- }
-
- if (worker->bgw_start_time == BgWorkerStart_PostmasterStart)
- {
- if (!IsUnderPostmaster)
- ereport(LOG,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("background worker \"%s\": cannot request database access if starting at postmaster start",
- worker->bgw_name)));
- return;
- }
-
- /* XXX other checks? */
- }
-
- if ((worker->bgw_restart_time < 0 &&
- worker->bgw_restart_time != BGW_NEVER_RESTART) ||
- (worker->bgw_restart_time > USECS_PER_DAY / 1000))
- {
- if (!IsUnderPostmaster)
- ereport(LOG,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("background worker \"%s\": invalid restart interval",
- worker->bgw_name)));
- return;
- }
-
- /*
- * Enforce maximum number of workers. Note this is overly restrictive: we
- * could allow more non-shmem-connected workers, because these don't count
- * towards the MAX_BACKENDS limit elsewhere. For now, it doesn't seem
- * important to relax this restriction.
- */
- if (++numworkers > max_worker_processes)
- {
- ereport(LOG,
- (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
- errmsg("too many background workers"),
- errdetail_plural("Up to %d background worker can be registered with the current settings.",
- "Up to %d background workers can be registered with the current settings.",
- max_worker_processes,
- max_worker_processes),
- errhint("Consider increasing the configuration parameter \"max_worker_processes\".")));
- return;
- }
-
- /*
- * Copy the registration data into the registered workers list.
- */
- rw = malloc(sizeof(RegisteredBgWorker) + namelen + 1);
- if (rw == NULL)
- {
- ereport(LOG,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- return;
- }
-
- rw->rw_worker = *worker;
- rw->rw_worker.bgw_name = ((char *) rw) + sizeof(RegisteredBgWorker);
- strlcpy(rw->rw_worker.bgw_name, worker->bgw_name, namelen + 1);
-
- rw->rw_backend = NULL;
- rw->rw_pid = 0;
- rw->rw_child_slot = 0;
- rw->rw_crashed_at = 0;
-#ifdef EXEC_BACKEND
- rw->rw_cookie = BackgroundWorkerCookie++;
-#endif
-
- slist_push_head(&BackgroundWorkerList, &rw->rw_lnode);
-}
-
/*
* Connect background worker to a database.
*/
PG_SETMASK(&UnBlockSig);
}
-#ifdef EXEC_BACKEND
-static BackgroundWorker *
-find_bgworker_entry(int cookie)
-{
- slist_iter iter;
-
- slist_foreach(iter, &BackgroundWorkerList)
- {
- RegisteredBgWorker *rw;
-
- rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur);
- if (rw->rw_cookie == cookie)
- return &rw->rw_worker;
- }
-
- return NULL;
-}
-#endif
-
static void
bgworker_quickdie(SIGNAL_ARGS)
{
sigjmp_buf local_sigjmp_buf;
char buf[MAXPGPATH];
BackgroundWorker *worker = MyBgworkerEntry;
+ bgworker_main_type entrypt;
if (worker == NULL)
elog(FATAL, "unable to find bgworker entry");
InitProcess();
#endif
+ /*
+ * If bgw_main is set, we use that value as the initial entrypoint.
+ * However, if the library containing the entrypoint wasn't loaded at
+ * postmaster startup time, passing it as a direct function pointer is
+ * not possible. To work around that, we allow callers for whom a
+ * function pointer is not available to pass a library name (which will
+ * be loaded, if necessary) and a function name (which will be looked up
+ * in the named library).
+ */
+ if (worker->bgw_main != NULL)
+ entrypt = worker->bgw_main;
+ else
+ entrypt = (bgworker_main_type)
+ load_external_function(worker->bgw_library_name,
+ worker->bgw_function_name,
+ true, NULL);
+
/*
* Note that in normal processes, we would call InitPostgres here. For a
* worker, however, we don't know what database to connect to, yet; so we
/*
* Now invoke the user-defined worker code
*/
- worker->bgw_main(worker->bgw_main_arg);
+ entrypt(worker->bgw_main_arg);
/* ... and if it returns, we're done */
proc_exit(0);
#ifdef EXEC_BACKEND
static pid_t
-bgworker_forkexec(int cookie)
+bgworker_forkexec(int shmem_slot)
{
char *av[10];
int ac = 0;
char forkav[MAXPGPATH];
- snprintf(forkav, MAXPGPATH, "--forkbgworker=%d", cookie);
+ snprintf(forkav, MAXPGPATH, "--forkbgworker=%d", shmem_slot);
av[ac++] = "postgres";
av[ac++] = forkav;
rw->rw_worker.bgw_name)));
#ifdef EXEC_BACKEND
- switch ((worker_pid = bgworker_forkexec(rw->rw_cookie)))
+ switch ((worker_pid = bgworker_forkexec(rw->rw_shmem_slot)))
#else
switch ((worker_pid = fork_process()))
#endif
static void
StartOneBackgroundWorker(void)
{
- slist_iter iter;
+ slist_mutable_iter iter;
TimestampTz now = 0;
if (FatalError)
HaveCrashedWorker = false;
- slist_foreach(iter, &BackgroundWorkerList)
+ slist_foreach_modify(iter, &BackgroundWorkerList)
{
RegisteredBgWorker *rw;
if (rw->rw_crashed_at != 0)
{
if (rw->rw_worker.bgw_restart_time == BGW_NEVER_RESTART)
+ {
+ ForgetBackgroundWorker(rw);
continue;
+ }
if (now == 0)
now = GetCurrentTimestamp();
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgworker_internals.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
#include "replication/walreceiver.h"
size = add_size(size, CLOGShmemSize());
size = add_size(size, SUBTRANSShmemSize());
size = add_size(size, TwoPhaseShmemSize());
+ size = add_size(size, BackgroundWorkerShmemSize());
size = add_size(size, MultiXactShmemSize());
size = add_size(size, LWLockShmemSize());
size = add_size(size, ProcArrayShmemSize());
CreateSharedProcArray();
CreateSharedBackendStatus();
TwoPhaseShmemInit();
+ BackgroundWorkerShmemInit();
/*
* Set up shared-inval messaging
#define BGWORKER_BACKEND_DATABASE_CONNECTION 0x0002
-typedef void (*bgworker_main_type) (void *main_arg);
+typedef void (*bgworker_main_type) (Datum main_arg);
typedef void (*bgworker_sighdlr_type) (SIGNAL_ARGS);
/*
#define BGW_DEFAULT_RESTART_INTERVAL 60
#define BGW_NEVER_RESTART -1
+#define BGW_MAXLEN 64
typedef struct BackgroundWorker
{
- char *bgw_name;
+ char bgw_name[BGW_MAXLEN];
int bgw_flags;
BgWorkerStartTime bgw_start_time;
int bgw_restart_time; /* in seconds, or BGW_NEVER_RESTART */
bgworker_main_type bgw_main;
- void *bgw_main_arg;
+ char bgw_library_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
+ char bgw_function_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
+ Datum bgw_main_arg;
bgworker_sighdlr_type bgw_sighup;
bgworker_sighdlr_type bgw_sigterm;
} BackgroundWorker;
-/* Register a new bgworker */
+/* Register a new bgworker during shared_preload_libraries */
extern void RegisterBackgroundWorker(BackgroundWorker *worker);
+/* Register a new bgworker from a regular backend */
+extern bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker);
+
/* This is valid in a running worker */
extern BackgroundWorker *MyBgworkerEntry;
--- /dev/null
+/*--------------------------------------------------------------------
+ * bgworker_internals.h
+ * POSTGRES pluggable background workers internals
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/postmaster/bgworker.h
+ *--------------------------------------------------------------------
+ */
+#ifndef BGWORKER_INTERNALS_H
+#define BGWORKER_INTERNALS_H
+
+#include "datatype/timestamp.h"
+#include "lib/ilist.h"
+#include "postmaster/bgworker.h"
+
+/*
+ * List of background workers, private to postmaster.
+ *
+ * A worker that requests a database connection during registration will have
+ * rw_backend set, and will be present in BackendList. Note: do not rely on
+ * rw_backend being non-NULL for shmem-connected workers!
+ */
+typedef struct RegisteredBgWorker
+{
+ BackgroundWorker rw_worker; /* its registry entry */
+ struct bkend *rw_backend; /* its BackendList entry, or NULL */
+ pid_t rw_pid; /* 0 if not running */
+ int rw_child_slot;
+ TimestampTz rw_crashed_at; /* if not 0, time it last crashed */
+ int rw_shmem_slot;
+ slist_node rw_lnode; /* list link */
+} RegisteredBgWorker;
+
+extern slist_head BackgroundWorkerList;
+
+extern Size BackgroundWorkerShmemSize(void);
+extern void BackgroundWorkerShmemInit(void);
+extern void BackgroundWorkerStateChange(void);
+extern void ForgetBackgroundWorker(RegisteredBgWorker *);
+
+#ifdef EXEC_BACKEND
+extern BackgroundWorker *BackgroundWorkerEntry(int slotno);
+#endif
+
+#endif /* BGWORKER_INTERNLS_H */
SerializablePredicateLockListLock,
OldSerXidLock,
SyncRepLock,
+ BackgroundWorkerLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
+ PMSIGNAL_BACKGROUND_WORKER_CHANGE, /* background worker state change */
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */