From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Mon, 16 Apr 2007 18:30:04 +0000 (+0000)
Subject: Add a multi-worker capability to autovacuum.  This allows multiple worker
X-Git-Tag: REL8_3_BETA1~827
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=e2a186b03cc1a87cf26644db18f28a20f10bd739;p=postgresql

Add a multi-worker capability to autovacuum.  This allows multiple worker
processes to be running simultaneously.  Also, now autovacuum processes do not
count towards the max_connections limit; they are counted separately from
regular processes, and are limited by the new GUC variable
autovacuum_max_workers.

The launcher now has intelligence to launch workers on each database every
autovacuum_naptime seconds, limited only on the max amount of worker slots
available.

Also, the global worker I/O utilization is limited by the vacuum cost-based
delay feature.  Workers are "balanced" so that the total I/O consumption does
not exceed the established limit.  This part of the patch was contributed by
ITAGAKI Takahiro.

Per discussion.
---

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 57a618faa6..e10d2d753a 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1,4 +1,4 @@
-<!-- $PostgreSQL: pgsql/doc/src/sgml/config.sgml,v 1.119 2007/04/02 15:27:02 petere Exp $ -->
+<!-- $PostgreSQL: pgsql/doc/src/sgml/config.sgml,v 1.120 2007/04/16 18:29:50 alvherre Exp $ -->
 
 <chapter Id="runtime-config">
   <title>Server Configuration</title>
@@ -3166,7 +3166,7 @@ SELECT * FROM parent WHERE key = 2400;
       <listitem>
        <para>
         Controls whether the server should run the
-        autovacuum daemon.  This is off by default.
+        autovacuum launcher daemon.  This is on by default.
         <varname>stats_start_collector</> and <varname>stats_row_level</>
         must also be turned on for autovacuum to work.
         This parameter can only be set in the <filename>postgresql.conf</>
@@ -3175,6 +3175,21 @@ SELECT * FROM parent WHERE key = 2400;
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-autovacuum-max-workers" xreflabel="autovacuum_max_workers">
+      <term><varname>autovacuum_max_workers</varname> (<type>integer</type>)</term>
+      <indexterm>
+       <primary><varname>autovacuum_max_workers</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Specifies the maximum number of autovacuum processes (other than the
+        autovacuum launcher) which may be running at any one time.  The default
+        is three (<literal>3</literal>).  This parameter can only be set in
+        the <filename>postgresql.conf</> file or on the server command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-autovacuum-naptime" xreflabel="autovacuum_naptime">
       <term><varname>autovacuum_naptime</varname> (<type>integer</type>)</term>
       <indexterm>
@@ -3182,9 +3197,9 @@ SELECT * FROM parent WHERE key = 2400;
       </indexterm>
       <listitem>
        <para>
-        Specifies the delay between activity rounds for the autovacuum
-        daemon.  In each round the daemon examines one database
-        and issues <command>VACUUM</> and <command>ANALYZE</> commands
+        Specifies the minimum delay between autovacuum runs on any given
+        database.  In each round the daemon examines the
+        database and issues <command>VACUUM</> and <command>ANALYZE</> commands
         as needed for tables in that database.  The delay is measured
         in seconds, and the default is one minute (<literal>1m</>).
         This parameter can only be set in the <filename>postgresql.conf</>
@@ -3318,7 +3333,10 @@ SELECT * FROM parent WHERE key = 2400;
         Specifies the cost limit value that will be used in automatic
         <command>VACUUM</> operations.  If <literal>-1</> is specified (which is the
         default), the regular
-        <xref linkend="guc-vacuum-cost-limit"> value will be used.
+        <xref linkend="guc-vacuum-cost-limit"> value will be used.  Note that
+        the value is distributed proportionally among the running autovacuum
+        workers, if there is more than one, so that the sum of the limits of
+        each worker never exceeds the limit on this variable.
         This parameter can only be set in the <filename>postgresql.conf</>
         file or on the server command line.
         This setting can be overridden for individual tables by entries in
diff --git a/doc/src/sgml/maintenance.sgml b/doc/src/sgml/maintenance.sgml
index fe5369c19c..2be11332c2 100644
--- a/doc/src/sgml/maintenance.sgml
+++ b/doc/src/sgml/maintenance.sgml
@@ -1,4 +1,4 @@
-<!-- $PostgreSQL: pgsql/doc/src/sgml/maintenance.sgml,v 1.70 2007/02/01 19:10:24 momjian Exp $ -->
+<!-- $PostgreSQL: pgsql/doc/src/sgml/maintenance.sgml,v 1.71 2007/04/16 18:29:50 alvherre Exp $ -->
 
 <chapter id="maintenance">
  <title>Routine Database Maintenance Tasks</title>
@@ -466,26 +466,43 @@ HINT:  Stop the postmaster and use a standalone backend to VACUUM in "mydb".
     <secondary>general information</secondary>
    </indexterm>
    <para>
-    Beginning in <productname>PostgreSQL </productname> 8.1, there is a
-    separate optional server process called the <firstterm>autovacuum
-    daemon</firstterm>, whose purpose is to automate the execution of
+    Beginning in <productname>PostgreSQL</productname> 8.1, there is an
+    optional feature called <firstterm>autovacuum</firstterm>,
+    whose purpose is to automate the execution of
     <command>VACUUM</command> and <command>ANALYZE </command> commands.
-    When enabled, the autovacuum daemon runs periodically and checks for
+    When enabled, autovacuum checks for
     tables that have had a large number of inserted, updated or deleted
     tuples.  These checks use the row-level statistics collection facility;
-    therefore, the autovacuum daemon cannot be used unless <xref
+    therefore, autovacuum cannot be used unless <xref
     linkend="guc-stats-start-collector"> and <xref
-    linkend="guc-stats-row-level"> are set to <literal>true</literal>.  Also,
-    it's important to allow a slot for the autovacuum process when choosing
-    the value of <xref linkend="guc-superuser-reserved-connections">.  In
-    the default configuration, autovacuuming is enabled and the related
+    linkend="guc-stats-row-level"> are set to <literal>true</literal>.
+    In the default configuration, autovacuuming is enabled and the related
     configuration parameters are appropriately set.
    </para>
 
    <para>
-    The autovacuum daemon, when enabled, runs every <xref
-    linkend="guc-autovacuum-naptime"> seconds.  On each run, it selects
-    one database to process and checks each table within that database.
+	Beginning in <productname>PostgreSQL</productname> 8.3, autovacuum has a
+	multi-process architecture: there is a daemon process, called the
+	<firstterm>autovacuum launcher</firstterm>, which is in charge of starting
+	an <firstterm>autovacuum worker</firstterm> process on each database every
+	<xref linkend="guc-autovacuum-naptime"> seconds.
+   </para>
+
+   <para>
+    There is a limit of <xref linkend="guc-autovacuum-max-workers"> worker
+    processes that may be running at at any time, so if the <command>VACUUM</>
+    and <command>ANALYZE</> work to do takes too long to run, the deadline may
+    be failed to meet for other databases.  Also, if a particular database
+    takes long to process, more than one worker may be processing it
+	simultaneously.  The workers are smart enough to avoid repeating work that
+	other workers have done, so this is normally not a problem.  Note that the
+	number of running workers does not count towards the <xref
+	linkend="guc-max-connections"> nor the <xref
+	linkend="guc-superuser-reserved-connections"> limits.
+   </para>
+
+   <para>
+    On each run, the worker process checks each table within that database, and
     <command>VACUUM</command> or <command>ANALYZE</command> commands are
     issued as needed.
    </para>
@@ -581,6 +598,12 @@ analyze threshold = analyze base threshold + analyze scale factor * number of tu
     </para>
    </caution>
 
+   <para>
+    When multiple workers are running, the cost limit is "balanced" among all
+    the running workers, so that the total impact on the system is the same,
+    regardless of the number of workers actually running.
+   </para>
+
   </sect2>
  </sect1>
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index d350420ab2..f275448756 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -13,7 +13,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.349 2007/03/14 18:48:55 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.350 2007/04/16 18:29:50 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -3504,6 +3504,9 @@ vacuum_delay_point(void)
 
 		VacuumCostBalance = 0;
 
+		/* update balance values for workers */
+		AutoVacuumUpdateDelay();
+
 		/* Might have gotten an interrupt while sleeping */
 		CHECK_FOR_INTERRUPTS();
 	}
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 4631c636c0..9893fa680b 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.40 2007/03/28 22:17:12 alvherre Exp $
+ *	  $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.41 2007/04/16 18:29:52 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -52,6 +52,7 @@
 #include "utils/syscache.h"
 
 
+static volatile sig_atomic_t got_SIGUSR1 = false;
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t avlauncher_shutdown_request = false;
 
