]> granicus.if.org Git - postgresql/commitdiff
Make worker_spi sample code more complete
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 10 Apr 2013 16:29:25 +0000 (13:29 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 10 Apr 2013 16:29:25 +0000 (13:29 -0300)
Make use of some GUC variables, and add SIGHUP handling to reload
the config file.  Patch submitted by Guillaume Lelarge.

Also, report to pg_stat_activity.  Per report from Marc Cousin, add
setting of statement start time.

contrib/worker_spi/worker_spi.c

index 6da747b47b6b131b4014550dbb80140a68d7fe7b..344455cd579e7409fad3418d930094496c469760 100644 (file)
@@ -1,16 +1,19 @@
 /* -------------------------------------------------------------------------
  *
  * worker_spi.c
- *             Sample background worker code that demonstrates usage of a database
- *             connection.
+ *             Sample background worker code that demonstrates various coding
+ *             patterns: establishing a database connection; starting and committing
+ *             transactions; using GUC variables, and heeding SIGHUP to reread
+ *             the configuration file; reporting to pg_stat_activity; using the
+ *             process latch to sleep and exit in case of postmaster death.
  *
- * This code connects to a database, create a schema and table, and summarizes
+ * This code connects to a database, creates a schema and table, and summarizes
  * the numbers contained therein.  To see it working, insert an initial value
  * with "total" type and some initial value; then insert some other rows with
  * "delta" type.  Delta rows will be deleted by this worker and their values
  * aggregated into the total.
  *
- * Copyright (C) 2012, PostgreSQL Global Development Group
+ * Copyright (C) 2013, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
  *             contrib/worker_spi/worker_spi.c
 #include "executor/spi.h"
 #include "fmgr.h"
 #include "lib/stringinfo.h"
+#include "pgstat.h"
 #include "utils/builtins.h"
 #include "utils/snapmgr.h"
+#include "tcop/utility.h"
 
 PG_MODULE_MAGIC;
 
 void   _PG_init(void);
 
-static bool    got_sigterm = false;
+/* flags set by signal handlers */
+static volatile sig_atomic_t got_sighup = false;
+static volatile sig_atomic_t got_sigterm = false;
+
+/* GUC variables */
+static int  worker_spi_naptime = 10;
+static int  worker_spi_total_workers = 2;
 
 
 typedef struct worktable
@@ -49,6 +60,11 @@ typedef struct worktable
        const char         *name;
 } worktable;
 
+/*
+ * Signal handler for SIGTERM
+ *             Set a flag to let the main loop to terminate, and set our latch to wake
+ *             it up.
+ */
 static void
 worker_spi_sigterm(SIGNAL_ARGS)
 {
@@ -61,14 +77,23 @@ worker_spi_sigterm(SIGNAL_ARGS)
        errno = save_errno;
 }
 
+/*
+ * Signal handler for SIGHUP
+ *             Set a flag to let the main loop to reread the config file, and set
+ *             our latch to wake it up.
+ */
 static void
 worker_spi_sighup(SIGNAL_ARGS)
 {
-       elog(LOG, "got sighup!");
+       got_sighup = true;
        if (MyProc)
                SetLatch(&MyProc->procLatch);
 }
 
+/*
+ * Initialize workspace for a worker process: create the schema if it doesn't
+ * already exist.
+ */
 static void
 initialize_worker_spi(worktable *table)
 {
@@ -77,10 +102,13 @@ initialize_worker_spi(worktable *table)
        bool    isnull;
        StringInfoData  buf;
 
+       SetCurrentStatementStartTimestamp();
        StartTransactionCommand();
        SPI_connect();
        PushActiveSnapshot(GetTransactionSnapshot());
+       pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
 
+       /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
        initStringInfo(&buf);
        appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
                                         table->schema);
@@ -110,6 +138,9 @@ initialize_worker_spi(worktable *table)
                                                 "WHERE type = 'total'",
                                                 table->schema, table->name, table->name, table->name);
 
+               /* set statement start time */
+               SetCurrentStatementStartTimestamp();
+
                ret = SPI_execute(buf.data, false, 0);
 
                if (ret != SPI_OK_UTILITY)
@@ -119,6 +150,7 @@ initialize_worker_spi(worktable *table)
        SPI_finish();
        PopActiveSnapshot();
        CommitTransactionCommand();
+       pgstat_report_activity(STATE_IDLE, NULL);
 }
 
 static void
@@ -163,6 +195,9 @@ worker_spi_main(void *main_arg)
                                         table->name,
                                         table->name);
 
+       /*
+        * Main loop: do this until the SIGTERM handler tells us to terminate
+        */
        while (!got_sigterm)
        {
                int             ret;
@@ -176,17 +211,45 @@ worker_spi_main(void *main_arg)
                 */
                rc = WaitLatch(&MyProc->procLatch,
                                           WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-                                          1000L);
+                                          worker_spi_naptime * 1000L);
                ResetLatch(&MyProc->procLatch);
 
                /* emergency bailout if postmaster has died */
                if (rc & WL_POSTMASTER_DEATH)
                        proc_exit(1);
 
