/* -------------------------------------------------------------------------
*
* worker_spi.c
- * Sample background worker code that demonstrates usage of a database
- * connection.
+ * Sample background worker code that demonstrates various coding
+ * patterns: establishing a database connection; starting and committing
+ * transactions; using GUC variables, and heeding SIGHUP to reread
+ * the configuration file; reporting to pg_stat_activity; using the
+ * process latch to sleep and exit in case of postmaster death.
*
- * This code connects to a database, create a schema and table, and summarizes
+ * This code connects to a database, creates a schema and table, and summarizes
* the numbers contained therein. To see it working, insert an initial value
* with "total" type and some initial value; then insert some other rows with
* "delta" type. Delta rows will be deleted by this worker and their values
* aggregated into the total.
*
- * Copyright (C) 2012, PostgreSQL Global Development Group
+ * Copyright (C) 2013, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/worker_spi/worker_spi.c
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
+#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
+#include "tcop/utility.h"
PG_MODULE_MAGIC;
void _PG_init(void);
-static bool got_sigterm = false;
+/* flags set by signal handlers */
+static volatile sig_atomic_t got_sighup = false;
+static volatile sig_atomic_t got_sigterm = false;
+
+/* GUC variables */
+static int worker_spi_naptime = 10;
+static int worker_spi_total_workers = 2;
typedef struct worktable
const char *name;
} worktable;
+/*
+ * Signal handler for SIGTERM
+ * Set a flag to let the main loop to terminate, and set our latch to wake
+ * it up.
+ */
static void
worker_spi_sigterm(SIGNAL_ARGS)
{
errno = save_errno;
}
+/*
+ * Signal handler for SIGHUP
+ * Set a flag to let the main loop to reread the config file, and set
+ * our latch to wake it up.
+ */
static void
worker_spi_sighup(SIGNAL_ARGS)
{
- elog(LOG, "got sighup!");
+ got_sighup = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
}
+/*
+ * Initialize workspace for a worker process: create the schema if it doesn't
+ * already exist.
+ */
static void
initialize_worker_spi(worktable *table)
{
bool isnull;
StringInfoData buf;
+ SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
+ pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
+ /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
initStringInfo(&buf);
appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
table->schema);
"WHERE type = 'total'",
table->schema, table->name, table->name, table->name);
+ /* set statement start time */
+ SetCurrentStatementStartTimestamp();
+
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UTILITY)
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
+ pgstat_report_activity(STATE_IDLE, NULL);
}
static void
table->name,
table->name);
+ /*
+ * Main loop: do this until the SIGTERM handler tells us to terminate
+ */
while (!got_sigterm)
{
int ret;
*/
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- 1000L);
+ worker_spi_naptime * 1000L);
ResetLatch(&MyProc->procLatch);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
+ /*
+ * In case of a SIGHUP, just reload the configuration.
+ */
+ if (got_sighup)
+ {
+ got_sighup = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /*
+ * Start a transaction on which we can run queries. Note that each
+ * StartTransactionCommand() call should be preceded by a
+ * SetCurrentStatementStartTimestamp() call, which sets both the time
+ * for the statement we're about the run, and also the transaction
+ * start time. Also, each other query sent to SPI should probably be
+ * preceded by SetCurrentStatementStartTimestamp(), so that statement
+ * start time is always up to date.
+ *
+ * The SPI_connect() call lets us run queries through the SPI manager,
+ * and the PushActiveSnapshot() call creates an "active" snapshot which
+ * is necessary for queries to have MVCC data to work on.
+ *
+ * The pgstat_report_activity() call makes our activity visible through
+ * the pgstat views.
+ */
+ SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
+ pgstat_report_activity(STATE_RUNNING, buf.data);
+ /* We can now execute queries via SPI */
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UPDATE_RETURNING)
table->schema, table->name, val);
}
+ /*
+ * And finish our transaction.
+ */
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
+ pgstat_report_activity(STATE_IDLE, NULL);
}
proc_exit(0);
/*
* Entrypoint of this module.
*
- * We register two worker processes here, to demonstrate how that can be done.
+ * We register more than one worker process here, to demonstrate how that can
+ * be done.
*/
void
_PG_init(void)
{
BackgroundWorker worker;
worktable *table;
-
- /* register the worker processes. These values are common for both */
+ unsigned int i;
+ char name[20];
+
+ /* get the configuration */
+ DefineCustomIntVariable("worker_spi.naptime",
+ "Duration between each check (in seconds).",
+ NULL,
+ &worker_spi_naptime,
+ 10,
+ 1,
+ INT_MAX,
+ PGC_SIGHUP,
+ 0,
+ NULL,
+ NULL,
+ NULL);
+ DefineCustomIntVariable("worker_spi.total_workers",
+ "Number of workers.",
+ NULL,
+ &worker_spi_total_workers,
+ 2,
+ 1,
+ 100,
+ PGC_POSTMASTER,
+ 0,
+ NULL,
+ NULL,
+ NULL);
+
+ /* set up common data for all our workers */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = worker_spi_main;
worker.bgw_sighup = worker_spi_sighup;
worker.bgw_sigterm = worker_spi_sigterm;
/*
- * These values are used for the first worker.
- *
- * Note these are palloc'd. The reason this works after starting a new
- * worker process is that if we only fork, they point to valid allocated
- * memory in the child process; and if we fork and then exec, the exec'd
- * process will run this code again, and so the memory is also valid there.
+ * Now fill in worker-specific data, and do the actual registrations.
*/
- table = palloc(sizeof(worktable));
- table->schema = pstrdup("schema1");
- table->name = pstrdup("counted");
+ for (i = 1; i <= worker_spi_total_workers; i++)
+ {
+ sprintf(name, "worker %d", i);
+ worker.bgw_name = pstrdup(name);
- worker.bgw_name = "SPI worker 1";
- worker.bgw_restart_time = BGW_NEVER_RESTART;
- worker.bgw_main_arg = (void *) table;
- RegisterBackgroundWorker(&worker);
-
- /* Values for the second worker */
- table = palloc(sizeof(worktable));
- table->schema = pstrdup("our schema2");
- table->name = pstrdup("counted rows");
-
- worker.bgw_name = "SPI worker 2";
- worker.bgw_restart_time = 2;
- worker.bgw_main_arg = (void *) table;
- RegisterBackgroundWorker(&worker);
+ table = palloc(sizeof(worktable));
+ sprintf(name, "schema%d", i);
+ table->schema = pstrdup(name);
+ table->name = pstrdup("counted");
+ worker.bgw_main_arg = (void *) table;
+
+ RegisterBackgroundWorker(&worker);
+ }
}