@@ -59,6 +60,7 @@ static volatile sig_atomic_t avlauncher_shutdown_request = false;
  * GUC parameters
  */
 bool		autovacuum_start_daemon = false;
+int			autovacuum_max_workers;
 int			autovacuum_naptime;
 int			autovacuum_vac_thresh;
 double		autovacuum_vac_scale;
@@ -69,7 +71,7 @@ int			autovacuum_freeze_max_age;
 int			autovacuum_vac_cost_delay;
 int			autovacuum_vac_cost_limit;
 
-/* Flag to tell if we are in the autovacuum daemon process */
+/* Flags to tell if we are in an autovacuum process */
 static bool am_autovacuum_launcher = false;
 static bool am_autovacuum_worker = false;
 
@@ -82,14 +84,22 @@ static int	default_freeze_min_age;
 /* Memory context for long-lived data */
 static MemoryContext AutovacMemCxt;
 
-/* struct to keep list of candidate databases for vacuum */
-typedef struct autovac_dbase
+/* struct to keep track of databases in launcher */
+typedef struct avl_dbase
 {
-	Oid			ad_datid;
-	char	   *ad_name;
-	TransactionId ad_frozenxid;
-	PgStat_StatDBEntry *ad_entry;
-} autovac_dbase;
+	Oid			adl_datid;			/* hash key -- must be first */
+	TimestampTz	adl_next_worker;
+	int			adl_score;
+} avl_dbase;
+
+/* struct to keep track of databases in worker */
+typedef struct avw_dbase
+{
+	Oid			adw_datid;
+	char	   *adw_name;
+	TransactionId adw_frozenxid;
+	PgStat_StatDBEntry *adw_entry;
+} avw_dbase;
 
 /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
 typedef struct av_relation
@@ -110,14 +120,73 @@ typedef struct autovac_table
 	int			at_vacuum_cost_limit;
 } autovac_table;
 
+/*-------------
+ * This struct holds information about a single worker's whereabouts.  We keep
+ * an array of these in shared memory, sized according to
+ * autovacuum_max_workers.
+ *
+ * wi_links		entry into free list or running list
+ * wi_dboid		OID of the database this worker is supposed to work on
+ * wi_tableoid	OID of the table currently being vacuumed
+ * wi_workerpid	PID of the running worker, 0 if not yet started
+ * wi_launchtime Time at which this worker was launched
+ * wi_cost_*	Vacuum cost-based delay parameters current in this worker
+ *
+ * All fields are protected by AutovacuumLock, except for wi_tableoid which is
+ * protected by AutovacuumScheduleLock (which is read-only for everyone except
+ * that worker itself).
+ *-------------
+ */
+typedef struct WorkerInfoData
+{
+	SHM_QUEUE	wi_links;
+	Oid			wi_dboid;
+	Oid			wi_tableoid;
+	int			wi_workerpid;
+	TimestampTz	wi_launchtime;
+	int			wi_cost_delay;
+	int			wi_cost_limit;
+	int			wi_cost_limit_base;
+} WorkerInfoData;
+
+typedef struct WorkerInfoData *WorkerInfo;
+
+/*-------------
+ * The main autovacuum shmem struct.  On shared memory we store this main
+ * struct and the array of WorkerInfo structs.  This struct keeps:
+ *
+ * av_launcherpid	the PID of the autovacuum launcher
+ * av_freeWorkers	the WorkerInfo freelist
+ * av_runningWorkers the WorkerInfo non-free queue
+ * av_startingWorker pointer to WorkerInfo currently being started (cleared by
+ *					the worker itself as soon as it's up and running)
+ * av_rebalance		true when a worker determines that cost limits must be
+ * 					rebalanced
+ *
+ * This struct is protected by AutovacuumLock.
+ *-------------
+ */
 typedef struct
 {
-	Oid		process_db;			/* OID of database to process */
-	int		worker_pid;			/* PID of the worker process, if any */
+	pid_t			av_launcherpid;
+	SHMEM_OFFSET	av_freeWorkers;
+	SHM_QUEUE		av_runningWorkers;
+	SHMEM_OFFSET	av_startingWorker;
+	bool			av_rebalance;
 } AutoVacuumShmemStruct;
 
 static AutoVacuumShmemStruct *AutoVacuumShmem;
 
+/* the database list in the launcher, and the context that contains it */
+static Dllist *DatabaseList = NULL;
+static MemoryContext DatabaseListCxt = NULL;
+
+/* Pointer to my own WorkerInfo, valid on each worker */
+static WorkerInfo	MyWorkerInfo = NULL;
+
+/* PID of launcher, valid only in worker while shutting down */
+int	AutovacuumLauncherPid = 0;
+
 #ifdef EXEC_BACKEND
 static pid_t avlauncher_forkexec(void);
 static pid_t avworker_forkexec(void);
@@ -125,9 +194,16 @@ static pid_t avworker_forkexec(void);
 NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
 NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
 
-static void do_start_worker(void);
+static Oid do_start_worker(void);
+static uint64 launcher_determine_sleep(bool canlaunch, bool recursing);
+static void launch_worker(TimestampTz now);
+static List *get_database_list(void);
+static void rebuild_database_list(Oid newdb);
+static int db_comparator(const void *a, const void *b);
+static void autovac_balance_cost(void);
+
 static void do_autovacuum(void);
-static List *autovac_get_database_list(void);
+static void FreeWorkerInfo(int code, Datum arg);
 
 static void relation_check_autovac(Oid relid, Form_pg_class classForm,
 					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
@@ -147,6 +223,7 @@ static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
 						  PgStat_StatDBEntry *dbentry);
 static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
 static void avl_sighup_handler(SIGNAL_ARGS);
+static void avl_sigusr1_handler(SIGNAL_ARGS);
 static void avlauncher_shutdown(SIGNAL_ARGS);
 static void avl_quickdie(SIGNAL_ARGS);
 
@@ -230,12 +307,34 @@ StartAutoVacLauncher(void)
 
 /*
  * Main loop for the autovacuum launcher process.
+ *
+ * The signalling between launcher and worker is as follows:
+ *
+ * When the worker has finished starting up, it stores its PID in wi_workerpid
+ * and sends a SIGUSR1 signal to the launcher.  The launcher then knows that
+ * the postmaster is ready to start a new worker.  We do it this way because
+ * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+ * yet processed the last one, in which case the second signal would be lost.
+ * This is only useful when two workers need to be started close to one
+ * another, which should be rare but it's possible.
+ *
+ * When a worker exits, it resets the WorkerInfo struct and puts it back into
+ * the free list.  If there is no free worker slot, it will also signal the
+ * launcher, which then wakes up and can launch a new worker if it needs to.
+ * Note that we only need to do it when there's no free worker slot, because
+ * otherwise there is no need -- the launcher would be awakened normally per
+ * schedule.
+ *
+ * There is a potential problem if, for some reason, a worker starts and is not
+ * able to bootstrap itself correctly.  To prevent this situation from starving
+ * the whole system, the launcher checks the launch time of the "starting
+ * worker".  If it's too old (older than autovacuum_naptime seconds), it resets
+ * the worker entry and puts it back into the free list.
  */
 NON_EXEC_STATIC void
 AutoVacLauncherMain(int argc, char *argv[])
 {
 	sigjmp_buf	local_sigjmp_buf;
-	MemoryContext	avlauncher_cxt;
 
 	/* we are a postmaster subprocess now */
 	IsUnderPostmaster = true;
@@ -264,9 +363,6 @@ AutoVacLauncherMain(int argc, char *argv[])
 	 * Set up signal handlers.	Since this is an auxiliary process, it has
 	 * particular signal requirements -- no deadlock checker or sinval
 	 * catchup, for example.
-	 *
-	 * XXX It may be a good idea to receive signals when an avworker process
-	 * finishes.
 	 */
 	pqsignal(SIGHUP, avl_sighup_handler);
 
@@ -276,7 +372,7 @@ AutoVacLauncherMain(int argc, char *argv[])
 	pqsignal(SIGALRM, SIG_IGN);
 
 	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGUSR1, SIG_IGN);
+	pqsignal(SIGUSR1, avl_sigusr1_handler);
 	/* We don't listen for async notifies */
 	pqsignal(SIGUSR2, SIG_IGN);
 	pqsignal(SIGFPE, FloatExceptionHandler);
@@ -300,12 +396,12 @@ AutoVacLauncherMain(int argc, char *argv[])
 	 * that we can reset the context during error recovery and thereby avoid
 	 * possible memory leaks.
 	 */
