From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 10 Apr 2013 16:29:25 +0000 (-0300)
Subject: Make worker_spi sample code more complete
X-Git-Tag: REL9_3_BETA1~114
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=e543631f3c162ab5f6020b1d0209e0353ca2229a;p=postgresql

Make worker_spi sample code more complete

Make use of some GUC variables, and add SIGHUP handling to reload
the config file.  Patch submitted by Guillaume Lelarge.

Also, report to pg_stat_activity.  Per report from Marc Cousin, add
setting of statement start time.
---

diff --git a/contrib/worker_spi/worker_spi.c b/contrib/worker_spi/worker_spi.c
index 6da747b47b..344455cd57 100644
--- a/contrib/worker_spi/worker_spi.c
+++ b/contrib/worker_spi/worker_spi.c
@@ -1,16 +1,19 @@
 /* -------------------------------------------------------------------------
  *
  * worker_spi.c
- *		Sample background worker code that demonstrates usage of a database
- *		connection.
+ *		Sample background worker code that demonstrates various coding
+ *		patterns: establishing a database connection; starting and committing
+ *		transactions; using GUC variables, and heeding SIGHUP to reread
+ *		the configuration file; reporting to pg_stat_activity; using the
+ *		process latch to sleep and exit in case of postmaster death.
  *
- * This code connects to a database, create a schema and table, and summarizes
+ * This code connects to a database, creates a schema and table, and summarizes
  * the numbers contained therein.  To see it working, insert an initial value
  * with "total" type and some initial value; then insert some other rows with
  * "delta" type.  Delta rows will be deleted by this worker and their values
  * aggregated into the total.
  *
- * Copyright (C) 2012, PostgreSQL Global Development Group
+ * Copyright (C) 2013, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
  *		contrib/worker_spi/worker_spi.c
@@ -33,14 +36,22 @@
 #include "executor/spi.h"
 #include "fmgr.h"
 #include "lib/stringinfo.h"
+#include "pgstat.h"
 #include "utils/builtins.h"
 #include "utils/snapmgr.h"
+#include "tcop/utility.h"
 
 PG_MODULE_MAGIC;
 
 void	_PG_init(void);
 
-static bool	got_sigterm = false;
+/* flags set by signal handlers */
+static volatile sig_atomic_t got_sighup = false;
+static volatile sig_atomic_t got_sigterm = false;
+
+/* GUC variables */
+static int  worker_spi_naptime = 10;
+static int  worker_spi_total_workers = 2;
 
 
 typedef struct worktable
@@ -49,6 +60,11 @@ typedef struct worktable
 	const char	   *name;
 } worktable;
 
+/*
+ * Signal handler for SIGTERM
+ * 		Set a flag to let the main loop to terminate, and set our latch to wake
+ * 		it up.
+ */
 static void
 worker_spi_sigterm(SIGNAL_ARGS)
 {
@@ -61,14 +77,23 @@ worker_spi_sigterm(SIGNAL_ARGS)
 	errno = save_errno;
 }
 
+/*
+ * Signal handler for SIGHUP
+ * 		Set a flag to let the main loop to reread the config file, and set
+ * 		our latch to wake it up.
+ */
 static void
 worker_spi_sighup(SIGNAL_ARGS)
 {
-	elog(LOG, "got sighup!");
+	got_sighup = true;
 	if (MyProc)
 		SetLatch(&MyProc->procLatch);
 }
 
+/*
+ * Initialize workspace for a worker process: create the schema if it doesn't
+ * already exist.
+ */
 static void
 initialize_worker_spi(worktable *table)
 {
@@ -77,10 +102,13 @@ initialize_worker_spi(worktable *table)
 	bool	isnull;
 	StringInfoData	buf;
 
+	SetCurrentStatementStartTimestamp();
 	StartTransactionCommand();
 	SPI_connect();
 	PushActiveSnapshot(GetTransactionSnapshot());
+	pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
 
+	/* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
 	initStringInfo(&buf);
 	appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
 					 table->schema);
@@ -110,6 +138,9 @@ initialize_worker_spi(worktable *table)
 						 "WHERE type = 'total'",
 						 table->schema, table->name, table->name, table->name);
 
+		/* set statement start time */
+		SetCurrentStatementStartTimestamp();
+
 		ret = SPI_execute(buf.data, false, 0);
 
 		if (ret != SPI_OK_UTILITY)
@@ -119,6 +150,7 @@ initialize_worker_spi(worktable *table)
 	SPI_finish();
 	PopActiveSnapshot();
 	CommitTransactionCommand();
+	pgstat_report_activity(STATE_IDLE, NULL);
 }
 
 static void
@@ -163,6 +195,9 @@ worker_spi_main(void *main_arg)
 					 table->name,
 					 table->name);
 
+	/*
+	 * Main loop: do this until the SIGTERM handler tells us to terminate
+	 */
 	while (!got_sigterm)
 	{
 		int		ret;
@@ -176,17 +211,45 @@ worker_spi_main(void *main_arg)
 		 */
 		rc = WaitLatch(&MyProc->procLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-					   1000L);
+					   worker_spi_naptime * 1000L);
 		ResetLatch(&MyProc->procLatch);
 
 		/* emergency bailout if postmaster has died */
 		if (rc & WL_POSTMASTER_DEATH)
 			proc_exit(1);
 
