1 /* -------------------------------------------------------------------------
4 * Sample background worker code that demonstrates various coding
5 * patterns: establishing a database connection; starting and committing
6 * transactions; using GUC variables, and heeding SIGHUP to reread
7 * the configuration file; reporting to pg_stat_activity; using the
8 * process latch to sleep and exit in case of postmaster death.
10 * This code connects to a database, creates a schema and table, and summarizes
11 * the numbers contained therein. To see it working, insert a row with
12 * "total" type and some initial value; then insert some other rows with
13 * "delta" type. Delta rows will be deleted by this worker and their values
14 * aggregated into the total.
16 * Copyright (C) 2013, PostgreSQL Global Development Group
19 * contrib/worker_spi/worker_spi.c
21 * -------------------------------------------------------------------------
25 /* These are always necessary for a bgworker */
26 #include "miscadmin.h"
27 #include "postmaster/bgworker.h"
28 #include "storage/ipc.h"
29 #include "storage/latch.h"
30 #include "storage/lwlock.h"
31 #include "storage/proc.h"
32 #include "storage/shmem.h"
34 /* these headers are used by this particular worker's code */
35 #include "access/xact.h"
36 #include "executor/spi.h"
38 #include "lib/stringinfo.h"
40 #include "utils/builtins.h"
41 #include "utils/snapmgr.h"
42 #include "tcop/utility.h"
46 PG_FUNCTION_INFO_V1(worker_spi_launch);
49 void worker_spi_main(Datum);
51 /* flags set by signal handlers */
52 static volatile sig_atomic_t got_sighup = false;
53 static volatile sig_atomic_t got_sigterm = false;
56 static int worker_spi_naptime = 10;
57 static int worker_spi_total_workers = 2;
60 typedef struct worktable
67 * Signal handler for SIGTERM
68 * Set a flag to tell the main loop to terminate, and set our latch to wake
72 worker_spi_sigterm(SIGNAL_ARGS)
74 int save_errno = errno;
78 SetLatch(&MyProc->procLatch);
84 * Signal handler for SIGHUP
85 * Set a flag to tell the main loop to reread the config file, and set
86 * our latch to wake it up.
89 worker_spi_sighup(SIGNAL_ARGS)
91 int save_errno = errno;
95 SetLatch(&MyProc->procLatch);
101 * Initialize workspace for a worker process: create the schema if it doesn't
105 initialize_worker_spi(worktable *table)
112 SetCurrentStatementStartTimestamp();
113 StartTransactionCommand();
115 PushActiveSnapshot(GetTransactionSnapshot());
116 pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
118 /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
119 initStringInfo(&buf);
120 appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
123 ret = SPI_execute(buf.data, true, 0);
124 if (ret != SPI_OK_SELECT)
125 elog(FATAL, "SPI_execute failed: error code %d", ret);
127 if (SPI_processed != 1)
128 elog(FATAL, "not a singleton result");
130 ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
131 SPI_tuptable->tupdesc,
134 elog(FATAL, "null result");
138 resetStringInfo(&buf);
139 appendStringInfo(&buf,
140 "CREATE SCHEMA \"%s\" "
141 "CREATE TABLE \"%s\" ("
142 " type text CHECK (type IN ('total', 'delta')), "
144 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
145 "WHERE type = 'total'",
146 table->schema, table->name, table->name, table->name);
148 /* set statement start time */
149 SetCurrentStatementStartTimestamp();
151 ret = SPI_execute(buf.data, false, 0);
153 if (ret != SPI_OK_UTILITY)
154 elog(FATAL, "failed to create my schema");
159 CommitTransactionCommand();
160 pgstat_report_activity(STATE_IDLE, NULL);
164 worker_spi_main(Datum main_arg)
166 int index = DatumGetInt32(main_arg);
171 table = palloc(sizeof(worktable));
172 sprintf(name, "schema%d", index);
173 table->schema = pstrdup(name);
174 table->name = pstrdup("counted");
176 /* Establish signal handlers before unblocking signals. */
177 pqsignal(SIGHUP, worker_spi_sighup);
178 pqsignal(SIGTERM, worker_spi_sigterm);
180 /* We're now ready to receive signals */
181 BackgroundWorkerUnblockSignals();
183 /* Connect to our database */
184 BackgroundWorkerInitializeConnection("postgres", NULL);
186 elog(LOG, "%s initialized with %s.%s",
187 MyBgworkerEntry->bgw_name, table->schema, table->name);
188 initialize_worker_spi(table);
191 * Quote identifiers passed to us. Note that this must be done after
192 * initialize_worker_spi, because that routine assumes the names are not
195 * Note some memory might be leaked here.
197 table->schema = quote_identifier(table->schema);
198 table->name = quote_identifier(table->name);
200 initStringInfo(&buf);
201 appendStringInfo(&buf,
202 "WITH deleted AS (DELETE "
204 "WHERE type = 'delta' RETURNING value), "
205 "total AS (SELECT coalesce(sum(value), 0) as sum "
208 "SET value = %s.value + total.sum "
209 "FROM total WHERE type = 'total' "
210 "RETURNING %s.value",
211 table->schema, table->name,
212 table->schema, table->name,
217 * Main loop: do this until the SIGTERM handler tells us to terminate
225 * Background workers mustn't call usleep() or any direct equivalent:
226 * instead, they may wait on their process latch, which sleeps as
227 * necessary, but is awakened if postmaster dies. That way the
228 * background process goes away immediately in an emergency.
230 rc = WaitLatch(&MyProc->procLatch,
231 WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
232 worker_spi_naptime * 1000L);
233 ResetLatch(&MyProc->procLatch);
235 /* emergency bailout if postmaster has died */
236 if (rc & WL_POSTMASTER_DEATH)
240 * In case of a SIGHUP, just reload the configuration.
245 ProcessConfigFile(PGC_SIGHUP);
249 * Start a transaction on which we can run queries. Note that each
250 * StartTransactionCommand() call should be preceded by a
251 * SetCurrentStatementStartTimestamp() call, which sets both the time
252 * for the statement we're about to run, and also the transaction
253 * start time. Also, each other query sent to SPI should probably be
254 * preceded by SetCurrentStatementStartTimestamp(), so that statement
255 * start time is always up to date.
257 * The SPI_connect() call lets us run queries through the SPI manager,
258 * and the PushActiveSnapshot() call creates an "active" snapshot
259 * which is necessary for queries to have MVCC data to work on.
261 * The pgstat_report_activity() call makes our activity visible
262 * through the pgstat views.
264 SetCurrentStatementStartTimestamp();
265 StartTransactionCommand();
267 PushActiveSnapshot(GetTransactionSnapshot());
268 pgstat_report_activity(STATE_RUNNING, buf.data);
270 /* We can now execute queries via SPI */
271 ret = SPI_execute(buf.data, false, 0);
273 if (ret != SPI_OK_UPDATE_RETURNING)
274 elog(FATAL, "cannot select from table %s.%s: error code %d",
275 table->schema, table->name, ret);
277 if (SPI_processed > 0)
282 val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
283 SPI_tuptable->tupdesc,
286 elog(LOG, "%s: count in %s.%s is now %d",
287 MyBgworkerEntry->bgw_name,
288 table->schema, table->name, val);
292 * And finish our transaction.
296 CommitTransactionCommand();
297 pgstat_report_activity(STATE_IDLE, NULL);
304 * Entrypoint of this module.
306 * We register more than one worker process here, to demonstrate how that can
312 BackgroundWorker worker;
315 /* get the configuration */
316 DefineCustomIntVariable("worker_spi.naptime",
317 "Duration between each check (in seconds).",
329 if (!process_shared_preload_libraries_in_progress)
332 DefineCustomIntVariable("worker_spi.total_workers",
333 "Number of workers.",
335 &worker_spi_total_workers,
345 /* set up common data for all our workers */
346 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
347 BGWORKER_BACKEND_DATABASE_CONNECTION;
348 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
349 worker.bgw_restart_time = BGW_NEVER_RESTART;
350 worker.bgw_main = worker_spi_main;
353 * Now fill in worker-specific data, and do the actual registrations.
355 for (i = 1; i <= worker_spi_total_workers; i++)
357 snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
358 worker.bgw_main_arg = Int32GetDatum(i);
360 RegisterBackgroundWorker(&worker);
365 * Dynamically launch an SPI worker.
368 worker_spi_launch(PG_FUNCTION_ARGS)
370 int32 i = PG_GETARG_INT32(0);
371 BackgroundWorker worker;
372 BackgroundWorkerHandle *handle;
373 BgwHandleStatus status;
376 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
377 BGWORKER_BACKEND_DATABASE_CONNECTION;
378 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
379 worker.bgw_restart_time = BGW_NEVER_RESTART;
380 worker.bgw_main = NULL; /* new worker might not have library loaded */
381 sprintf(worker.bgw_library_name, "worker_spi");
382 sprintf(worker.bgw_function_name, "worker_spi_main");
383 snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
384 worker.bgw_main_arg = Int32GetDatum(i);
385 /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
386 worker.bgw_notify_pid = MyProcPid;
388 if (!RegisterDynamicBackgroundWorker(&worker, &handle))
391 status = WaitForBackgroundWorkerStartup(handle, &pid);
393 if (status == BGWH_STOPPED)
395 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
396 errmsg("could not start background process"),
397 errhint("More details may be available in the server log.")));
398 if (status == BGWH_POSTMASTER_DIED)
400 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
401 errmsg("cannot start background processes without postmaster"),
402 errhint("Kill all remaining database processes and restart the database.")));
403 Assert(status == BGWH_STARTED);
405 PG_RETURN_INT32(pid);