-	avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
-										   "Autovacuum Launcher",
-										   ALLOCSET_DEFAULT_MINSIZE,
-										   ALLOCSET_DEFAULT_INITSIZE,
-										   ALLOCSET_DEFAULT_MAXSIZE);
-	MemoryContextSwitchTo(avlauncher_cxt);
+	AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
+										  "Autovacuum Launcher",
+										  ALLOCSET_DEFAULT_MINSIZE,
+										  ALLOCSET_DEFAULT_INITSIZE,
+										  ALLOCSET_DEFAULT_MAXSIZE);
+	MemoryContextSwitchTo(AutovacMemCxt);
 
 
 	/*
@@ -336,11 +432,15 @@ AutoVacLauncherMain(int argc, char *argv[])
 		 * Now return to normal top-level context and clear ErrorContext for
 		 * next time.
 		 */
-		MemoryContextSwitchTo(avlauncher_cxt);
+		MemoryContextSwitchTo(AutovacMemCxt);
 		FlushErrorState();
 
 		/* Flush any leaked data in the top-level context */
-		MemoryContextResetAndDeleteChildren(avlauncher_cxt);
+		MemoryContextResetAndDeleteChildren(AutovacMemCxt);
+
+		/* don't leave dangling pointers to freed memory */
+		DatabaseListCxt = NULL;
+		DatabaseList = NULL;
 
 		/* Make sure pgstat also considers our stat data as gone */
 		pgstat_clear_snapshot();
@@ -361,18 +461,32 @@ AutoVacLauncherMain(int argc, char *argv[])
 	ereport(LOG,
 			(errmsg("autovacuum launcher started")));
 
+	/* must unblock signals before calling rebuild_database_list */
 	PG_SETMASK(&UnBlockSig);
 
+	/* in emergency mode, just start a worker and go away */
+	if (!autovacuum_start_daemon)
+	{
+		do_start_worker();
+		proc_exit(0);		/* done */
+	}
+
+	AutoVacuumShmem->av_launcherpid = MyProcPid;
+
 	/*
-	 * take a nap before executing the first iteration, unless we were
-	 * requested an emergency run.
+	 * Create the initial database list.  The invariant we want this list to
+	 * keep is that it's ordered by decreasing next_time.  As soon as an entry
+	 * is updated to a higher time, it will be moved to the front (which is
+	 * correct because the only operation is to add autovacuum_naptime to the
+	 * entry, and time always increases).
 	 */
-	if (autovacuum_start_daemon)
-		pg_usleep(autovacuum_naptime * 1000000L); 
+	rebuild_database_list(InvalidOid);
 
 	for (;;)
 	{
-		int		worker_pid;
+		uint64		micros;
+		bool	can_launch;
+		TimestampTz current_time = 0;
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -381,6 +495,13 @@ AutoVacLauncherMain(int argc, char *argv[])
 		if (!PostmasterIsAlive(true))
 			exit(1);
 
+		micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers !=
+										  INVALID_OFFSET, false);
+
+		/* Sleep for a while according to schedule */
+		pg_usleep(micros);
+
+		/* the normal shutdown case */
 		if (avlauncher_shutdown_request)
 			break;
 
@@ -388,82 +509,455 @@ AutoVacLauncherMain(int argc, char *argv[])
 		{
 			got_SIGHUP = false;
 			ProcessConfigFile(PGC_SIGHUP);
+
+			/* rebalance in case the default cost parameters changed */
+			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+			autovac_balance_cost();
+			LWLockRelease(AutovacuumLock);
+
+			/* rebuild the list in case the naptime changed */
+			rebuild_database_list(InvalidOid);
+		}
+
+		/* a worker started up or finished */
+		if (got_SIGUSR1)
+		{
+			got_SIGUSR1 = false;
+
+			/* rebalance cost limits, if needed */
+			if (AutoVacuumShmem->av_rebalance)
+			{
+				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+				AutoVacuumShmem->av_rebalance = false;
+				autovac_balance_cost();
+				LWLockRelease(AutovacuumLock);
+			}
 		}
 
 		/*
-		 * if there's a worker already running, sleep until it
-		 * disappears.
+		 * There are some conditions that we need to check before trying to
+		 * start a launcher.  First, we need to make sure that there is a
+		 * launcher slot available.  Second, we need to make sure that no other
+		 * worker is still starting up.
 		 */
+
 		LWLockAcquire(AutovacuumLock, LW_SHARED);
-		worker_pid = AutoVacuumShmem->worker_pid;
-		LWLockRelease(AutovacuumLock);
 
-		if (worker_pid != 0)
-		{
-			PGPROC *proc = BackendPidGetProc(worker_pid);
+		can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET);
 
-			if (proc != NULL && proc->isAutovacuum)
-				goto sleep;
+		if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
+		{
+			long	secs;
+			int		usecs;
+			WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
+
+			if (current_time == 0)
+				current_time = GetCurrentTimestamp();
+
+			/*
+			 * We can't launch another worker when another one is still
+			 * starting up, so just sleep for a bit more; that worker will wake
+			 * us up again as soon as it's ready.  We will only wait
+			 * autovacuum_naptime seconds for this to happen however.  Note
+			 * that failure to connect to a particular database is not a
+			 * problem here, because the worker removes itself from the
+			 * startingWorker pointer before trying to connect; only low-level
+			 * problems, like fork() failure, can get us here.
+			 */
+			TimestampDifference(worker->wi_launchtime, current_time,
+								&secs, &usecs);
+
+			/* ignore microseconds, as they cannot make any difference */
+			if (secs > autovacuum_naptime)
+			{
+				LWLockRelease(AutovacuumLock);
+				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+				/*
+				 * No other process can put a worker in starting mode, so if
+				 * startingWorker is still INVALID after exchanging our lock,
+				 * we assume it's the same one we saw above (so we don't
+				 * recheck the launch time).
+				 */
+				if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
+				{
+					worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
+					worker->wi_dboid = InvalidOid;
+					worker->wi_tableoid = InvalidOid;
+					worker->wi_workerpid = 0;
+					worker->wi_launchtime = 0;
+					worker->wi_links.next = AutoVacuumShmem->av_freeWorkers;
+					AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker);
+					AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
+				}
+			}
 			else
 			{
 				/*
-				 * if the worker is not really running (or it's a process
-				 * that's not an autovacuum worker), remove the PID from shmem.
-				 * This should not happen, because either the worker exits
-				 * cleanly, in which case it'll remove the PID, or it dies, in
-				 * which case postmaster will cause a system reset cycle.
+				 * maybe the postmaster neglected this start signal --
+				 * resend it.  Note: the constraints in
+				 * launcher_determine_sleep keep us from delivering signals too
+				 * quickly (at most once every 100ms).
 				 */
-				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
-				worker_pid = 0;
-				LWLockRelease(AutovacuumLock);
+				SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+				can_launch = false;
 			}
 		}
+		LWLockRelease(AutovacuumLock);		/* either shared or exclusive */
 
-		do_start_worker();
+		if (can_launch)
+		{
+			Dlelem	   *elem;
 
-sleep:
-		/*
-		 * in emergency mode, exit immediately so that the postmaster can
-		 * request another run right away if needed.
-		 *
-		 * XXX -- maybe it would be better to handle this inside the launcher
-		 * itself.
-		 */
-		if (!autovacuum_start_daemon)
-			break;
+			elem = DLGetTail(DatabaseList);
 
-		/* have pgstat read the file again next time */
-		pgstat_clear_snapshot();
+			if (current_time == 0)
+				current_time = GetCurrentTimestamp();
+
+			if (elem != NULL)
+			{
+				avl_dbase *avdb = DLE_VAL(elem);
+				long	secs;
+				int		usecs;
 
-		/* now sleep until the next autovac iteration */
-		pg_usleep(autovacuum_naptime * 1000000L); 
+				TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
+
+				/* do we have to start a worker? */
+				if (secs <= 0 && usecs <= 0)
+					launch_worker(current_time);
+			}
+			else
+			{
+				/*
+				 * Special case when the list is empty: start a worker right
+				 * away.  This covers the initial case, when no database is in
+				 * pgstats (thus the list is empty).  Note that the constraints
+				 * in launcher_determine_sleep keep us from starting workers
+				 * too quickly (at most once every autovacuum_naptime when the
+				 * list is empty).
+				 */
+				launch_worker(current_time);
+			}
+		}
 	}
 
 	/* Normal exit from the autovac launcher is here */
 	ereport(LOG,
 			(errmsg("autovacuum launcher shutting down")));
+	AutoVacuumShmem->av_launcherpid = 0;
 
 	proc_exit(0);		/* done */
 }
 