+               /*
+                * In case of a SIGHUP, just reload the configuration.
+                */
+        if (got_sighup)
+        {
+            got_sighup = false;
+            ProcessConfigFile(PGC_SIGHUP);
+        }
+
+               /*
+                * Start a transaction on which we can run queries.  Note that each
+                * StartTransactionCommand() call should be preceded by a
+                * SetCurrentStatementStartTimestamp() call, which sets both the time
+                * for the statement we're about the run, and also the transaction
+                * start time.  Also, each other query sent to SPI should probably be
+                * preceded by SetCurrentStatementStartTimestamp(), so that statement
+                * start time is always up to date.
+                *
+                * The SPI_connect() call lets us run queries through the SPI manager,
+                * and the PushActiveSnapshot() call creates an "active" snapshot which
+                * is necessary for queries to have MVCC data to work on.
+                *
+                * The pgstat_report_activity() call makes our activity visible through
+                * the pgstat views.
+                */
+               SetCurrentStatementStartTimestamp();
                StartTransactionCommand();
                SPI_connect();
                PushActiveSnapshot(GetTransactionSnapshot());
+               pgstat_report_activity(STATE_RUNNING, buf.data);
 
+               /* We can now execute queries via SPI */
                ret = SPI_execute(buf.data, false, 0);
 
                if (ret != SPI_OK_UPDATE_RETURNING)
@@ -207,9 +270,13 @@ worker_spi_main(void *main_arg)
                                         table->schema, table->name, val);
                }
 
+               /*
+                * And finish our transaction.
+                */
                SPI_finish();
                PopActiveSnapshot();
                CommitTransactionCommand();
+               pgstat_report_activity(STATE_IDLE, NULL);
        }
 
        proc_exit(0);
@@ -218,46 +285,66 @@ worker_spi_main(void *main_arg)
 /*
  * Entrypoint of this module.
  *
- * We register two worker processes here, to demonstrate how that can be done.
+ * We register more than one worker process here, to demonstrate how that can
+ * be done.
  */
 void
 _PG_init(void)
 {
        BackgroundWorker        worker;
        worktable                  *table;
-
-       /* register the worker processes.  These values are common for both */
+       unsigned int        i;
+       char                name[20];
+
+       /* get the configuration */
+       DefineCustomIntVariable("worker_spi.naptime",
+                               "Duration between each check (in seconds).",
+                               NULL,
+                               &worker_spi_naptime,
+                               10,
+                               1,
+                               INT_MAX,
+                               PGC_SIGHUP,
+                               0,
+                               NULL,
+                               NULL,
+                               NULL);
+       DefineCustomIntVariable("worker_spi.total_workers",
+                               "Number of workers.",
+                               NULL,
+                               &worker_spi_total_workers,
+                               2,
+                               1,
+                               100,
+                               PGC_POSTMASTER,
+                               0,
+                               NULL,
+                               NULL,
+                               NULL);
+
+       /* set up common data for all our workers */
        worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
                BGWORKER_BACKEND_DATABASE_CONNECTION;
        worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+       worker.bgw_restart_time = BGW_NEVER_RESTART;
        worker.bgw_main = worker_spi_main;
        worker.bgw_sighup = worker_spi_sighup;
        worker.bgw_sigterm = worker_spi_sigterm;
 
        /*
-        * These values are used for the first worker.
-        *
-        * Note these are palloc'd.  The reason this works after starting a new
-        * worker process is that if we only fork, they point to valid allocated
-        * memory in the child process; and if we fork and then exec, the exec'd
-        * process will run this code again, and so the memory is also valid there.
+        * Now fill in worker-specific data, and do the actual registrations.
         */
-       table = palloc(sizeof(worktable));
-       table->schema = pstrdup("schema1");
-       table->name = pstrdup("counted");
+       for (i = 1; i <= worker_spi_total_workers; i++)
+       {
+               sprintf(name, "worker %d", i);
+               worker.bgw_name = pstrdup(name);
 
-       worker.bgw_name = "SPI worker 1";
-       worker.bgw_restart_time = BGW_NEVER_RESTART;
-       worker.bgw_main_arg = (void *) table;
-       RegisterBackgroundWorker(&worker);
-
-       /* Values for the second worker */
-       table = palloc(sizeof(worktable));
-       table->schema = pstrdup("our schema2");
-       table->name = pstrdup("counted rows");
-
-       worker.bgw_name = "SPI worker 2";
-       worker.bgw_restart_time = 2;
-       worker.bgw_main_arg = (void *) table;
-       RegisterBackgroundWorker(&worker);
+               table = palloc(sizeof(worktable));
+               sprintf(name, "schema%d", i);
+               table->schema = pstrdup(name);
+               table->name = pstrdup("counted");
+               worker.bgw_main_arg = (void *) table;
+
+               RegisterBackgroundWorker(&worker);
+       }
 }