]> granicus.if.org Git - postgresql/blob - contrib/worker_spi/worker_spi.c
pgindent run for 9.4
[postgresql] / contrib / worker_spi / worker_spi.c
1 /* -------------------------------------------------------------------------
2  *
3  * worker_spi.c
4  *              Sample background worker code that demonstrates various coding
5  *              patterns: establishing a database connection; starting and committing
6  *              transactions; using GUC variables, and heeding SIGHUP to reread
7  *              the configuration file; reporting to pg_stat_activity; using the
8  *              process latch to sleep and exit in case of postmaster death.
9  *
10  * This code connects to a database, creates a schema and table, and summarizes
11  * the numbers contained therein.  To see it working, insert an initial value
12  * with "total" type and some initial value; then insert some other rows with
13  * "delta" type.  Delta rows will be deleted by this worker and their values
14  * aggregated into the total.
15  *
16  * Copyright (C) 2013, PostgreSQL Global Development Group
17  *
18  * IDENTIFICATION
19  *              contrib/worker_spi/worker_spi.c
20  *
21  * -------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24
25 /* These are always necessary for a bgworker */
26 #include "miscadmin.h"
27 #include "postmaster/bgworker.h"
28 #include "storage/ipc.h"
29 #include "storage/latch.h"
30 #include "storage/lwlock.h"
31 #include "storage/proc.h"
32 #include "storage/shmem.h"
33
34 /* these headers are used by this particular worker's code */
35 #include "access/xact.h"
36 #include "executor/spi.h"
37 #include "fmgr.h"
38 #include "lib/stringinfo.h"
39 #include "pgstat.h"
40 #include "utils/builtins.h"
41 #include "utils/snapmgr.h"
42 #include "tcop/utility.h"
43
44 PG_MODULE_MAGIC;
45
46 PG_FUNCTION_INFO_V1(worker_spi_launch);
47
48 void            _PG_init(void);
49 void            worker_spi_main(Datum);
50
51 /* flags set by signal handlers */
52 static volatile sig_atomic_t got_sighup = false;
53 static volatile sig_atomic_t got_sigterm = false;
54
55 /* GUC variables */
56 static int      worker_spi_naptime = 10;
57 static int      worker_spi_total_workers = 2;
58
59
60 typedef struct worktable
61 {
62         const char *schema;
63         const char *name;
64 } worktable;
65
66 /*
67  * Signal handler for SIGTERM
68  *              Set a flag to let the main loop to terminate, and set our latch to wake
69  *              it up.
70  */
71 static void
72 worker_spi_sigterm(SIGNAL_ARGS)
73 {
74         int                     save_errno = errno;
75
76         got_sigterm = true;
77         if (MyProc)
78                 SetLatch(&MyProc->procLatch);
79
80         errno = save_errno;
81 }
82
83 /*
84  * Signal handler for SIGHUP
85  *              Set a flag to tell the main loop to reread the config file, and set
86  *              our latch to wake it up.
87  */
88 static void
89 worker_spi_sighup(SIGNAL_ARGS)
90 {
91         int                     save_errno = errno;
92
93         got_sighup = true;
94         if (MyProc)
95                 SetLatch(&MyProc->procLatch);
96
97         errno = save_errno;
98 }
99
100 /*
101  * Initialize workspace for a worker process: create the schema if it doesn't
102  * already exist.
103  */
104 static void
105 initialize_worker_spi(worktable *table)
106 {
107         int                     ret;
108         int                     ntup;
109         bool            isnull;
110         StringInfoData buf;
111
112         SetCurrentStatementStartTimestamp();
113         StartTransactionCommand();
114         SPI_connect();
115         PushActiveSnapshot(GetTransactionSnapshot());
116         pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
117
118         /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
119         initStringInfo(&buf);
120         appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
121                                          table->schema);
122
123         ret = SPI_execute(buf.data, true, 0);
124         if (ret != SPI_OK_SELECT)
125                 elog(FATAL, "SPI_execute failed: error code %d", ret);
126
127         if (SPI_processed != 1)
128                 elog(FATAL, "not a singleton result");
129
130         ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
131                                                                            SPI_tuptable->tupdesc,
132                                                                            1, &isnull));
133         if (isnull)
134                 elog(FATAL, "null result");
135
136         if (ntup == 0)
137         {
138                 resetStringInfo(&buf);
139                 appendStringInfo(&buf,
140                                                  "CREATE SCHEMA \"%s\" "
141                                                  "CREATE TABLE \"%s\" ("
142                            "            type text CHECK (type IN ('total', 'delta')), "
143                                                  "              value   integer)"
144                                   "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
145                                                  "WHERE type = 'total'",
146                                            table->schema, table->name, table->name, table->name);
147
148                 /* set statement start time */
149                 SetCurrentStatementStartTimestamp();
150
151                 ret = SPI_execute(buf.data, false, 0);
152
153                 if (ret != SPI_OK_UTILITY)
154                         elog(FATAL, "failed to create my schema");
155         }
156
157         SPI_finish();
158         PopActiveSnapshot();
159         CommitTransactionCommand();
160         pgstat_report_activity(STATE_IDLE, NULL);
161 }
162
163 void
164 worker_spi_main(Datum main_arg)
165 {
166         int                     index = DatumGetInt32(main_arg);
167         worktable  *table;
168         StringInfoData buf;
169         char            name[20];
170
171         table = palloc(sizeof(worktable));
172         sprintf(name, "schema%d", index);
173         table->schema = pstrdup(name);
174         table->name = pstrdup("counted");
175
176         /* Establish signal handlers before unblocking signals. */
177         pqsignal(SIGHUP, worker_spi_sighup);
178         pqsignal(SIGTERM, worker_spi_sigterm);
179
180         /* We're now ready to receive signals */
181         BackgroundWorkerUnblockSignals();
182
183         /* Connect to our database */
184         BackgroundWorkerInitializeConnection("postgres", NULL);
185
186         elog(LOG, "%s initialized with %s.%s",
187                  MyBgworkerEntry->bgw_name, table->schema, table->name);
188         initialize_worker_spi(table);
189
190         /*
191          * Quote identifiers passed to us.  Note that this must be done after
192          * initialize_worker_spi, because that routine assumes the names are not
193          * quoted.
194          *
195          * Note some memory might be leaked here.
196          */
197         table->schema = quote_identifier(table->schema);
198         table->name = quote_identifier(table->name);
199
200         initStringInfo(&buf);
201         appendStringInfo(&buf,
202                                          "WITH deleted AS (DELETE "
203                                          "FROM %s.%s "
204                                          "WHERE type = 'delta' RETURNING value), "
205                                          "total AS (SELECT coalesce(sum(value), 0) as sum "
206                                          "FROM deleted) "
207                                          "UPDATE %s.%s "
208                                          "SET value = %s.value + total.sum "
209                                          "FROM total WHERE type = 'total' "
210                                          "RETURNING %s.value",
211                                          table->schema, table->name,
212                                          table->schema, table->name,
213                                          table->name,
214                                          table->name);
215
216         /*
217          * Main loop: do this until the SIGTERM handler tells us to terminate
218          */
219         while (!got_sigterm)
220         {
221                 int                     ret;
222                 int                     rc;
223
224                 /*
225                  * Background workers mustn't call usleep() or any direct equivalent:
226                  * instead, they may wait on their process latch, which sleeps as
227                  * necessary, but is awakened if postmaster dies.  That way the
228                  * background process goes away immediately in an emergency.
229                  */
230                 rc = WaitLatch(&MyProc->procLatch,
231                                            WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
232                                            worker_spi_naptime * 1000L);
233                 ResetLatch(&MyProc->procLatch);
234
235                 /* emergency bailout if postmaster has died */
236                 if (rc & WL_POSTMASTER_DEATH)
237                         proc_exit(1);
238
239                 /*
240                  * In case of a SIGHUP, just reload the configuration.
241                  */
242                 if (got_sighup)
243                 {
244                         got_sighup = false;
245                         ProcessConfigFile(PGC_SIGHUP);
246                 }
247
248                 /*
249                  * Start a transaction on which we can run queries.  Note that each
250                  * StartTransactionCommand() call should be preceded by a
251                  * SetCurrentStatementStartTimestamp() call, which sets both the time
252                  * for the statement we're about the run, and also the transaction
253                  * start time.  Also, each other query sent to SPI should probably be
254                  * preceded by SetCurrentStatementStartTimestamp(), so that statement
255                  * start time is always up to date.
256                  *
257                  * The SPI_connect() call lets us run queries through the SPI manager,
258                  * and the PushActiveSnapshot() call creates an "active" snapshot
259                  * which is necessary for queries to have MVCC data to work on.
260                  *
261                  * The pgstat_report_activity() call makes our activity visible
262                  * through the pgstat views.
263                  */
264                 SetCurrentStatementStartTimestamp();
265                 StartTransactionCommand();
266                 SPI_connect();
267                 PushActiveSnapshot(GetTransactionSnapshot());
268                 pgstat_report_activity(STATE_RUNNING, buf.data);
269
270                 /* We can now execute queries via SPI */
271                 ret = SPI_execute(buf.data, false, 0);
272
273                 if (ret != SPI_OK_UPDATE_RETURNING)
274                         elog(FATAL, "cannot select from table %s.%s: error code %d",
275                                  table->schema, table->name, ret);
276
277                 if (SPI_processed > 0)
278                 {
279                         bool            isnull;
280                         int32           val;
281
282                         val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
283                                                                                           SPI_tuptable->tupdesc,
284                                                                                           1, &isnull));
285                         if (!isnull)
286                                 elog(LOG, "%s: count in %s.%s is now %d",
287                                          MyBgworkerEntry->bgw_name,
288                                          table->schema, table->name, val);
289                 }
290
291                 /*
292                  * And finish our transaction.
293                  */
294                 SPI_finish();
295                 PopActiveSnapshot();
296                 CommitTransactionCommand();
297                 pgstat_report_activity(STATE_IDLE, NULL);
298         }
299
300         proc_exit(1);
301 }
302
303 /*
304  * Entrypoint of this module.
305  *
306  * We register more than one worker process here, to demonstrate how that can
307  * be done.
308  */
309 void
310 _PG_init(void)
311 {
312         BackgroundWorker worker;
313         unsigned int i;
314
315         /* get the configuration */
316         DefineCustomIntVariable("worker_spi.naptime",
317                                                         "Duration between each check (in seconds).",
318                                                         NULL,
319                                                         &worker_spi_naptime,
320                                                         10,
321                                                         1,
322                                                         INT_MAX,
323                                                         PGC_SIGHUP,
324                                                         0,
325                                                         NULL,
326                                                         NULL,
327                                                         NULL);
328
329         if (!process_shared_preload_libraries_in_progress)
330                 return;
331
332         DefineCustomIntVariable("worker_spi.total_workers",
333                                                         "Number of workers.",
334                                                         NULL,
335                                                         &worker_spi_total_workers,
336                                                         2,
337                                                         1,
338                                                         100,
339                                                         PGC_POSTMASTER,
340                                                         0,
341                                                         NULL,
342                                                         NULL,
343                                                         NULL);
344
345         /* set up common data for all our workers */
346         worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
347                 BGWORKER_BACKEND_DATABASE_CONNECTION;
348         worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
349         worker.bgw_restart_time = BGW_NEVER_RESTART;
350         worker.bgw_main = worker_spi_main;
351
352         /*
353          * Now fill in worker-specific data, and do the actual registrations.
354          */
355         for (i = 1; i <= worker_spi_total_workers; i++)
356         {
357                 snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
358                 worker.bgw_main_arg = Int32GetDatum(i);
359
360                 RegisterBackgroundWorker(&worker);
361         }
362 }
363
364 /*
365  * Dynamically launch an SPI worker.
366  */
367 Datum
368 worker_spi_launch(PG_FUNCTION_ARGS)
369 {
370         int32           i = PG_GETARG_INT32(0);
371         BackgroundWorker worker;
372         BackgroundWorkerHandle *handle;
373         BgwHandleStatus status;
374         pid_t           pid;
375
376         worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
377                 BGWORKER_BACKEND_DATABASE_CONNECTION;
378         worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
379         worker.bgw_restart_time = BGW_NEVER_RESTART;
380         worker.bgw_main = NULL;         /* new worker might not have library loaded */
381         sprintf(worker.bgw_library_name, "worker_spi");
382         sprintf(worker.bgw_function_name, "worker_spi_main");
383         snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
384         worker.bgw_main_arg = Int32GetDatum(i);
385         /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
386         worker.bgw_notify_pid = MyProcPid;
387
388         if (!RegisterDynamicBackgroundWorker(&worker, &handle))
389                 PG_RETURN_NULL();
390
391         status = WaitForBackgroundWorkerStartup(handle, &pid);
392
393         if (status == BGWH_STOPPED)
394                 ereport(ERROR,
395                                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
396                                  errmsg("could not start background process"),
397                            errhint("More details may be available in the server log.")));
398         if (status == BGWH_POSTMASTER_DIED)
399                 ereport(ERROR,
400                                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
401                           errmsg("cannot start background processes without postmaster"),
402                                  errhint("Kill all remaining database processes and restart the database.")));
403         Assert(status == BGWH_STARTED);
404
405         PG_RETURN_INT32(pid);
406 }