+/*
+ * Determine the time to sleep, in microseconds, based on the database list.
+ *
+ * The "canlaunch" parameter indicates whether we can start a worker right now,
+ * for example due to the workers being all busy.
+ */
+static uint64
+launcher_determine_sleep(bool canlaunch, bool recursing)
+{
+	long	secs;
+	int		usecs;
+	Dlelem *elem;
+
+	/*
+	 * We sleep until the next scheduled vacuum.  We trust that when the
+	 * database list was built, care was taken so that no entries have times in
+	 * the past; if the first entry has too close a next_worker value, or a
+	 * time in the past, we will sleep a small nominal time.
+	 */
+	if (!canlaunch)
+	{
+		secs = autovacuum_naptime;
+		usecs = 0;
+	}
+	else if ((elem = DLGetTail(DatabaseList)) != NULL)
+	{
+		avl_dbase  *avdb = DLE_VAL(elem);
+		TimestampTz	current_time = GetCurrentTimestamp();
+		TimestampTz	next_wakeup;
+
+		next_wakeup = avdb->adl_next_worker;
+		TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+	}
+	else
+	{
+		/* list is empty, sleep for whole autovacuum_naptime seconds  */
+		secs = autovacuum_naptime;
+		usecs = 0;
+	}
+
+	/*
+	 * If the result is exactly zero, it means a database had an entry with
+	 * time in the past.  Rebuild the list so that the databases are evenly
+	 * distributed again, and recalculate the time to sleep.  This can happen
+	 * if there are more tables needing vacuum than workers, and they all take
+	 * longer to vacuum than autovacuum_naptime.
+	 *
+	 * We only recurse once.  rebuild_database_list should always return times
+	 * in the future, but it seems best not to trust too much on that.
+	 */
+	if (secs == 0L && usecs == 0 && !recursing)
+	{
+		rebuild_database_list(InvalidOid);
+		return launcher_determine_sleep(canlaunch, true);
+	}
+
+	/* 100ms is the smallest time we'll allow the launcher to sleep */
+	if (secs <= 0L && usecs <= 100000)
+	{
+		secs = 0L;
+		usecs = 100000;	/* 100 ms */
+	}
+
+	return secs * 1000000 + usecs;
+}
+
+/*
+ * Build an updated DatabaseList.  It must only contain databases that appear
+ * in pgstats, and must be sorted by next_worker from highest to lowest,
+ * distributed regularly across the next autovacuum_naptime interval.
+ *
+ * Receives the Oid of the database that made this list be generated (we call
+ * this the "new" database, because when the database was already present on
+ * the list, we expect that this function is not called at all).  The
+ * preexisting list, if any, will be used to preserve the order of the
+ * databases in the autovacuum_naptime period.  The new database is put at the
+ * end of the interval.  The actual values are not saved, which should not be
+ * much of a problem.
+ */
+static void
+rebuild_database_list(Oid newdb)
+{
+	List	   *dblist;
+	ListCell   *cell;
+	MemoryContext newcxt;
+	MemoryContext oldcxt;
+	MemoryContext tmpcxt;
+	HASHCTL		hctl;
+	int			score;
+	int			nelems;
+	HTAB	   *dbhash;
+
+	/* use fresh stats */
+	pgstat_clear_snapshot();
+
+	newcxt = AllocSetContextCreate(AutovacMemCxt,
+								   "AV dblist",
+								   ALLOCSET_DEFAULT_MINSIZE,
+								   ALLOCSET_DEFAULT_INITSIZE,
+								   ALLOCSET_DEFAULT_MAXSIZE);
+	tmpcxt = AllocSetContextCreate(newcxt,
+								   "tmp AV dblist",
+								   ALLOCSET_DEFAULT_MINSIZE,
+								   ALLOCSET_DEFAULT_INITSIZE,
+								   ALLOCSET_DEFAULT_MAXSIZE);
+	oldcxt = MemoryContextSwitchTo(tmpcxt);
+
+	/*
+	 * Implementing this is not as simple as it sounds, because we need to put
+	 * the new database at the end of the list; next the databases that were
+	 * already on the list, and finally (at the tail of the list) all the other
+	 * databases that are not on the existing list.
+	 *
+	 * To do this, we build an empty hash table of scored databases.  We will
+	 * start with the lowest score (zero) for the new database, then increasing
+	 * scores for the databases in the existing list, in order, and lastly
+	 * increasing scores for all databases gotten via get_database_list() that
+	 * are not already on the hash.
+	 *
+	 * Then we will put all the hash elements into an array, sort the array by
+	 * score, and finally put the array elements into the new doubly linked
+	 * list.
+	 */
+	hctl.keysize = sizeof(Oid);
+	hctl.entrysize = sizeof(avl_dbase);
+	hctl.hash = oid_hash;
+	hctl.hcxt = tmpcxt;
+	dbhash = hash_create("db hash", 20, &hctl,	/* magic number here FIXME */
+						 HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	/* start by inserting the new database */
+	score = 0;
+	if (OidIsValid(newdb))
+	{
+		avl_dbase	*db;
+		PgStat_StatDBEntry *entry;
+
+		/* only consider this database if it has a pgstat entry */
+		entry = pgstat_fetch_stat_dbentry(newdb);
+		if (entry != NULL)
+		{
+			/* we assume it isn't found because the hash was just created */
+			db = hash_search(dbhash, &newdb, HASH_ENTER, NULL);
+
+			/* hash_search already filled in the key */
+			db->adl_score = score++;
+			/* next_worker is filled in later */
+		}
+	}
+
+	/* Now insert the databases from the existing list */
+	if (DatabaseList != NULL)
+	{
+		Dlelem	*elem;
+
+		elem = DLGetHead(DatabaseList);
+		while (elem != NULL)
+		{
+			avl_dbase  *avdb = DLE_VAL(elem);
+			avl_dbase  *db;
+			bool		found;
+			PgStat_StatDBEntry *entry;
+
+			elem = DLGetSucc(elem);
+
+			/*
+			 * skip databases with no stat entries -- in particular, this
+			 * gets rid of dropped databases
+			 */
+			entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
+			if (entry == NULL)
+				continue;
+
+			db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found);
+
+			if (!found)
+			{
+				/* hash_search already filled in the key */
+				db->adl_score = score++;
+				/* next_worker is filled in later */
+			}
+		}
+	}
+
+	/* finally, insert all qualifying databases not previously inserted */
+	dblist = get_database_list();
+	foreach(cell, dblist)
+	{
+		avw_dbase  *avdb = lfirst(cell);
+		avl_dbase  *db;
+		bool		found;
+		PgStat_StatDBEntry *entry;
+
+		/* only consider databases with a pgstat entry */
+		entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
+		if (entry == NULL)
+			continue;
+
+		db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found);
+		/* only update the score if the database was not already on the hash */
+		if (!found)
+		{
+			/* hash_search already filled in the key */
+			db->adl_score = score++;
+			/* next_worker is filled in later */
+		}
+	}
+	nelems = score;
+
+	/* from here on, the allocated memory belongs to the new list */
+	MemoryContextSwitchTo(newcxt);
+	DatabaseList = DLNewList();
+
+	if (nelems > 0)
+	{
+		TimestampTz		current_time;
+		int				millis_increment;
+		avl_dbase	   *dbary;
+		avl_dbase	   *db;
+		HASH_SEQ_STATUS	seq;
+		int				i;
+
+		/* put all the hash elements into an array */
+		dbary = palloc(nelems * sizeof(avl_dbase));
+
+		i = 0;
+		hash_seq_init(&seq, dbhash);
+		while ((db = hash_seq_search(&seq)) != NULL)
+			memcpy(&(dbary[i++]), db, sizeof(avl_dbase));
+
+		/* sort the array */
+		qsort(dbary, nelems, sizeof(avl_dbase), db_comparator);
+
+		/* this is the time interval between databases in the schedule */
+		millis_increment = 1000.0 * autovacuum_naptime / nelems;
+		current_time = GetCurrentTimestamp();
+
+		/*
+		 * move the elements from the array into the dllist, setting the 
+		 * next_worker while walking the array
+		 */
+		for (i = 0; i < nelems; i++)
+		{
+			avl_dbase  *db = &(dbary[i]);
+			Dlelem	   *elem;
+
+			current_time = TimestampTzPlusMilliseconds(current_time,
+													   millis_increment);
+			db->adl_next_worker = current_time;
+
+			elem = DLNewElem(db);
+			/* later elements should go closer to the head of the list */
+			DLAddHead(DatabaseList, elem);
+		}
+	}
+
+	/* all done, clean up memory */
+	if (DatabaseListCxt != NULL)
+		MemoryContextDelete(DatabaseListCxt);
+	MemoryContextDelete(tmpcxt);
+	DatabaseListCxt = newcxt;
+	MemoryContextSwitchTo(oldcxt);
+}
+
+/* qsort comparator for avl_dbase, using adl_score */
+static int
+db_comparator(const void *a, const void *b)
+{
+	if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score)
+		return 0;
+	else
+		return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1;
+}
+
 /*
  * do_start_worker
  *
  * Bare-bones procedure for starting an autovacuum worker from the launcher.
  * It determines what database to work on, sets up shared memory stuff and
- * signals postmaster to start the worker.
+ * signals postmaster to start the worker.  It fails gracefully if invoked when
+ * autovacuum_workers are already active.
+ *
+ * Return value is the OID of the database that the worker is going to process,
+ * or InvalidOid if no worker was actually started.
  */