+		/*
+		 * In case of a SIGHUP, just reload the configuration.
+		 */
+        if (got_sighup)
+        {
+            got_sighup = false;
+            ProcessConfigFile(PGC_SIGHUP);
+        }
+
+		/*
+		 * Start a transaction on which we can run queries.  Note that each
+		 * StartTransactionCommand() call should be preceded by a
+		 * SetCurrentStatementStartTimestamp() call, which sets both the time
+		 * for the statement we're about the run, and also the transaction
+		 * start time.  Also, each other query sent to SPI should probably be
+		 * preceded by SetCurrentStatementStartTimestamp(), so that statement
+		 * start time is always up to date.
+		 *
+		 * The SPI_connect() call lets us run queries through the SPI manager,
+		 * and the PushActiveSnapshot() call creates an "active" snapshot which
+		 * is necessary for queries to have MVCC data to work on.
+		 *
+		 * The pgstat_report_activity() call makes our activity visible through
+		 * the pgstat views.
+		 */
+		SetCurrentStatementStartTimestamp();
 		StartTransactionCommand();
 		SPI_connect();
 		PushActiveSnapshot(GetTransactionSnapshot());
+		pgstat_report_activity(STATE_RUNNING, buf.data);
 
+		/* We can now execute queries via SPI */
 		ret = SPI_execute(buf.data, false, 0);
 
 		if (ret != SPI_OK_UPDATE_RETURNING)
@@ -207,9 +270,13 @@ worker_spi_main(void *main_arg)
 					 table->schema, table->name, val);
 		}
 
+		/*
+		 * And finish our transaction.
+		 */
 		SPI_finish();
 		PopActiveSnapshot();
 		CommitTransactionCommand();
+		pgstat_report_activity(STATE_IDLE, NULL);
 	}
 
 	proc_exit(0);
@@ -218,46 +285,66 @@ worker_spi_main(void *main_arg)
 /*
  * Entrypoint of this module.
  *
- * We register two worker processes here, to demonstrate how that can be done.
+ * We register more than one worker process here, to demonstrate how that can
+ * be done.
  */
 void
 _PG_init(void)
 {
 	BackgroundWorker	worker;
 	worktable		   *table;
-
-	/* register the worker processes.  These values are common for both */
+	unsigned int        i;
+	char                name[20];
+
+	/* get the configuration */
+	DefineCustomIntVariable("worker_spi.naptime",
+				"Duration between each check (in seconds).",
+				NULL,
+				&worker_spi_naptime,
+				10,
+				1,
+				INT_MAX,
+				PGC_SIGHUP,
+				0,
+				NULL,
+				NULL,
+				NULL);
+	DefineCustomIntVariable("worker_spi.total_workers",
+				"Number of workers.",
+				NULL,
+				&worker_spi_total_workers,
+				2,
+				1,
+				100,
+				PGC_POSTMASTER,
+				0,
+				NULL,
+				NULL,
+				NULL);
+
+	/* set up common data for all our workers */
 	worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
 	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
 	worker.bgw_main = worker_spi_main;
 	worker.bgw_sighup = worker_spi_sighup;
 	worker.bgw_sigterm = worker_spi_sigterm;
 
 	/*
-	 * These values are used for the first worker.
-	 *
-	 * Note these are palloc'd.  The reason this works after starting a new
-	 * worker process is that if we only fork, they point to valid allocated
-	 * memory in the child process; and if we fork and then exec, the exec'd
-	 * process will run this code again, and so the memory is also valid there.
+	 * Now fill in worker-specific data, and do the actual registrations.
 	 */
-	table = palloc(sizeof(worktable));
-	table->schema = pstrdup("schema1");
-	table->name = pstrdup("counted");
+	for (i = 1; i <= worker_spi_total_workers; i++)
+	{
+		sprintf(name, "worker %d", i);
+		worker.bgw_name = pstrdup(name);
 
-	worker.bgw_name = "SPI worker 1";
-	worker.bgw_restart_time = BGW_NEVER_RESTART;
-	worker.bgw_main_arg = (void *) table;
-	RegisterBackgroundWorker(&worker);
-
-	/* Values for the second worker */
-	table = palloc(sizeof(worktable));
-	table->schema = pstrdup("our schema2");
-	table->name = pstrdup("counted rows");
-
-	worker.bgw_name = "SPI worker 2";
-	worker.bgw_restart_time = 2;
-	worker.bgw_main_arg = (void *) table;
-	RegisterBackgroundWorker(&worker);
+		table = palloc(sizeof(worktable));
+		sprintf(name, "schema%d", i);
+		table->schema = pstrdup(name);
+		table->name = pstrdup("counted");
+		worker.bgw_main_arg = (void *) table;
+
+		RegisterBackgroundWorker(&worker);
+	}
 }