-static void
+static Oid
 do_start_worker(void)
 {
 	List	   *dblist;
-	bool		for_xid_wrap;
-	autovac_dbase *db;
-	ListCell *cell;
+	ListCell   *cell;
 	TransactionId xidForceLimit;
+	bool		for_xid_wrap;
+	avw_dbase  *avdb;
+	TimestampTz	current_time;
+	bool		skipit = false;
+
+	/* return quickly when there are no free workers */
+	LWLockAcquire(AutovacuumLock, LW_SHARED);
+	if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
+	{
+		LWLockRelease(AutovacuumLock);
+		return InvalidOid;
+	}
+	LWLockRelease(AutovacuumLock);
+
+	/* use fresh stats */
+	pgstat_clear_snapshot();
 
 	/* Get a list of databases */
-	dblist = autovac_get_database_list();
+	dblist = get_database_list();
 
 	/*
 	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
@@ -495,21 +989,23 @@ do_start_worker(void)
 	 * isn't clear how to construct a metric that measures that and not cause
 	 * starvation for less busy databases.
 	 */
-	db = NULL;
+	avdb = NULL;
 	for_xid_wrap = false;
+	current_time = GetCurrentTimestamp();
 	foreach(cell, dblist)
 	{
-		autovac_dbase *tmp = lfirst(cell);
+		avw_dbase  *tmp = lfirst(cell);
+		Dlelem	   *elem;
 
 		/* Find pgstat entry if any */
-		tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
+		tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
 
 		/* Check to see if this one is at risk of wraparound */
-		if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
+		if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit))
 		{
-			if (db == NULL ||
-				TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
-				db = tmp;
+			if (avdb == NULL ||
+				TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid))
+				avdb = tmp;
 			for_xid_wrap = true;
 			continue;
 		}
@@ -520,26 +1016,156 @@ do_start_worker(void)
 		 * Otherwise, skip a database with no pgstat entry; it means it
 		 * hasn't seen any activity.
 		 */
-		if (!tmp->ad_entry)
+		if (!tmp->adw_entry)
+			continue;
+
+		/*
+		 * Also, skip a database that appears on the database list as having
+		 * been processed recently (less than autovacuum_naptime seconds ago).
+		 * We do this so that we don't select a database which we just
+		 * selected, but that pgstat hasn't gotten around to updating the last
+		 * autovacuum time yet.
+		 */
+		skipit = false;
+		elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
+
+		while (elem != NULL)
+		{
+			avl_dbase *dbp = DLE_VAL(elem);
+
+			if (dbp->adl_datid == tmp->adw_datid)
+			{
+				TimestampTz		curr_plus_naptime;
+				TimestampTz		next = dbp->adl_next_worker;
+				
+				curr_plus_naptime =
+					TimestampTzPlusMilliseconds(current_time,
+												autovacuum_naptime * 1000);
+
+				/*
+				 * What we want here if to skip if next_worker falls between
+				 * the current time and the current time plus naptime.
+				 */
+				if (timestamp_cmp_internal(current_time, next) > 0)
+					skipit = false;
+				else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
+					skipit = false;
+				else
+					skipit = true;
+
+				break;
+			}
+			elem = DLGetPred(elem);
+		}
+		if (skipit)
 			continue;
 
 		/*
 		 * Remember the db with oldest autovac time.  (If we are here,
 		 * both tmp->entry and db->entry must be non-null.)
 		 */
-		if (db == NULL ||
-			tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
-			db = tmp;
+		if (avdb == NULL ||
+			tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
+			avdb = tmp;
 	}
 
 	/* Found a database -- process it */
-	if (db != NULL)
+	if (avdb != NULL)
 	{
+		WorkerInfo	worker;
+		SHMEM_OFFSET sworker;
+
 		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
-		AutoVacuumShmem->process_db = db->ad_datid;
+
+		/*
+		 * Get a worker entry from the freelist.  We checked above, so there
+		 * really should be a free slot -- complain very loudly if there isn't.
+		 */
+		sworker = AutoVacuumShmem->av_freeWorkers;
+		if (sworker == INVALID_OFFSET)
+			elog(FATAL, "no free worker found");
+
+		worker = (WorkerInfo) MAKE_PTR(sworker);
+		AutoVacuumShmem->av_freeWorkers = worker->wi_links.next;
+
+		worker->wi_dboid = avdb->adw_datid;
+		worker->wi_workerpid = 0;
+		worker->wi_launchtime = GetCurrentTimestamp();
+
+		AutoVacuumShmem->av_startingWorker = sworker;
+
 		LWLockRelease(AutovacuumLock);
 
 		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+
+		return avdb->adw_datid;
+	}
+	else if (skipit)
+	{
+		/*
+		 * If we skipped all databases on the list, rebuild it, because it
+		 * probably contains a dropped database.
+		 */
+		rebuild_database_list(InvalidOid);
+	}
+
+	return InvalidOid;
+}
+
+/*
+ * launch_worker
+ *
+ * Wrapper for starting a worker from the launcher.  Besides actually starting
+ * it, update the database list to reflect the next time that another one will
+ * need to be started on the selected database.  The actual database choice is
+ * left to do_start_worker.
+ *
+ * This routine is also expected to insert an entry into the database list if
+ * the selected database was previously absent from the list.  It returns the
+ * new database list.
+ */
+static void
+launch_worker(TimestampTz now)
+{
+	Oid		dbid;
+	Dlelem *elem;
+
+	dbid = do_start_worker();
+	if (OidIsValid(dbid))
+	{
+		/*
+		 * Walk the database list and update the corresponding entry.  If the
+		 * database is not on the list, we'll recreate the list.
+		 */
+		elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList);
+		while (elem != NULL)
+		{
+			avl_dbase *avdb = DLE_VAL(elem);
+
+			if (avdb->adl_datid == dbid)
+			{
+				/*
+				 * add autovacuum_naptime seconds to the current time, and use
+				 * that as the new "next_worker" field for this database.
+				 */
+				avdb->adl_next_worker =
+					TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+
+				DLMoveToFront(elem);
+				break;
+			}
+			elem = DLGetSucc(elem);
+		}
+
+		/*
+		 * If the database was not present in the database list, we rebuild the
+		 * list.  It's possible that the database does not get into the list
+		 * anyway, for example if it's a database that doesn't have a pgstat
+		 * entry, but this is not a problem because we don't want to schedule
+		 * workers regularly into those in any case.
+		 */
+		if (elem == NULL)
+			rebuild_database_list(dbid);
 	}
 }
 
@@ -550,6 +1176,13 @@ avl_sighup_handler(SIGNAL_ARGS)
 	got_SIGHUP = true;
 }
 
+/* SIGUSR1: a worker is up and running, or just finished */
+static void
+avl_sigusr1_handler(SIGNAL_ARGS)
+{
+	got_SIGUSR1 = true;
+}
+
 static void
 avlauncher_shutdown(SIGNAL_ARGS)
 {
@@ -665,7 +1298,7 @@ NON_EXEC_STATIC void
 AutoVacWorkerMain(int argc, char *argv[])
 {
 	sigjmp_buf	local_sigjmp_buf;
-	Oid			dbid;
+	Oid			dbid = InvalidOid;
 
 	/* we are a postmaster subprocess now */
 	IsUnderPostmaster = true;
@@ -763,18 +1396,35 @@ AutoVacWorkerMain(int argc, char *argv[])
 	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
 
 	/*
-	 * Get the database Id we're going to work on, and announce our PID
-	 * in the shared memory area.  We remove the database OID immediately
-	 * from the shared memory area.
+	 * Force statement_timeout to zero to avoid a timeout setting from
+	 * preventing regular maintenance from being executed.
 	 */
-	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+	SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
 
-	dbid = AutoVacuumShmem->process_db;
-	AutoVacuumShmem->process_db = InvalidOid;
-	AutoVacuumShmem->worker_pid = MyProcPid;
+	/*
+	 * Get the info about the database we're going to work on.
+	 */
+	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+	MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
+	dbid = MyWorkerInfo->wi_dboid;
+	MyWorkerInfo->wi_workerpid = MyProcPid;
 
+	/* insert into the running list */
+	SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers, 
+						 &MyWorkerInfo->wi_links);
+	/*
+	 * remove from the "starting" pointer, so that the launcher can start a new
+	 * worker if required
+	 */
+	AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
 	LWLockRelease(AutovacuumLock);
 
+	on_shmem_exit(FreeWorkerInfo, 0);
+
+	/* wake up the launcher */
+	if (AutoVacuumShmem->av_launcherpid != 0)
+		kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
+
 	if (OidIsValid(dbid))
 	{
 		char	*dbname;
@@ -803,7 +1453,7 @@ AutoVacWorkerMain(int argc, char *argv[])
 
 		/* Create the memory context where cross-transaction state is stored */
 		AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
-											  "Autovacuum context",
+											  "AV worker",
 											  ALLOCSET_DEFAULT_MINSIZE,
 											  ALLOCSET_DEFAULT_INITSIZE,
 											  ALLOCSET_DEFAULT_MAXSIZE);
@@ -814,25 +1464,152 @@ AutoVacWorkerMain(int argc, char *argv[])
 	}
 
 	/*
-	 * Now remove our PID from shared memory, so that the launcher can start
-	 * another worker as soon as appropriate.
+	 * FIXME -- we need to notify the launcher when we are gone.  But this
+	 * should be done after our PGPROC is released, in ProcKill.
 	 */
-	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
-	AutoVacuumShmem->worker_pid = 0;
-	LWLockRelease(AutovacuumLock);
 
 	/* All done, go away */
 	proc_exit(0);
 }
 
 /*
- * autovac_get_database_list
+ * Return a WorkerInfo to the free list */
+static void
+FreeWorkerInfo(int code, Datum arg)
+{
+	if (MyWorkerInfo != NULL)
+	{
+		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+		/*
+		 * If this worker shuts down when there is no free worker slot, wake
+		 * the launcher up so that he can launch a new worker immediately if
+		 * required.  We only save the launcher's PID in local memory here --
+		 * the actual signal will be sent when the PGPROC is recycled, because
+		 * that is when the new worker can actually be launched.
+		 *
+		 * We somewhat ignore the risk that the launcher changes its PID
+		 * between we reading it and the actual kill; we expect ProcKill to be
+		 * called shortly after us, and we assume that PIDs are not reused too
+		 * quickly after a process exits.
+		 */
+		if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
+			AutovacuumLauncherPid = AutoVacuumShmem->av_launcherpid;
+
+		SHMQueueDelete(&MyWorkerInfo->wi_links);
+		MyWorkerInfo->wi_links.next = AutoVacuumShmem->av_freeWorkers;
+		MyWorkerInfo->wi_dboid = InvalidOid;
+		MyWorkerInfo->wi_tableoid = InvalidOid;
+		MyWorkerInfo->wi_workerpid = 0;
+		MyWorkerInfo->wi_launchtime = 0;
+		MyWorkerInfo->wi_cost_delay = 0;
+		MyWorkerInfo->wi_cost_limit = 0;
+		MyWorkerInfo->wi_cost_limit_base = 0;
+		AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(MyWorkerInfo);
+		/* not mine anymore */
+		MyWorkerInfo = NULL;
+
+		/*
+		 * now that we're inactive, cause a rebalancing of the surviving
+		 * workers
+		 */
+		AutoVacuumShmem->av_rebalance = true;
+		LWLockRelease(AutovacuumLock);
+	}
+}
+
+/*
+ * Update the cost-based delay parameters, so that multiple workers consume
+ * each a fraction of the total available I/O.
+ */
+void
+AutoVacuumUpdateDelay(void)
+{
+	if (MyWorkerInfo)
+	{
+		VacuumCostDelay = MyWorkerInfo->wi_cost_delay;
+		VacuumCostLimit = MyWorkerInfo->wi_cost_limit;
+	}
+}
+
+/*
+ * autovac_balance_cost
+ *		Recalculate the cost limit setting for each active workers.
+ *
+ * Caller must hold the AutovacuumLock in exclusive mode.
+ */
+static void
+autovac_balance_cost(void)
+{
+	WorkerInfo	worker;
+	int         vac_cost_limit = (autovacuum_vac_cost_limit >= 0 ?
+								  autovacuum_vac_cost_limit : VacuumCostLimit);
+	int         vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ?
+								  autovacuum_vac_cost_delay : VacuumCostDelay);
+	double      cost_total;
+	double      cost_avail;
+
+	/* not set? nothing to do */
+	if (vac_cost_limit <= 0 || vac_cost_delay <= 0)
+		return;
+
+	/* caculate the total base cost limit of active workers */
+	cost_total = 0.0;
+	worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+									   &AutoVacuumShmem->av_runningWorkers,
+									   offsetof(WorkerInfoData, wi_links));
+	while (worker)
+	{
+		if (worker->wi_workerpid != 0 &&
+			worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
+			cost_total +=
+				(double) worker->wi_cost_limit_base / worker->wi_cost_delay;
+
+		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+										   &worker->wi_links,
+										   offsetof(WorkerInfoData, wi_links));
+	}
+	/* there are no cost limits -- nothing to do */
+	if (cost_total <= 0)
+		return;
+
+	/*
+	 * Adjust each cost limit of active workers to balance the total of
+	 * cost limit to autovacuum_vacuum_cost_limit.
+	 */
+	cost_avail = (double) vac_cost_limit / vac_cost_delay;
+	worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+									   &AutoVacuumShmem->av_runningWorkers,
+									   offsetof(WorkerInfoData, wi_links));
+	while (worker)
+	{
+		if (worker->wi_workerpid != 0 &&
+			worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
+		{
+			int     limit = (int)
+				(cost_avail * worker->wi_cost_limit_base / cost_total);
+
+			worker->wi_cost_limit = Min(limit, worker->wi_cost_limit_base);
+
+			elog(DEBUG2, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)",
+				 worker->wi_workerpid, worker->wi_dboid,
+				 worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay);
+		}
+
+		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+										   &worker->wi_links,
+										   offsetof(WorkerInfoData, wi_links));
+	}
+}
+
+/*
+ * get_database_list
  *
  *		Return a list of all databases.  Note we cannot use pg_database,
  *		because we aren't connected; we use the flat database file.
  */
 static List *
-autovac_get_database_list(void)
+get_database_list(void)
 {
 	char	   *filename;
 	List	   *dblist = NIL;
@@ -852,15 +1629,15 @@ autovac_get_database_list(void)
 	while (read_pg_database_line(db_file, thisname, &db_id,
 								 &db_tablespace, &db_frozenxid))
 	{
-		autovac_dbase *avdb;
+		avw_dbase *avdb;
 
-		avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase));
+		avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
 
-		avdb->ad_datid = db_id;
-		avdb->ad_name = pstrdup(thisname);
-		avdb->ad_frozenxid = db_frozenxid;
+		avdb->adw_datid = db_id;
+		avdb->adw_name = pstrdup(thisname);
+		avdb->adw_frozenxid = db_frozenxid;
 		/* this gets set later: */
-		avdb->ad_entry = NULL;
+		avdb->adw_entry = NULL;
 
 		dblist = lappend(dblist, avdb);
 	}
@@ -1008,12 +1785,12 @@ do_autovacuum(void)
 	 * Add to the list of tables to vacuum, the OIDs of the tables that
 	 * correspond to the saved OIDs of toast tables needing vacuum.
 	 */
-	foreach (cell, toast_oids)
+	foreach(cell, toast_oids)
 	{
 		Oid		toastoid = lfirst_oid(cell);
 		ListCell *cell2;
 
-		foreach (cell2, table_toast_list)
+		foreach(cell2, table_toast_list)
 		{
 			av_relation	   *ar = lfirst(cell2);
 
@@ -1038,9 +1815,55 @@ do_autovacuum(void)
 		Oid		relid = lfirst_oid(cell);
 		autovac_table *tab;
 		char   *relname;
+		WorkerInfo	worker;
+		bool        skipit;
 
 		CHECK_FOR_INTERRUPTS();
 
+		/*
+		 * hold schedule lock from here until we're sure that this table
+		 * still needs vacuuming.  We also need the AutovacuumLock to walk
+		 * the worker array, but we'll let go of that one quickly.
+		 */
+		LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+		LWLockAcquire(AutovacuumLock, LW_SHARED);
+
+		/*
+		 * Check whether the table is being vacuumed concurrently by another
+		 * worker.
+		 */
+		skipit = false;
+		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+										   &AutoVacuumShmem->av_runningWorkers,
+										   offsetof(WorkerInfoData, wi_links));
+		while (worker)
+		{
+			/* ignore myself */
+			if (worker == MyWorkerInfo)
+				goto next_worker;
+
+			/* ignore workers in other databases */
+			if (worker->wi_dboid != MyDatabaseId)
+				goto next_worker;
+
+			if (worker->wi_tableoid == relid)
+			{
+				skipit = true;
+				break;
+			}
+
+next_worker:
+			worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+											   &worker->wi_links,
+											   offsetof(WorkerInfoData, wi_links));
+		}
+		LWLockRelease(AutovacuumLock);
+		if (skipit)
+		{
+			LWLockRelease(AutovacuumScheduleLock);
+			continue;
+		}
+
 		/*
 		 * Check whether pgstat data still says we need to vacuum this table.
 		 * It could have changed if something else processed the table while we
@@ -1053,11 +1876,18 @@ do_autovacuum(void)
 		if (tab == NULL)
 		{
 			/* someone else vacuumed the table */
+			LWLockRelease(AutovacuumScheduleLock);
 			continue;
 		}
-		/* Ok, good to go! */
 
-		/* Set the vacuum cost parameters for this table */
+		/*
+		 * Ok, good to go.  Store the table in shared memory before releasing
+		 * the lock so that other workers don't vacuum it concurrently.
+		 */
+		MyWorkerInfo->wi_tableoid = relid;
+		LWLockRelease(AutovacuumScheduleLock);
+
+		/* Set the initial vacuum cost parameters for this table */
 		VacuumCostDelay = tab->at_vacuum_cost_delay;
 		VacuumCostLimit = tab->at_vacuum_cost_limit;
 
@@ -1067,6 +1897,18 @@ do_autovacuum(void)
 			 (tab->at_doanalyze ? " ANALYZE" : ""),
 			 relname);
 
+		/*
+		 * Advertise my cost delay parameters for the balancing algorithm, and
+		 * do a balance
+		 */
+		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+		MyWorkerInfo->wi_cost_delay = tab->at_vacuum_cost_delay;
+		MyWorkerInfo->wi_cost_limit = tab->at_vacuum_cost_limit;
+		MyWorkerInfo->wi_cost_limit_base = tab->at_vacuum_cost_limit;
+		autovac_balance_cost();
+		LWLockRelease(AutovacuumLock);
+
+		/* have at it */
 		autovacuum_do_vac_analyze(tab->at_relid,
 								  tab->at_dovacuum,
 								  tab->at_doanalyze,
@@ -1211,7 +2053,7 @@ table_recheck_autovac(Oid relid)
 	PgStat_StatDBEntry *shared;
 	PgStat_StatDBEntry *dbentry;
 
-	/* We need fresh pgstat data for this */
+	/* use fresh stats */
 	pgstat_clear_snapshot();
 
 	shared = pgstat_fetch_stat_dbentry(InvalidOid);
@@ -1219,8 +2061,8 @@ table_recheck_autovac(Oid relid)
 
 	/* fetch the relation's relcache entry */
 	classTup = SearchSysCacheCopy(RELOID,
-							  ObjectIdGetDatum(relid),
-							  0, 0, 0);
+								  ObjectIdGetDatum(relid),
+								  0, 0, 0);
 	if (!HeapTupleIsValid(classTup))
 		return NULL;
 	classForm = (Form_pg_class) GETSTRUCT(classTup);
@@ -1630,7 +2472,16 @@ IsAutoVacuumWorkerProcess(void)
 Size
 AutoVacuumShmemSize(void)
 {
-	return sizeof(AutoVacuumShmemStruct);
+	Size	size;
+
+	/*
+	 * Need the fixed struct and the array of WorkerInfoData.
+	 */
+	size = sizeof(AutoVacuumShmemStruct);
+	size = MAXALIGN(size);
+	size = add_size(size, mul_size(autovacuum_max_workers,
+								   sizeof(WorkerInfoData)));
+	return size;
 }
 
 /*
@@ -1650,8 +2501,29 @@ AutoVacuumShmemInit(void)
 		ereport(FATAL,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("not enough shared memory for autovacuum")));
-	if (found)
-		return;                 /* already initialized */
 
-	MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
+	if (!IsUnderPostmaster)
+	{
+		WorkerInfo	worker;
+		int			i;
+
+		Assert(!found);
+
+		AutoVacuumShmem->av_launcherpid = 0;
+		AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET;
+		SHMQueueInit(&AutoVacuumShmem->av_runningWorkers);
+		AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
+
+		worker = (WorkerInfo) ((char *) AutoVacuumShmem +
+							   MAXALIGN(sizeof(AutoVacuumShmemStruct)));
+
+		/* initialize the WorkerInfo free list */
+		for (i = 0; i < autovacuum_max_workers; i++)
+		{
+			worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers;
+			AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]);
+		}
+	}
+	else
+		Assert(found);
 }
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 2691a50582..e2b8f22d35 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.187 2007/04/03 16:34:36 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.188 2007/04/16 18:29:53 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -96,7 +96,7 @@ ProcGlobalShmemSize(void)
 	size = add_size(size, sizeof(PROC_HDR));
 	/* AuxiliaryProcs */
 	size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
-	/* MyProcs */
+	/* MyProcs, including autovacuum */
 	size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
 	/* ProcStructLock */
 	size = add_size(size, sizeof(slock_t));
@@ -110,7 +110,10 @@ ProcGlobalShmemSize(void)
 int
 ProcGlobalSemas(void)
 {
-	/* We need a sema per backend, plus one for each auxiliary process. */
+	/*
+	 * We need a sema per backend (including autovacuum), plus one for each
+	 * auxiliary process.
+	 */
 	return MaxBackends + NUM_AUXILIARY_PROCS;
 }
 
@@ -127,8 +130,8 @@ ProcGlobalSemas(void)
  *	  running out when trying to start another backend is a common failure.
  *	  So, now we grab enough semaphores to support the desired max number
  *	  of backends immediately at initialization --- if the sysadmin has set
- *	  MaxBackends higher than his kernel will support, he'll find out sooner
- *	  rather than later.
+ *	  MaxConnections or autovacuum_max_workers higher than his kernel will
+ *	  support, he'll find out sooner rather than later.
  *
  *	  Another reason for creating semaphores here is that the semaphore
  *	  implementation typically requires us to create semaphores in the
@@ -163,25 +166,39 @@ InitProcGlobal(void)
 	 * Initialize the data structures.
 	 */
 	ProcGlobal->freeProcs = INVALID_OFFSET;
+	ProcGlobal->autovacFreeProcs = INVALID_OFFSET;
 
 	ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
 
 	/*
 	 * Pre-create the PGPROC structures and create a semaphore for each.
 	 */
-	procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC));
+	procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC));
 	if (!procs)
 		ereport(FATAL,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of shared memory")));
-	MemSet(procs, 0, MaxBackends * sizeof(PGPROC));
-	for (i = 0; i < MaxBackends; i++)
+	MemSet(procs, 0, MaxConnections * sizeof(PGPROC));
+	for (i = 0; i < MaxConnections; i++)
 	{
 		PGSemaphoreCreate(&(procs[i].sem));
 		procs[i].links.next = ProcGlobal->freeProcs;
 		ProcGlobal->freeProcs = MAKE_OFFSET(&procs[i]);
 	}
 
+	procs = (PGPROC *) ShmemAlloc((autovacuum_max_workers) * sizeof(PGPROC));
+	if (!procs)
+		ereport(FATAL,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of shared memory")));
+	MemSet(procs, 0, autovacuum_max_workers * sizeof(PGPROC));
+	for (i = 0; i < autovacuum_max_workers; i++)
+	{
+		PGSemaphoreCreate(&(procs[i].sem));
+		procs[i].links.next = ProcGlobal->autovacFreeProcs;
+		ProcGlobal->autovacFreeProcs = MAKE_OFFSET(&procs[i]);
+	}
+
 	MemSet(AuxiliaryProcs, 0, NUM_AUXILIARY_PROCS * sizeof(PGPROC));
 	for (i = 0; i < NUM_AUXILIARY_PROCS; i++)
 	{
@@ -226,12 +243,18 @@ InitProcess(void)
 
 	set_spins_per_delay(procglobal->spins_per_delay);
 
-	myOffset = procglobal->freeProcs;
+	if (IsAutoVacuumWorkerProcess())
+		myOffset = procglobal->autovacFreeProcs;
+	else
+		myOffset = procglobal->freeProcs;
 
 	if (myOffset != INVALID_OFFSET)
 	{
 		MyProc = (PGPROC *) MAKE_PTR(myOffset);
-		procglobal->freeProcs = MyProc->links.next;
+		if (IsAutoVacuumWorkerProcess())
+			procglobal->autovacFreeProcs = MyProc->links.next;
+		else
+			procglobal->freeProcs = MyProc->links.next;
 		SpinLockRelease(ProcStructLock);
 	}
 	else
@@ -239,7 +262,8 @@ InitProcess(void)
 		/*
 		 * If we reach here, all the PGPROCs are in use.  This is one of the
 		 * possible places to detect "too many backends", so give the standard
-		 * error message.
+		 * error message.  XXX do we need to give a different failure message
+		 * in the autovacuum case?
 		 */
 		SpinLockRelease(ProcStructLock);
 		ereport(FATAL,
@@ -571,8 +595,16 @@ ProcKill(int code, Datum arg)
 	SpinLockAcquire(ProcStructLock);
 
 	/* Return PGPROC structure (and semaphore) to freelist */
-	MyProc->links.next = procglobal->freeProcs;
-	procglobal->freeProcs = MAKE_OFFSET(MyProc);
+	if (IsAutoVacuumWorkerProcess())
+	{
+		MyProc->links.next = procglobal->autovacFreeProcs;
+		procglobal->autovacFreeProcs = MAKE_OFFSET(MyProc);
+	}
+	else
+	{
+		MyProc->links.next = procglobal->freeProcs;
+		procglobal->freeProcs = MAKE_OFFSET(MyProc);
+	}
 
 	/* PGPROC struct isn't mine anymore */
 	MyProc = NULL;
@@ -581,6 +613,10 @@ ProcKill(int code, Datum arg)
 	procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay);
 
 	SpinLockRelease(ProcStructLock);
+
+	/* wake autovac launcher if needed -- see comments in FreeWorkerInfo */
+	if (AutovacuumLauncherPid != 0)
+		kill(AutovacuumLauncherPid, SIGUSR1);
 }
 
 /*
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 130244b5e5..b5b1150056 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.100 2007/01/05 22:19:44 momjian Exp $
+ *	  $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.101 2007/04/16 18:29:54 alvherre Exp $
  *
  * NOTES
  *	  Globals used all over the place should be declared here and not
@@ -95,9 +95,14 @@ bool		allowSystemTableMods = false;
 int			work_mem = 1024;
 int			maintenance_work_mem = 16384;
 
-/* Primary determinants of sizes of shared-memory structures: */
+/*
+ * Primary determinants of sizes of shared-memory structures.  MaxBackends is
+ * MaxConnections + autovacuum_max_workers (it is computed by the GUC assign
+ * hook):
+ */
 int			NBuffers = 1000;
 int			MaxBackends = 100;
+int			MaxConnections = 90;
 
 int			VacuumCostPageHit = 1;		/* GUC parameters for vacuum */
 int			VacuumCostPageMiss = 10;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 385411c058..83ea00c568 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -10,7 +10,7 @@
  * Written by Peter Eisentraut <peter_e@gmx.net>.
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.384 2007/04/12 06:53:47 neilc Exp $
+ *	  $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.385 2007/04/16 18:29:55 alvherre Exp $
  *
  *--------------------------------------------------------------------
  */
@@ -163,6 +163,8 @@ static bool assign_tcp_keepalives_count(int newval, bool doit, GucSource source)
 static const char *show_tcp_keepalives_idle(void);
 static const char *show_tcp_keepalives_interval(void);
 static const char *show_tcp_keepalives_count(void);
+static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source);
+static bool assign_maxconnections(int newval, bool doit, GucSource source);
 
 /*
  * GUC option variables that are exported from this module
@@ -1149,16 +1151,19 @@ static struct config_int ConfigureNamesInt[] =
 	 * number.
 	 *
 	 * MaxBackends is limited to INT_MAX/4 because some places compute
-	 * 4*MaxBackends without any overflow check.  Likewise we have to limit
-	 * NBuffers to INT_MAX/2.
+	 * 4*MaxBackends without any overflow check.  This check is made on
+	 * assign_maxconnections, since MaxBackends is computed as MaxConnections +
+	 * autovacuum_max_workers.
+	 *
+	 * Likewise we have to limit NBuffers to INT_MAX/2.
 	 */
 	{
 		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the maximum number of concurrent connections."),
 			NULL
 		},
-		&MaxBackends,
-		100, 1, INT_MAX / 4, NULL, NULL
+		&MaxConnections,
+		100, 1, INT_MAX / 4, assign_maxconnections, NULL
 	},
 
 	{
@@ -1622,6 +1627,15 @@ static struct config_int ConfigureNamesInt[] =
 		&autovacuum_freeze_max_age,
 		200000000, 100000000, 2000000000, NULL, NULL
 	},
+	{
+		/* see max_connections */
+		{"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM,
+			gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."),
+			NULL
+		},
+		&autovacuum_max_workers,
+		3, 1, INT_MAX / 4, assign_autovacuum_max_workers, NULL
+	},
 
 	{
 		{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
@@ -6692,5 +6706,32 @@ show_tcp_keepalives_count(void)
 	return nbuf;
 }
 
+static bool
+assign_maxconnections(int newval, bool doit, GucSource source)
+{
+	if (doit)
+	{
+		if (newval + autovacuum_max_workers > INT_MAX / 4)
+			return false;
+
+		MaxBackends = newval + autovacuum_max_workers;
+	}
+
+	return true;
+}
+
+static bool
+assign_autovacuum_max_workers(int newval, bool doit, GucSource source)
+{
+	if (doit)
+	{
+		if (newval + MaxConnections > INT_MAX / 4)
+			return false;
+
+		MaxBackends = newval + MaxConnections;
+	}
+
+	return true;
+}
 
 #include "guc-file.c"
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 22f9685bbd..bc5b642d02 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -376,6 +376,7 @@
 #autovacuum = on			# enable autovacuum subprocess?
 					# 'on' requires stats_start_collector
 					# and stats_row_level to also be on
+#autovacuum_max_workers = 3 # max # of autovacuum subprocesses
 #autovacuum_naptime = 1min		# time between autovacuum runs
 #autovacuum_vacuum_threshold = 500	# min # of tuple updates before
 					# vacuum
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index ca5cc799c5..c5090581c5 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -13,7 +13,7 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/miscadmin.h,v 1.193 2007/03/01 14:52:04 petere Exp $
+ * $PostgreSQL: pgsql/src/include/miscadmin.h,v 1.194 2007/04/16 18:29:56 alvherre Exp $
  *
  * NOTES
  *	  some of the information in this file should be moved to other files.
@@ -129,6 +129,7 @@ extern DLLIMPORT char *DataDir;
 
 extern DLLIMPORT int NBuffers;
 extern int	MaxBackends;
+extern int	MaxConnections;
 
 extern DLLIMPORT int MyProcPid;
 extern DLLIMPORT struct Port *MyProcPort;
diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h
index facf9de52b..ccd982b681 100644
--- a/src/include/postmaster/autovacuum.h
+++ b/src/include/postmaster/autovacuum.h
@@ -7,15 +7,18 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/postmaster/autovacuum.h,v 1.8 2007/02/15 23:23:23 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/postmaster/autovacuum.h,v 1.9 2007/04/16 18:30:03 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef AUTOVACUUM_H
 #define AUTOVACUUM_H
 
+#include "storage/lock.h"
+
 /* GUC variables */
 extern bool autovacuum_start_daemon;
+extern int	autovacuum_max_workers;
 extern int	autovacuum_naptime;
 extern int	autovacuum_vac_thresh;
 extern double autovacuum_vac_scale;
@@ -25,6 +28,9 @@ extern int	autovacuum_freeze_max_age;
 extern int	autovacuum_vac_cost_delay;
 extern int	autovacuum_vac_cost_limit;
 
+/* autovacuum launcher PID, only valid when worker is shutting down */
+extern int	AutovacuumLauncherPid;
+
 /* Status inquiry functions */
 extern bool AutoVacuumingActive(void);
 extern bool IsAutoVacuumLauncherProcess(void);
@@ -35,6 +41,9 @@ extern void autovac_init(void);
 extern int	StartAutoVacLauncher(void);
 extern int	StartAutoVacWorker(void);
 
+/* autovacuum cost-delay balancer */
+extern void AutoVacuumUpdateDelay(void);
+
 #ifdef EXEC_BACKEND
 extern void AutoVacLauncherMain(int argc, char *argv[]);
 extern void AutoVacWorkerMain(int argc, char *argv[]);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index c47256a159..477284b7d1 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.35 2007/04/03 16:34:36 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.36 2007/04/16 18:30:04 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -61,6 +61,7 @@ typedef enum LWLockId
 	BtreeVacuumLock,
 	AddinShmemInitLock,
 	AutovacuumLock,
+	AutovacuumScheduleLock,
 	/* Individual lock IDs end here */
 	FirstBufMappingLock,
 	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 772cf52cdf..1fd4e264f0 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.97 2007/04/03 16:34:36 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.98 2007/04/16 18:30:04 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -115,6 +115,8 @@ typedef struct PROC_HDR
 {
 	/* Head of list of free PGPROC structures */
 	SHMEM_OFFSET freeProcs;
+	/* Head of list of autovacuum's free PGPROC structures */
+	SHMEM_OFFSET autovacFreeProcs;
 	/* Current shared estimate of appropriate spins_per_delay value */
 	int			spins_per_delay;
 } PROC_HDR;