From: Alvaro Herrera <alvherre@alvh.no-ip.org> Date: Mon, 16 Apr 2007 18:30:04 +0000 (+0000) Subject: Add a multi-worker capability to autovacuum. This allows multiple worker X-Git-Tag: REL8_3_BETA1~827 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=e2a186b03cc1a87cf26644db18f28a20f10bd739;p=postgresql Add a multi-worker capability to autovacuum. This allows multiple worker processes to be running simultaneously. Also, now autovacuum processes do not count towards the max_connections limit; they are counted separately from regular processes, and are limited by the new GUC variable autovacuum_max_workers. The launcher now has intelligence to launch workers on each database every autovacuum_naptime seconds, limited only on the max amount of worker slots available. Also, the global worker I/O utilization is limited by the vacuum cost-based delay feature. Workers are "balanced" so that the total I/O consumption does not exceed the established limit. This part of the patch was contributed by ITAGAKI Takahiro. Per discussion. --- diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 57a618faa6..e10d2d753a 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1,4 +1,4 @@ -<!-- $PostgreSQL: pgsql/doc/src/sgml/config.sgml,v 1.119 2007/04/02 15:27:02 petere Exp $ --> +<!-- $PostgreSQL: pgsql/doc/src/sgml/config.sgml,v 1.120 2007/04/16 18:29:50 alvherre Exp $ --> <chapter Id="runtime-config"> <title>Server Configuration</title> @@ -3166,7 +3166,7 @@ SELECT * FROM parent WHERE key = 2400; <listitem> <para> Controls whether the server should run the - autovacuum daemon. This is off by default. + autovacuum launcher daemon. This is on by default. <varname>stats_start_collector</> and <varname>stats_row_level</> must also be turned on for autovacuum to work. This parameter can only be set in the <filename>postgresql.conf</> @@ -3175,6 +3175,21 @@ SELECT * FROM parent WHERE key = 2400; </listitem> </varlistentry> + <varlistentry id="guc-autovacuum-max-workers" xreflabel="autovacuum_max_workers"> + <term><varname>autovacuum_max_workers</varname> (<type>integer</type>)</term> + <indexterm> + <primary><varname>autovacuum_max_workers</> configuration parameter</primary> + </indexterm> + <listitem> + <para> + Specifies the maximum number of autovacuum processes (other than the + autovacuum launcher) which may be running at any one time. The default + is three (<literal>3</literal>). This parameter can only be set in + the <filename>postgresql.conf</> file or on the server command line. + </para> + </listitem> + </varlistentry> + <varlistentry id="guc-autovacuum-naptime" xreflabel="autovacuum_naptime"> <term><varname>autovacuum_naptime</varname> (<type>integer</type>)</term> <indexterm> @@ -3182,9 +3197,9 @@ SELECT * FROM parent WHERE key = 2400; </indexterm> <listitem> <para> - Specifies the delay between activity rounds for the autovacuum - daemon. In each round the daemon examines one database - and issues <command>VACUUM</> and <command>ANALYZE</> commands + Specifies the minimum delay between autovacuum runs on any given + database. In each round the daemon examines the + database and issues <command>VACUUM</> and <command>ANALYZE</> commands as needed for tables in that database. The delay is measured in seconds, and the default is one minute (<literal>1m</>). 
This parameter can only be set in the <filename>postgresql.conf</> @@ -3318,7 +3333,10 @@ SELECT * FROM parent WHERE key = 2400; Specifies the cost limit value that will be used in automatic <command>VACUUM</> operations. If <literal>-1</> is specified (which is the default), the regular - <xref linkend="guc-vacuum-cost-limit"> value will be used. + <xref linkend="guc-vacuum-cost-limit"> value will be used. Note that + the value is distributed proportionally among the running autovacuum + workers, if there is more than one, so that the sum of the limits of + each worker never exceeds the limit on this variable. This parameter can only be set in the <filename>postgresql.conf</> file or on the server command line. This setting can be overridden for individual tables by entries in diff --git a/doc/src/sgml/maintenance.sgml b/doc/src/sgml/maintenance.sgml index fe5369c19c..2be11332c2 100644 --- a/doc/src/sgml/maintenance.sgml +++ b/doc/src/sgml/maintenance.sgml @@ -1,4 +1,4 @@ -<!-- $PostgreSQL: pgsql/doc/src/sgml/maintenance.sgml,v 1.70 2007/02/01 19:10:24 momjian Exp $ --> +<!-- $PostgreSQL: pgsql/doc/src/sgml/maintenance.sgml,v 1.71 2007/04/16 18:29:50 alvherre Exp $ --> <chapter id="maintenance"> <title>Routine Database Maintenance Tasks</title> @@ -466,26 +466,43 @@ HINT: Stop the postmaster and use a standalone backend to VACUUM in "mydb". <secondary>general information</secondary> </indexterm> <para> - Beginning in <productname>PostgreSQL </productname> 8.1, there is a - separate optional server process called the <firstterm>autovacuum - daemon</firstterm>, whose purpose is to automate the execution of + Beginning in <productname>PostgreSQL</productname> 8.1, there is an + optional feature called <firstterm>autovacuum</firstterm>, + whose purpose is to automate the execution of <command>VACUUM</command> and <command>ANALYZE </command> commands. - When enabled, the autovacuum daemon runs periodically and checks for + When enabled, autovacuum checks for tables that have had a large number of inserted, updated or deleted tuples. These checks use the row-level statistics collection facility; - therefore, the autovacuum daemon cannot be used unless <xref + therefore, autovacuum cannot be used unless <xref linkend="guc-stats-start-collector"> and <xref - linkend="guc-stats-row-level"> are set to <literal>true</literal>. Also, - it's important to allow a slot for the autovacuum process when choosing - the value of <xref linkend="guc-superuser-reserved-connections">. In - the default configuration, autovacuuming is enabled and the related + linkend="guc-stats-row-level"> are set to <literal>true</literal>. + In the default configuration, autovacuuming is enabled and the related configuration parameters are appropriately set. </para> <para> - The autovacuum daemon, when enabled, runs every <xref - linkend="guc-autovacuum-naptime"> seconds. On each run, it selects - one database to process and checks each table within that database. + Beginning in <productname>PostgreSQL</productname> 8.3, autovacuum has a + multi-process architecture: there is a daemon process, called the + <firstterm>autovacuum launcher</firstterm>, which is in charge of starting + an <firstterm>autovacuum worker</firstterm> process on each database every + <xref linkend="guc-autovacuum-naptime"> seconds. 
+ </para> + + <para> + There is a limit of <xref linkend="guc-autovacuum-max-workers"> worker + processes that may be running at any one time, so if the <command>VACUUM</> + and <command>ANALYZE</> work to be done takes too long to run, the deadline + for processing other databases may not be met. Also, if a particular database + takes a long time to process, more than one worker may be processing it + simultaneously. The workers are smart enough to avoid repeating work that + other workers have done, so this is normally not a problem. Note that the + number of running workers does not count towards the <xref + linkend="guc-max-connections"> or the <xref + linkend="guc-superuser-reserved-connections"> limits. + </para> + + <para> + On each run, the worker process checks each table within that database, and <command>VACUUM</command> or <command>ANALYZE</command> commands are issued as needed. </para> @@ -581,6 +598,12 @@ analyze threshold = analyze base threshold + analyze scale factor * number of tu </para> </caution> + <para> + When multiple workers are running, the cost limit is "balanced" among all + the running workers, so that the total impact on the system is the same, + regardless of the number of workers actually running. + </para> + </sect2> </sect1> diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index d350420ab2..f275448756 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -13,7 +13,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.349 2007/03/14 18:48:55 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.350 2007/04/16 18:29:50 alvherre Exp $ * *------------------------------------------------------------------------- */ @@ -3504,6 +3504,9 @@ vacuum_delay_point(void) VacuumCostBalance = 0; + /* update balance values for workers */ + AutoVacuumUpdateDelay(); + /* Might have gotten an interrupt while sleeping */ CHECK_FOR_INTERRUPTS(); } diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 4631c636c0..9893fa680b 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.40 2007/03/28 22:17:12 alvherre Exp $ + * $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.41 2007/04/16 18:29:52 alvherre Exp $ * *------------------------------------------------------------------------- */ @@ -52,6 +52,7 @@ #include "utils/syscache.h" +static volatile sig_atomic_t got_SIGUSR1 = false; static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t avlauncher_shutdown_request = false; @@ -59,6 +60,7 @@ static volatile sig_atomic_t avlauncher_shutdown_request = false; * GUC parameters */ bool autovacuum_start_daemon = false; +int autovacuum_max_workers; int autovacuum_naptime; int autovacuum_vac_thresh; double autovacuum_vac_scale; @@ -69,7 +71,7 @@ int autovacuum_freeze_max_age; int autovacuum_vac_cost_delay; int autovacuum_vac_cost_limit; -/* Flag to tell if we are in the autovacuum daemon process */ +/* Flags to tell if we are in an autovacuum process */ static bool am_autovacuum_launcher = false; static bool am_autovacuum_worker = false; @@ -82,14 +84,22 @@ static int default_freeze_min_age; /* Memory context for long-lived data */ static MemoryContext AutovacMemCxt; -/* struct to keep list of candidate databases for vacuum */ -typedef struct autovac_dbase +/* struct to keep track of databases in launcher */ 
+typedef struct avl_dbase { - Oid ad_datid; - char *ad_name; - TransactionId ad_frozenxid; - PgStat_StatDBEntry *ad_entry; -} autovac_dbase; + Oid adl_datid; /* hash key -- must be first */ + TimestampTz adl_next_worker; + int adl_score; +} avl_dbase; + +/* struct to keep track of databases in worker */ +typedef struct avw_dbase +{ + Oid adw_datid; + char *adw_name; + TransactionId adw_frozenxid; + PgStat_StatDBEntry *adw_entry; +} avw_dbase; /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */ typedef struct av_relation @@ -110,14 +120,73 @@ typedef struct autovac_table int at_vacuum_cost_limit; } autovac_table; +/*------------- + * This struct holds information about a single worker's whereabouts. We keep + * an array of these in shared memory, sized according to + * autovacuum_max_workers. + * + * wi_links entry into free list or running list + * wi_dboid OID of the database this worker is supposed to work on + * wi_tableoid OID of the table currently being vacuumed + * wi_workerpid PID of the running worker, 0 if not yet started + * wi_launchtime Time at which this worker was launched + * wi_cost_* Vacuum cost-based delay parameters current in this worker + * + * All fields are protected by AutovacuumLock, except for wi_tableoid which is + * protected by AutovacuumScheduleLock (which is read-only for everyone except + * that worker itself). + *------------- + */ +typedef struct WorkerInfoData +{ + SHM_QUEUE wi_links; + Oid wi_dboid; + Oid wi_tableoid; + int wi_workerpid; + TimestampTz wi_launchtime; + int wi_cost_delay; + int wi_cost_limit; + int wi_cost_limit_base; +} WorkerInfoData; + +typedef struct WorkerInfoData *WorkerInfo; + +/*------------- + * The main autovacuum shmem struct. On shared memory we store this main + * struct and the array of WorkerInfo structs. This struct keeps: + * + * av_launcherpid the PID of the autovacuum launcher + * av_freeWorkers the WorkerInfo freelist + * av_runningWorkers the WorkerInfo non-free queue + * av_startingWorker pointer to WorkerInfo currently being started (cleared by + * the worker itself as soon as it's up and running) + * av_rebalance true when a worker determines that cost limits must be + * rebalanced + * + * This struct is protected by AutovacuumLock. 
+ *------------- + */ typedef struct { - Oid process_db; /* OID of database to process */ - int worker_pid; /* PID of the worker process, if any */ + pid_t av_launcherpid; + SHMEM_OFFSET av_freeWorkers; + SHM_QUEUE av_runningWorkers; + SHMEM_OFFSET av_startingWorker; + bool av_rebalance; } AutoVacuumShmemStruct; static AutoVacuumShmemStruct *AutoVacuumShmem; +/* the database list in the launcher, and the context that contains it */ +static Dllist *DatabaseList = NULL; +static MemoryContext DatabaseListCxt = NULL; + +/* Pointer to my own WorkerInfo, valid on each worker */ +static WorkerInfo MyWorkerInfo = NULL; + +/* PID of launcher, valid only in worker while shutting down */ +int AutovacuumLauncherPid = 0; + #ifdef EXEC_BACKEND static pid_t avlauncher_forkexec(void); static pid_t avworker_forkexec(void); @@ -125,9 +194,16 @@ static pid_t avworker_forkexec(void); NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]); NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]); -static void do_start_worker(void); +static Oid do_start_worker(void); +static uint64 launcher_determine_sleep(bool canlaunch, bool recursing); +static void launch_worker(TimestampTz now); +static List *get_database_list(void); +static void rebuild_database_list(Oid newdb); +static int db_comparator(const void *a, const void *b); +static void autovac_balance_cost(void); + static void do_autovacuum(void); -static List *autovac_get_database_list(void); +static void FreeWorkerInfo(int code, Datum arg); static void relation_check_autovac(Oid relid, Form_pg_class classForm, Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry, @@ -147,6 +223,7 @@ static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared, PgStat_StatDBEntry *dbentry); static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid); static void avl_sighup_handler(SIGNAL_ARGS); +static void avl_sigusr1_handler(SIGNAL_ARGS); static void avlauncher_shutdown(SIGNAL_ARGS); static void avl_quickdie(SIGNAL_ARGS); @@ -230,12 +307,34 @@ StartAutoVacLauncher(void) /* * Main loop for the autovacuum launcher process. + * + * The signalling between launcher and worker is as follows: + * + * When the worker has finished starting up, it stores its PID in wi_workerpid + * and sends a SIGUSR1 signal to the launcher. The launcher then knows that + * the postmaster is ready to start a new worker. We do it this way because + * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't + * yet processed the last one, in which case the second signal would be lost. + * This is only useful when two workers need to be started close to one + * another, which should be rare but it's possible. + * + * When a worker exits, it resets the WorkerInfo struct and puts it back into + * the free list. If there is no free worker slot, it will also signal the + * launcher, which then wakes up and can launch a new worker if it needs to. + * Note that we only need to do it when there's no free worker slot, because + * otherwise there is no need -- the launcher would be awakened normally per + * schedule. + * + * There is a potential problem if, for some reason, a worker starts and is not + * able to bootstrap itself correctly. To prevent this situation from starving + * the whole system, the launcher checks the launch time of the "starting + * worker". If it's too old (older than autovacuum_naptime seconds), it resets + * the worker entry and puts it back into the free list. 
*/ NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; - MemoryContext avlauncher_cxt; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; @@ -264,9 +363,6 @@ AutoVacLauncherMain(int argc, char *argv[]) * Set up signal handlers. Since this is an auxiliary process, it has * particular signal requirements -- no deadlock checker or sinval * catchup, for example. - * - * XXX It may be a good idea to receive signals when an avworker process - * finishes. */ pqsignal(SIGHUP, avl_sighup_handler); @@ -276,7 +372,7 @@ AutoVacLauncherMain(int argc, char *argv[]) pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGUSR1, avl_sigusr1_handler); /* We don't listen for async notifies */ pqsignal(SIGUSR2, SIG_IGN); pqsignal(SIGFPE, FloatExceptionHandler); @@ -300,12 +396,12 @@ AutoVacLauncherMain(int argc, char *argv[]) * that we can reset the context during error recovery and thereby avoid * possible memory leaks. */ - avlauncher_cxt = AllocSetContextCreate(TopMemoryContext, - "Autovacuum Launcher", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(avlauncher_cxt); + AutovacMemCxt = AllocSetContextCreate(TopMemoryContext, + "Autovacuum Launcher", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(AutovacMemCxt); /* @@ -336,11 +432,15 @@ AutoVacLauncherMain(int argc, char *argv[]) * Now return to normal top-level context and clear ErrorContext for * next time. */ - MemoryContextSwitchTo(avlauncher_cxt); + MemoryContextSwitchTo(AutovacMemCxt); FlushErrorState(); /* Flush any leaked data in the top-level context */ - MemoryContextResetAndDeleteChildren(avlauncher_cxt); + MemoryContextResetAndDeleteChildren(AutovacMemCxt); + + /* don't leave dangling pointers to freed memory */ + DatabaseListCxt = NULL; + DatabaseList = NULL; /* Make sure pgstat also considers our stat data as gone */ pgstat_clear_snapshot(); @@ -361,18 +461,32 @@ AutoVacLauncherMain(int argc, char *argv[]) ereport(LOG, (errmsg("autovacuum launcher started"))); + /* must unblock signals before calling rebuild_database_list */ PG_SETMASK(&UnBlockSig); + /* in emergency mode, just start a worker and go away */ + if (!autovacuum_start_daemon) + { + do_start_worker(); + proc_exit(0); /* done */ + } + + AutoVacuumShmem->av_launcherpid = MyProcPid; + /* - * take a nap before executing the first iteration, unless we were - * requested an emergency run. + * Create the initial database list. The invariant we want this list to + * keep is that it's ordered by decreasing next_time. As soon as an entry + * is updated to a higher time, it will be moved to the front (which is + * correct because the only operation is to add autovacuum_naptime to the + * entry, and time always increases). */ - if (autovacuum_start_daemon) - pg_usleep(autovacuum_naptime * 1000000L); + rebuild_database_list(InvalidOid); for (;;) { - int worker_pid; + uint64 micros; + bool can_launch; + TimestampTz current_time = 0; /* * Emergency bailout if postmaster has died. 
This is to avoid the @@ -381,6 +495,13 @@ AutoVacLauncherMain(int argc, char *argv[]) if (!PostmasterIsAlive(true)) exit(1); + micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers != + INVALID_OFFSET, false); + + /* Sleep for a while according to schedule */ + pg_usleep(micros); + + /* the normal shutdown case */ if (avlauncher_shutdown_request) break; @@ -388,82 +509,455 @@ AutoVacLauncherMain(int argc, char *argv[]) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); + + /* rebalance in case the default cost parameters changed */ + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + autovac_balance_cost(); + LWLockRelease(AutovacuumLock); + + /* rebuild the list in case the naptime changed */ + rebuild_database_list(InvalidOid); + } + + /* a worker started up or finished */ + if (got_SIGUSR1) + { + got_SIGUSR1 = false; + + /* rebalance cost limits, if needed */ + if (AutoVacuumShmem->av_rebalance) + { + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + AutoVacuumShmem->av_rebalance = false; + autovac_balance_cost(); + LWLockRelease(AutovacuumLock); + } } /* - * if there's a worker already running, sleep until it - * disappears. + * There are some conditions that we need to check before trying to + * start a launcher. First, we need to make sure that there is a + * launcher slot available. Second, we need to make sure that no other + * worker is still starting up. */ + LWLockAcquire(AutovacuumLock, LW_SHARED); - worker_pid = AutoVacuumShmem->worker_pid; - LWLockRelease(AutovacuumLock); - if (worker_pid != 0) - { - PGPROC *proc = BackendPidGetProc(worker_pid); + can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET); - if (proc != NULL && proc->isAutovacuum) - goto sleep; + if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET) + { + long secs; + int usecs; + WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker); + + if (current_time == 0) + current_time = GetCurrentTimestamp(); + + /* + * We can't launch another worker when another one is still + * starting up, so just sleep for a bit more; that worker will wake + * us up again as soon as it's ready. We will only wait + * autovacuum_naptime seconds for this to happen however. Note + * that failure to connect to a particular database is not a + * problem here, because the worker removes itself from the + * startingWorker pointer before trying to connect; only low-level + * problems, like fork() failure, can get us here. + */ + TimestampDifference(worker->wi_launchtime, current_time, + &secs, &usecs); + + /* ignore microseconds, as they cannot make any difference */ + if (secs > autovacuum_naptime) + { + LWLockRelease(AutovacuumLock); + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + /* + * No other process can put a worker in starting mode, so if + * startingWorker is still INVALID after exchanging our lock, + * we assume it's the same one we saw above (so we don't + * recheck the launch time). + */ + if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET) + { + worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker); + worker->wi_dboid = InvalidOid; + worker->wi_tableoid = InvalidOid; + worker->wi_workerpid = 0; + worker->wi_launchtime = 0; + worker->wi_links.next = AutoVacuumShmem->av_freeWorkers; + AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker); + AutoVacuumShmem->av_startingWorker = INVALID_OFFSET; + } + } else { /* - * if the worker is not really running (or it's a process - * that's not an autovacuum worker), remove the PID from shmem. 
- * This should not happen, because either the worker exits - * cleanly, in which case it'll remove the PID, or it dies, in - * which case postmaster will cause a system reset cycle. + * maybe the postmaster neglected this start signal -- + * resend it. Note: the constraints in + * launcher_determine_sleep keep us from delivering signals too + * quickly (at most once every 100ms). */ - LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); - worker_pid = 0; - LWLockRelease(AutovacuumLock); + SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER); + can_launch = false; } } + LWLockRelease(AutovacuumLock); /* either shared or exclusive */ - do_start_worker(); + if (can_launch) + { + Dlelem *elem; -sleep: - /* - * in emergency mode, exit immediately so that the postmaster can - * request another run right away if needed. - * - * XXX -- maybe it would be better to handle this inside the launcher - * itself. - */ - if (!autovacuum_start_daemon) - break; + elem = DLGetTail(DatabaseList); - /* have pgstat read the file again next time */ - pgstat_clear_snapshot(); + if (current_time == 0) + current_time = GetCurrentTimestamp(); + + if (elem != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + long secs; + int usecs; - /* now sleep until the next autovac iteration */ - pg_usleep(autovacuum_naptime * 1000000L); + TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs); + + /* do we have to start a worker? */ + if (secs <= 0 && usecs <= 0) + launch_worker(current_time); + } + else + { + /* + * Special case when the list is empty: start a worker right + * away. This covers the initial case, when no database is in + * pgstats (thus the list is empty). Note that the constraints + * in launcher_determine_sleep keep us from starting workers + * too quickly (at most once every autovacuum_naptime when the + * list is empty). + */ + launch_worker(current_time); + } + } } /* Normal exit from the autovac launcher is here */ ereport(LOG, (errmsg("autovacuum launcher shutting down"))); + AutoVacuumShmem->av_launcherpid = 0; proc_exit(0); /* done */ } +/* + * Determine the time to sleep, in microseconds, based on the database list. + * + * The "canlaunch" parameter indicates whether we can start a worker right now, + * for example due to the workers being all busy. + */ +static uint64 +launcher_determine_sleep(bool canlaunch, bool recursing) +{ + long secs; + int usecs; + Dlelem *elem; + + /* + * We sleep until the next scheduled vacuum. We trust that when the + * database list was built, care was taken so that no entries have times in + * the past; if the first entry has too close a next_worker value, or a + * time in the past, we will sleep a small nominal time. + */ + if (!canlaunch) + { + secs = autovacuum_naptime; + usecs = 0; + } + else if ((elem = DLGetTail(DatabaseList)) != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + TimestampTz current_time = GetCurrentTimestamp(); + TimestampTz next_wakeup; + + next_wakeup = avdb->adl_next_worker; + TimestampDifference(current_time, next_wakeup, &secs, &usecs); + } + else + { + /* list is empty, sleep for whole autovacuum_naptime seconds */ + secs = autovacuum_naptime; + usecs = 0; + } + + /* + * If the result is exactly zero, it means a database had an entry with + * time in the past. Rebuild the list so that the databases are evenly + * distributed again, and recalculate the time to sleep. This can happen + * if there are more tables needing vacuum than workers, and they all take + * longer to vacuum than autovacuum_naptime. + * + * We only recurse once. 
rebuild_database_list should always return times + * in the future, but it seems best not to trust too much on that. + */ + if (secs == 0L && usecs == 0 && !recursing) + { + rebuild_database_list(InvalidOid); + return launcher_determine_sleep(canlaunch, true); + } + + /* 100ms is the smallest time we'll allow the launcher to sleep */ + if (secs <= 0L && usecs <= 100000) + { + secs = 0L; + usecs = 100000; /* 100 ms */ + } + + return secs * 1000000 + usecs; +} + +/* + * Build an updated DatabaseList. It must only contain databases that appear + * in pgstats, and must be sorted by next_worker from highest to lowest, + * distributed regularly across the next autovacuum_naptime interval. + * + * Receives the Oid of the database that made this list be generated (we call + * this the "new" database, because when the database was already present on + * the list, we expect that this function is not called at all). The + * preexisting list, if any, will be used to preserve the order of the + * databases in the autovacuum_naptime period. The new database is put at the + * end of the interval. The actual values are not saved, which should not be + * much of a problem. + */ +static void +rebuild_database_list(Oid newdb) +{ + List *dblist; + ListCell *cell; + MemoryContext newcxt; + MemoryContext oldcxt; + MemoryContext tmpcxt; + HASHCTL hctl; + int score; + int nelems; + HTAB *dbhash; + + /* use fresh stats */ + pgstat_clear_snapshot(); + + newcxt = AllocSetContextCreate(AutovacMemCxt, + "AV dblist", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + tmpcxt = AllocSetContextCreate(newcxt, + "tmp AV dblist", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + oldcxt = MemoryContextSwitchTo(tmpcxt); + + /* + * Implementing this is not as simple as it sounds, because we need to put + * the new database at the end of the list; next the databases that were + * already on the list, and finally (at the tail of the list) all the other + * databases that are not on the existing list. + * + * To do this, we build an empty hash table of scored databases. We will + * start with the lowest score (zero) for the new database, then increasing + * scores for the databases in the existing list, in order, and lastly + * increasing scores for all databases gotten via get_database_list() that + * are not already on the hash. + * + * Then we will put all the hash elements into an array, sort the array by + * score, and finally put the array elements into the new doubly linked + * list. 
+ */ + hctl.keysize = sizeof(Oid); + hctl.entrysize = sizeof(avl_dbase); + hctl.hash = oid_hash; + hctl.hcxt = tmpcxt; + dbhash = hash_create("db hash", 20, &hctl, /* magic number here FIXME */ + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + /* start by inserting the new database */ + score = 0; + if (OidIsValid(newdb)) + { + avl_dbase *db; + PgStat_StatDBEntry *entry; + + /* only consider this database if it has a pgstat entry */ + entry = pgstat_fetch_stat_dbentry(newdb); + if (entry != NULL) + { + /* we assume it isn't found because the hash was just created */ + db = hash_search(dbhash, &newdb, HASH_ENTER, NULL); + + /* hash_search already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ + } + } + + /* Now insert the databases from the existing list */ + if (DatabaseList != NULL) + { + Dlelem *elem; + + elem = DLGetHead(DatabaseList); + while (elem != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + avl_dbase *db; + bool found; + PgStat_StatDBEntry *entry; + + elem = DLGetSucc(elem); + + /* + * skip databases with no stat entries -- in particular, this + * gets rid of dropped databases + */ + entry = pgstat_fetch_stat_dbentry(avdb->adl_datid); + if (entry == NULL) + continue; + + db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found); + + if (!found) + { + /* hash_search already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ + } + } + } + + /* finally, insert all qualifying databases not previously inserted */ + dblist = get_database_list(); + foreach(cell, dblist) + { + avw_dbase *avdb = lfirst(cell); + avl_dbase *db; + bool found; + PgStat_StatDBEntry *entry; + + /* only consider databases with a pgstat entry */ + entry = pgstat_fetch_stat_dbentry(avdb->adw_datid); + if (entry == NULL) + continue; + + db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found); + /* only update the score if the database was not already on the hash */ + if (!found) + { + /* hash_search already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ + } + } + nelems = score; + + /* from here on, the allocated memory belongs to the new list */ + MemoryContextSwitchTo(newcxt); + DatabaseList = DLNewList(); + + if (nelems > 0) + { + TimestampTz current_time; + int millis_increment; + avl_dbase *dbary; + avl_dbase *db; + HASH_SEQ_STATUS seq; + int i; + + /* put all the hash elements into an array */ + dbary = palloc(nelems * sizeof(avl_dbase)); + + i = 0; + hash_seq_init(&seq, dbhash); + while ((db = hash_seq_search(&seq)) != NULL) + memcpy(&(dbary[i++]), db, sizeof(avl_dbase)); + + /* sort the array */ + qsort(dbary, nelems, sizeof(avl_dbase), db_comparator); + + /* this is the time interval between databases in the schedule */ + millis_increment = 1000.0 * autovacuum_naptime / nelems; + current_time = GetCurrentTimestamp(); + + /* + * move the elements from the array into the dllist, setting the + * next_worker while walking the array + */ + for (i = 0; i < nelems; i++) + { + avl_dbase *db = &(dbary[i]); + Dlelem *elem; + + current_time = TimestampTzPlusMilliseconds(current_time, + millis_increment); + db->adl_next_worker = current_time; + + elem = DLNewElem(db); + /* later elements should go closer to the head of the list */ + DLAddHead(DatabaseList, elem); + } + } + + /* all done, clean up memory */ + if (DatabaseListCxt != NULL) + MemoryContextDelete(DatabaseListCxt); + MemoryContextDelete(tmpcxt); + DatabaseListCxt = newcxt; + MemoryContextSwitchTo(oldcxt); +} + +/* qsort 
comparator for avl_dbase, using adl_score */ +static int +db_comparator(const void *a, const void *b) +{ + if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score) + return 0; + else + return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1; +} + /* * do_start_worker * * Bare-bones procedure for starting an autovacuum worker from the launcher. * It determines what database to work on, sets up shared memory stuff and - * signals postmaster to start the worker. + * signals postmaster to start the worker. It fails gracefully if invoked when + * autovacuum_workers are already active. + * + * Return value is the OID of the database that the worker is going to process, + * or InvalidOid if no worker was actually started. */ -static void +static Oid do_start_worker(void) { List *dblist; - bool for_xid_wrap; - autovac_dbase *db; - ListCell *cell; + ListCell *cell; TransactionId xidForceLimit; + bool for_xid_wrap; + avw_dbase *avdb; + TimestampTz current_time; + bool skipit = false; + + /* return quickly when there are no free workers */ + LWLockAcquire(AutovacuumLock, LW_SHARED); + if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET) + { + LWLockRelease(AutovacuumLock); + return InvalidOid; + } + LWLockRelease(AutovacuumLock); + + /* use fresh stats */ + pgstat_clear_snapshot(); /* Get a list of databases */ - dblist = autovac_get_database_list(); + dblist = get_database_list(); /* * Determine the oldest datfrozenxid/relfrozenxid that we will allow @@ -495,21 +989,23 @@ do_start_worker(void) * isn't clear how to construct a metric that measures that and not cause * starvation for less busy databases. */ - db = NULL; + avdb = NULL; for_xid_wrap = false; + current_time = GetCurrentTimestamp(); foreach(cell, dblist) { - autovac_dbase *tmp = lfirst(cell); + avw_dbase *tmp = lfirst(cell); + Dlelem *elem; /* Find pgstat entry if any */ - tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid); + tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid); /* Check to see if this one is at risk of wraparound */ - if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit)) + if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit)) { - if (db == NULL || - TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid)) - db = tmp; + if (avdb == NULL || + TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid)) + avdb = tmp; for_xid_wrap = true; continue; } @@ -520,26 +1016,156 @@ do_start_worker(void) * Otherwise, skip a database with no pgstat entry; it means it * hasn't seen any activity. */ - if (!tmp->ad_entry) + if (!tmp->adw_entry) + continue; + + /* + * Also, skip a database that appears on the database list as having + * been processed recently (less than autovacuum_naptime seconds ago). + * We do this so that we don't select a database which we just + * selected, but that pgstat hasn't gotten around to updating the last + * autovacuum time yet. + */ + skipit = false; + elem = DatabaseList ? DLGetTail(DatabaseList) : NULL; + + while (elem != NULL) + { + avl_dbase *dbp = DLE_VAL(elem); + + if (dbp->adl_datid == tmp->adw_datid) + { + TimestampTz curr_plus_naptime; + TimestampTz next = dbp->adl_next_worker; + + curr_plus_naptime = + TimestampTzPlusMilliseconds(current_time, + autovacuum_naptime * 1000); + + /* + * What we want here if to skip if next_worker falls between + * the current time and the current time plus naptime. 
+ */ + if (timestamp_cmp_internal(current_time, next) > 0) + skipit = false; + else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0) + skipit = false; + else + skipit = true; + + break; + } + elem = DLGetPred(elem); + } + if (skipit) continue; /* * Remember the db with oldest autovac time. (If we are here, * both tmp->entry and db->entry must be non-null.) */ - if (db == NULL || - tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time) - db = tmp; + if (avdb == NULL || + tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time) + avdb = tmp; } /* Found a database -- process it */ - if (db != NULL) + if (avdb != NULL) { + WorkerInfo worker; + SHMEM_OFFSET sworker; + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); - AutoVacuumShmem->process_db = db->ad_datid; + + /* + * Get a worker entry from the freelist. We checked above, so there + * really should be a free slot -- complain very loudly if there isn't. + */ + sworker = AutoVacuumShmem->av_freeWorkers; + if (sworker == INVALID_OFFSET) + elog(FATAL, "no free worker found"); + + worker = (WorkerInfo) MAKE_PTR(sworker); + AutoVacuumShmem->av_freeWorkers = worker->wi_links.next; + + worker->wi_dboid = avdb->adw_datid; + worker->wi_workerpid = 0; + worker->wi_launchtime = GetCurrentTimestamp(); + + AutoVacuumShmem->av_startingWorker = sworker; + LWLockRelease(AutovacuumLock); SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER); + + return avdb->adw_datid; + } + else if (skipit) + { + /* + * If we skipped all databases on the list, rebuild it, because it + * probably contains a dropped database. + */ + rebuild_database_list(InvalidOid); + } + + return InvalidOid; +} + +/* + * launch_worker + * + * Wrapper for starting a worker from the launcher. Besides actually starting + * it, update the database list to reflect the next time that another one will + * need to be started on the selected database. The actual database choice is + * left to do_start_worker. + * + * This routine is also expected to insert an entry into the database list if + * the selected database was previously absent from the list. It returns the + * new database list. + */ +static void +launch_worker(TimestampTz now) +{ + Oid dbid; + Dlelem *elem; + + dbid = do_start_worker(); + if (OidIsValid(dbid)) + { + /* + * Walk the database list and update the corresponding entry. If the + * database is not on the list, we'll recreate the list. + */ + elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList); + while (elem != NULL) + { + avl_dbase *avdb = DLE_VAL(elem); + + if (avdb->adl_datid == dbid) + { + /* + * add autovacuum_naptime seconds to the current time, and use + * that as the new "next_worker" field for this database. + */ + avdb->adl_next_worker = + TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000); + + DLMoveToFront(elem); + break; + } + elem = DLGetSucc(elem); + } + + /* + * If the database was not present in the database list, we rebuild the + * list. It's possible that the database does not get into the list + * anyway, for example if it's a database that doesn't have a pgstat + * entry, but this is not a problem because we don't want to schedule + * workers regularly into those in any case. 
+ */ + if (elem == NULL) + rebuild_database_list(dbid); } } @@ -550,6 +1176,13 @@ avl_sighup_handler(SIGNAL_ARGS) got_SIGHUP = true; } +/* SIGUSR1: a worker is up and running, or just finished */ +static void +avl_sigusr1_handler(SIGNAL_ARGS) +{ + got_SIGUSR1 = true; +} + static void avlauncher_shutdown(SIGNAL_ARGS) { @@ -665,7 +1298,7 @@ NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]) { sigjmp_buf local_sigjmp_buf; - Oid dbid; + Oid dbid = InvalidOid; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; @@ -763,18 +1396,35 @@ AutoVacWorkerMain(int argc, char *argv[]) SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE); /* - * Get the database Id we're going to work on, and announce our PID - * in the shared memory area. We remove the database OID immediately - * from the shared memory area. + * Force statement_timeout to zero to avoid a timeout setting from + * preventing regular maintenance from being executed. */ - LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE); - dbid = AutoVacuumShmem->process_db; - AutoVacuumShmem->process_db = InvalidOid; - AutoVacuumShmem->worker_pid = MyProcPid; + /* + * Get the info about the database we're going to work on. + */ + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker); + dbid = MyWorkerInfo->wi_dboid; + MyWorkerInfo->wi_workerpid = MyProcPid; + /* insert into the running list */ + SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers, + &MyWorkerInfo->wi_links); + /* + * remove from the "starting" pointer, so that the launcher can start a new + * worker if required + */ + AutoVacuumShmem->av_startingWorker = INVALID_OFFSET; LWLockRelease(AutovacuumLock); + on_shmem_exit(FreeWorkerInfo, 0); + + /* wake up the launcher */ + if (AutoVacuumShmem->av_launcherpid != 0) + kill(AutoVacuumShmem->av_launcherpid, SIGUSR1); + if (OidIsValid(dbid)) { char *dbname; @@ -803,7 +1453,7 @@ AutoVacWorkerMain(int argc, char *argv[]) /* Create the memory context where cross-transaction state is stored */ AutovacMemCxt = AllocSetContextCreate(TopMemoryContext, - "Autovacuum context", + "AV worker", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); @@ -814,25 +1464,152 @@ AutoVacWorkerMain(int argc, char *argv[]) } /* - * Now remove our PID from shared memory, so that the launcher can start - * another worker as soon as appropriate. + * FIXME -- we need to notify the launcher when we are gone. But this + * should be done after our PGPROC is released, in ProcKill. */ - LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); - AutoVacuumShmem->worker_pid = 0; - LWLockRelease(AutovacuumLock); /* All done, go away */ proc_exit(0); } /* - * autovac_get_database_list + * Return a WorkerInfo to the free list */ +static void +FreeWorkerInfo(int code, Datum arg) +{ + if (MyWorkerInfo != NULL) + { + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + + /* + * If this worker shuts down when there is no free worker slot, wake + * the launcher up so that he can launch a new worker immediately if + * required. We only save the launcher's PID in local memory here -- + * the actual signal will be sent when the PGPROC is recycled, because + * that is when the new worker can actually be launched. 
+ * + * We somewhat ignore the risk that the launcher changes its PID + * between our reading it and the actual kill; we expect ProcKill to be + * called shortly after us, and we assume that PIDs are not reused too + * quickly after a process exits. + */ + if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET) + AutovacuumLauncherPid = AutoVacuumShmem->av_launcherpid; + + SHMQueueDelete(&MyWorkerInfo->wi_links); + MyWorkerInfo->wi_links.next = AutoVacuumShmem->av_freeWorkers; + MyWorkerInfo->wi_dboid = InvalidOid; + MyWorkerInfo->wi_tableoid = InvalidOid; + MyWorkerInfo->wi_workerpid = 0; + MyWorkerInfo->wi_launchtime = 0; + MyWorkerInfo->wi_cost_delay = 0; + MyWorkerInfo->wi_cost_limit = 0; + MyWorkerInfo->wi_cost_limit_base = 0; + AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(MyWorkerInfo); + /* not mine anymore */ + MyWorkerInfo = NULL; + + /* + * now that we're inactive, cause a rebalancing of the surviving + * workers + */ + AutoVacuumShmem->av_rebalance = true; + LWLockRelease(AutovacuumLock); + } +} + +/* + * Update the cost-based delay parameters, so that multiple workers each + * consume a fraction of the total available I/O. + */ +void +AutoVacuumUpdateDelay(void) +{ + if (MyWorkerInfo) + { + VacuumCostDelay = MyWorkerInfo->wi_cost_delay; + VacuumCostLimit = MyWorkerInfo->wi_cost_limit; + } +} + +/* + * autovac_balance_cost + * Recalculate the cost limit setting for each active worker. + * + * Caller must hold the AutovacuumLock in exclusive mode. + */ +static void +autovac_balance_cost(void) +{ + WorkerInfo worker; + int vac_cost_limit = (autovacuum_vac_cost_limit >= 0 ? + autovacuum_vac_cost_limit : VacuumCostLimit); + int vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ? + autovacuum_vac_cost_delay : VacuumCostDelay); + double cost_total; + double cost_avail; + + /* not set? nothing to do */ + if (vac_cost_limit <= 0 || vac_cost_delay <= 0) + return; + + /* calculate the total base cost limit of active workers */ + cost_total = 0.0; + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &AutoVacuumShmem->av_runningWorkers, + offsetof(WorkerInfoData, wi_links)); + while (worker) + { + if (worker->wi_workerpid != 0 && + worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0) + cost_total += + (double) worker->wi_cost_limit_base / worker->wi_cost_delay; + + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &worker->wi_links, + offsetof(WorkerInfoData, wi_links)); + } + /* there are no cost limits -- nothing to do */ + if (cost_total <= 0) + return; + + /* + * Adjust the cost limit of each active worker so that the total adds up + * to autovacuum_vacuum_cost_limit. 
+ */ + cost_avail = (double) vac_cost_limit / vac_cost_delay; + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &AutoVacuumShmem->av_runningWorkers, + offsetof(WorkerInfoData, wi_links)); + while (worker) + { + if (worker->wi_workerpid != 0 && + worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0) + { + int limit = (int) + (cost_avail * worker->wi_cost_limit_base / cost_total); + + worker->wi_cost_limit = Min(limit, worker->wi_cost_limit_base); + + elog(DEBUG2, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)", + worker->wi_workerpid, worker->wi_dboid, + worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay); + } + + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &worker->wi_links, + offsetof(WorkerInfoData, wi_links)); + } +} + +/* + * get_database_list * * Return a list of all databases. Note we cannot use pg_database, * because we aren't connected; we use the flat database file. */ static List * -autovac_get_database_list(void) +get_database_list(void) { char *filename; List *dblist = NIL; @@ -852,15 +1629,15 @@ autovac_get_database_list(void) while (read_pg_database_line(db_file, thisname, &db_id, &db_tablespace, &db_frozenxid)) { - autovac_dbase *avdb; + avw_dbase *avdb; - avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase)); + avdb = (avw_dbase *) palloc(sizeof(avw_dbase)); - avdb->ad_datid = db_id; - avdb->ad_name = pstrdup(thisname); - avdb->ad_frozenxid = db_frozenxid; + avdb->adw_datid = db_id; + avdb->adw_name = pstrdup(thisname); + avdb->adw_frozenxid = db_frozenxid; /* this gets set later: */ - avdb->ad_entry = NULL; + avdb->adw_entry = NULL; dblist = lappend(dblist, avdb); } @@ -1008,12 +1785,12 @@ do_autovacuum(void) * Add to the list of tables to vacuum, the OIDs of the tables that * correspond to the saved OIDs of toast tables needing vacuum. */ - foreach (cell, toast_oids) + foreach(cell, toast_oids) { Oid toastoid = lfirst_oid(cell); ListCell *cell2; - foreach (cell2, table_toast_list) + foreach(cell2, table_toast_list) { av_relation *ar = lfirst(cell2); @@ -1038,9 +1815,55 @@ do_autovacuum(void) Oid relid = lfirst_oid(cell); autovac_table *tab; char *relname; + WorkerInfo worker; + bool skipit; CHECK_FOR_INTERRUPTS(); + /* + * hold schedule lock from here until we're sure that this table + * still needs vacuuming. We also need the AutovacuumLock to walk + * the worker array, but we'll let go of that one quickly. + */ + LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE); + LWLockAcquire(AutovacuumLock, LW_SHARED); + + /* + * Check whether the table is being vacuumed concurrently by another + * worker. + */ + skipit = false; + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &AutoVacuumShmem->av_runningWorkers, + offsetof(WorkerInfoData, wi_links)); + while (worker) + { + /* ignore myself */ + if (worker == MyWorkerInfo) + goto next_worker; + + /* ignore workers in other databases */ + if (worker->wi_dboid != MyDatabaseId) + goto next_worker; + + if (worker->wi_tableoid == relid) + { + skipit = true; + break; + } + +next_worker: + worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers, + &worker->wi_links, + offsetof(WorkerInfoData, wi_links)); + } + LWLockRelease(AutovacuumLock); + if (skipit) + { + LWLockRelease(AutovacuumScheduleLock); + continue; + } + /* * Check whether pgstat data still says we need to vacuum this table. 
* It could have changed if something else processed the table while we @@ -1053,11 +1876,18 @@ do_autovacuum(void) if (tab == NULL) { /* someone else vacuumed the table */ + LWLockRelease(AutovacuumScheduleLock); continue; } - /* Ok, good to go! */ - /* Set the vacuum cost parameters for this table */ + /* + * Ok, good to go. Store the table in shared memory before releasing + * the lock so that other workers don't vacuum it concurrently. + */ + MyWorkerInfo->wi_tableoid = relid; + LWLockRelease(AutovacuumScheduleLock); + + /* Set the initial vacuum cost parameters for this table */ VacuumCostDelay = tab->at_vacuum_cost_delay; VacuumCostLimit = tab->at_vacuum_cost_limit; @@ -1067,6 +1897,18 @@ do_autovacuum(void) (tab->at_doanalyze ? " ANALYZE" : ""), relname); + /* + * Advertise my cost delay parameters for the balancing algorithm, and + * do a balance + */ + LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); + MyWorkerInfo->wi_cost_delay = tab->at_vacuum_cost_delay; + MyWorkerInfo->wi_cost_limit = tab->at_vacuum_cost_limit; + MyWorkerInfo->wi_cost_limit_base = tab->at_vacuum_cost_limit; + autovac_balance_cost(); + LWLockRelease(AutovacuumLock); + + /* have at it */ autovacuum_do_vac_analyze(tab->at_relid, tab->at_dovacuum, tab->at_doanalyze, @@ -1211,7 +2053,7 @@ table_recheck_autovac(Oid relid) PgStat_StatDBEntry *shared; PgStat_StatDBEntry *dbentry; - /* We need fresh pgstat data for this */ + /* use fresh stats */ pgstat_clear_snapshot(); shared = pgstat_fetch_stat_dbentry(InvalidOid); @@ -1219,8 +2061,8 @@ table_recheck_autovac(Oid relid) /* fetch the relation's relcache entry */ classTup = SearchSysCacheCopy(RELOID, - ObjectIdGetDatum(relid), - 0, 0, 0); + ObjectIdGetDatum(relid), + 0, 0, 0); if (!HeapTupleIsValid(classTup)) return NULL; classForm = (Form_pg_class) GETSTRUCT(classTup); @@ -1630,7 +2472,16 @@ IsAutoVacuumWorkerProcess(void) Size AutoVacuumShmemSize(void) { - return sizeof(AutoVacuumShmemStruct); + Size size; + + /* + * Need the fixed struct and the array of WorkerInfoData. 
+ */ + size = sizeof(AutoVacuumShmemStruct); + size = MAXALIGN(size); + size = add_size(size, mul_size(autovacuum_max_workers, + sizeof(WorkerInfoData))); + return size; } /* @@ -1650,8 +2501,29 @@ AutoVacuumShmemInit(void) ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("not enough shared memory for autovacuum"))); - if (found) - return; /* already initialized */ - MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct)); + if (!IsUnderPostmaster) + { + WorkerInfo worker; + int i; + + Assert(!found); + + AutoVacuumShmem->av_launcherpid = 0; + AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET; + SHMQueueInit(&AutoVacuumShmem->av_runningWorkers); + AutoVacuumShmem->av_startingWorker = INVALID_OFFSET; + + worker = (WorkerInfo) ((char *) AutoVacuumShmem + + MAXALIGN(sizeof(AutoVacuumShmemStruct))); + + /* initialize the WorkerInfo free list */ + for (i = 0; i < autovacuum_max_workers; i++) + { + worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers; + AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]); + } + } + else + Assert(found); } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 2691a50582..e2b8f22d35 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.187 2007/04/03 16:34:36 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.188 2007/04/16 18:29:53 alvherre Exp $ * *------------------------------------------------------------------------- */ @@ -96,7 +96,7 @@ ProcGlobalShmemSize(void) size = add_size(size, sizeof(PROC_HDR)); /* AuxiliaryProcs */ size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC))); - /* MyProcs */ + /* MyProcs, including autovacuum */ size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC))); /* ProcStructLock */ size = add_size(size, sizeof(slock_t)); @@ -110,7 +110,10 @@ ProcGlobalShmemSize(void) int ProcGlobalSemas(void) { - /* We need a sema per backend, plus one for each auxiliary process. */ + /* + * We need a sema per backend (including autovacuum), plus one for each + * auxiliary process. + */ return MaxBackends + NUM_AUXILIARY_PROCS; } @@ -127,8 +130,8 @@ ProcGlobalSemas(void) * running out when trying to start another backend is a common failure. * So, now we grab enough semaphores to support the desired max number * of backends immediately at initialization --- if the sysadmin has set - * MaxBackends higher than his kernel will support, he'll find out sooner - * rather than later. + * MaxConnections or autovacuum_max_workers higher than his kernel will + * support, he'll find out sooner rather than later. * * Another reason for creating semaphores here is that the semaphore * implementation typically requires us to create semaphores in the @@ -163,25 +166,39 @@ InitProcGlobal(void) * Initialize the data structures. */ ProcGlobal->freeProcs = INVALID_OFFSET; + ProcGlobal->autovacFreeProcs = INVALID_OFFSET; ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY; /* * Pre-create the PGPROC structures and create a semaphore for each. 
  */
-	procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC));
+	procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC));
 	if (!procs)
 		ereport(FATAL,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of shared memory")));
-	MemSet(procs, 0, MaxBackends * sizeof(PGPROC));
-	for (i = 0; i < MaxBackends; i++)
+	MemSet(procs, 0, MaxConnections * sizeof(PGPROC));
+	for (i = 0; i < MaxConnections; i++)
 	{
 		PGSemaphoreCreate(&(procs[i].sem));
 		procs[i].links.next = ProcGlobal->freeProcs;
 		ProcGlobal->freeProcs = MAKE_OFFSET(&procs[i]);
 	}
 
+	procs = (PGPROC *) ShmemAlloc((autovacuum_max_workers) * sizeof(PGPROC));
+	if (!procs)
+		ereport(FATAL,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of shared memory")));
+	MemSet(procs, 0, autovacuum_max_workers * sizeof(PGPROC));
+	for (i = 0; i < autovacuum_max_workers; i++)
+	{
+		PGSemaphoreCreate(&(procs[i].sem));
+		procs[i].links.next = ProcGlobal->autovacFreeProcs;
+		ProcGlobal->autovacFreeProcs = MAKE_OFFSET(&procs[i]);
+	}
+
 	MemSet(AuxiliaryProcs, 0, NUM_AUXILIARY_PROCS * sizeof(PGPROC));
 	for (i = 0; i < NUM_AUXILIARY_PROCS; i++)
 	{
@@ -226,12 +243,18 @@ InitProcess(void)
 
 	set_spins_per_delay(procglobal->spins_per_delay);
 
-	myOffset = procglobal->freeProcs;
+	if (IsAutoVacuumWorkerProcess())
+		myOffset = procglobal->autovacFreeProcs;
+	else
+		myOffset = procglobal->freeProcs;
 
 	if (myOffset != INVALID_OFFSET)
 	{
 		MyProc = (PGPROC *) MAKE_PTR(myOffset);
-		procglobal->freeProcs = MyProc->links.next;
+		if (IsAutoVacuumWorkerProcess())
+			procglobal->autovacFreeProcs = MyProc->links.next;
+		else
+			procglobal->freeProcs = MyProc->links.next;
 		SpinLockRelease(ProcStructLock);
 	}
 	else
@@ -239,7 +262,8 @@ InitProcess(void)
 		/*
 		 * If we reach here, all the PGPROCs are in use. This is one of the
 		 * possible places to detect "too many backends", so give the standard
-		 * error message.
+		 * error message. XXX do we need to give a different failure message
+		 * in the autovacuum case?
 		 */
 		SpinLockRelease(ProcStructLock);
 		ereport(FATAL,
@@ -571,8 +595,16 @@ ProcKill(int code, Datum arg)
 	SpinLockAcquire(ProcStructLock);
 
 	/* Return PGPROC structure (and semaphore) to freelist */
-	MyProc->links.next = procglobal->freeProcs;
-	procglobal->freeProcs = MAKE_OFFSET(MyProc);
+	if (IsAutoVacuumWorkerProcess())
+	{
+		MyProc->links.next = procglobal->autovacFreeProcs;
+		procglobal->autovacFreeProcs = MAKE_OFFSET(MyProc);
+	}
+	else
+	{
+		MyProc->links.next = procglobal->freeProcs;
+		procglobal->freeProcs = MAKE_OFFSET(MyProc);
+	}
 
 	/* PGPROC struct isn't mine anymore */
 	MyProc = NULL;
@@ -581,6 +613,10 @@ ProcKill(int code, Datum arg)
 	procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay);
 
 	SpinLockRelease(ProcStructLock);
+
+	/* wake autovac launcher if needed -- see comments in FreeWorkerInfo */
+	if (AutovacuumLauncherPid != 0)
+		kill(AutovacuumLauncherPid, SIGUSR1);
 }
 
 /*
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 130244b5e5..b5b1150056 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.100 2007/01/05 22:19:44 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.101 2007/04/16 18:29:54 alvherre Exp $
  *
  * NOTES
  * Globals used all over the place should be declared here and not
@@ -95,9 +95,14 @@ bool		allowSystemTableMods = false;
 int			work_mem = 1024;
 int			maintenance_work_mem = 16384;
 
-/* Primary determinants of sizes of shared-memory structures: */
+/*
+ * Primary determinants of sizes of shared-memory structures. MaxBackends is
+ * MaxConnections + autovacuum_max_workers (it is computed by the GUC assign
+ * hook):
+ */
 int			NBuffers = 1000;
 int			MaxBackends = 100;
+int			MaxConnections = 90;
 
 int			VacuumCostPageHit = 1;		/* GUC parameters for vacuum */
 int			VacuumCostPageMiss = 10;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 385411c058..83ea00c568 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -10,7 +10,7 @@
  * Written by Peter Eisentraut <peter_e@gmx.net>.
  *
  * IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.384 2007/04/12 06:53:47 neilc Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.385 2007/04/16 18:29:55 alvherre Exp $
  *
  *--------------------------------------------------------------------
  */
@@ -163,6 +163,8 @@ static bool assign_tcp_keepalives_count(int newval, bool doit, GucSource source)
 static const char *show_tcp_keepalives_idle(void);
 static const char *show_tcp_keepalives_interval(void);
 static const char *show_tcp_keepalives_count(void);
+static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source);
+static bool assign_maxconnections(int newval, bool doit, GucSource source);
 
 /*
  * GUC option variables that are exported from this module
@@ -1149,16 +1151,19 @@ static struct config_int ConfigureNamesInt[] =
	 * number.
	 *
	 * MaxBackends is limited to INT_MAX/4 because some places compute
-	 * 4*MaxBackends without any overflow check. Likewise we have to limit
-	 * NBuffers to INT_MAX/2.
+	 * 4*MaxBackends without any overflow check. This check is made on
+	 * assign_maxconnections, since MaxBackends is computed as MaxConnections +
+	 * autovacuum_max_workers.
+	 *
+	 * Likewise we have to limit NBuffers to INT_MAX/2.
	 */
	{
		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
			gettext_noop("Sets the maximum number of concurrent connections."),
			NULL
		},
-		&MaxBackends,
-		100, 1, INT_MAX / 4, NULL, NULL
+		&MaxConnections,
+		100, 1, INT_MAX / 4, assign_maxconnections, NULL
	},
 
	{
@@ -1622,6 +1627,15 @@ static struct config_int ConfigureNamesInt[] =
		&autovacuum_freeze_max_age,
		200000000, 100000000, 2000000000, NULL, NULL
	},
+	{
+		/* see max_connections */
+		{"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM,
+			gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."),
+			NULL
+		},
+		&autovacuum_max_workers,
+		3, 1, INT_MAX / 4, assign_autovacuum_max_workers, NULL
+	},
 
	{
		{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
@@ -6692,5 +6706,32 @@ show_tcp_keepalives_count(void)
 	return nbuf;
 }
 
+static bool
+assign_maxconnections(int newval, bool doit, GucSource source)
+{
+	if (doit)
+	{
+		if (newval + autovacuum_max_workers > INT_MAX / 4)
+			return false;
+
+		MaxBackends = newval + autovacuum_max_workers;
+	}
+
+	return true;
+}
+
+static bool
+assign_autovacuum_max_workers(int newval, bool doit, GucSource source)
+{
+	if (doit)
+	{
+		if (newval + MaxConnections > INT_MAX / 4)
+			return false;
+
+		MaxBackends = newval + MaxConnections;
+	}
+
+	return true;
+}
 
 #include "guc-file.c"
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 22f9685bbd..bc5b642d02 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -376,6 +376,7 @@
 #autovacuum = on			# enable autovacuum subprocess?
					# 'on' requires stats_start_collector
					# and stats_row_level to also be on
+#autovacuum_max_workers = 3		# max # of autovacuum subprocesses
 #autovacuum_naptime = 1min		# time between autovacuum runs
 #autovacuum_vacuum_threshold = 500	# min # of tuple updates before
					# vacuum
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index ca5cc799c5..c5090581c5 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -13,7 +13,7 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/miscadmin.h,v 1.193 2007/03/01 14:52:04 petere Exp $
+ * $PostgreSQL: pgsql/src/include/miscadmin.h,v 1.194 2007/04/16 18:29:56 alvherre Exp $
  *
  * NOTES
  * some of the information in this file should be moved to other files.
@@ -129,6 +129,7 @@ extern DLLIMPORT char *DataDir;
 
 extern DLLIMPORT int NBuffers;
 extern int	MaxBackends;
+extern int	MaxConnections;
 
 extern DLLIMPORT int MyProcPid;
 extern DLLIMPORT struct Port *MyProcPort;
diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h
index facf9de52b..ccd982b681 100644
--- a/src/include/postmaster/autovacuum.h
+++ b/src/include/postmaster/autovacuum.h
@@ -7,15 +7,18 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/postmaster/autovacuum.h,v 1.8 2007/02/15 23:23:23 alvherre Exp $
+ * $PostgreSQL: pgsql/src/include/postmaster/autovacuum.h,v 1.9 2007/04/16 18:30:03 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef AUTOVACUUM_H
 #define AUTOVACUUM_H
 
+#include "storage/lock.h"
+
 /* GUC variables */
 extern bool autovacuum_start_daemon;
+extern int	autovacuum_max_workers;
 extern int	autovacuum_naptime;
 extern int	autovacuum_vac_thresh;
 extern double autovacuum_vac_scale;
@@ -25,6 +28,9 @@ extern int	autovacuum_freeze_max_age;
 extern int	autovacuum_vac_cost_delay;
 extern int	autovacuum_vac_cost_limit;
 
+/* autovacuum launcher PID, only valid when worker is shutting down */
+extern int	AutovacuumLauncherPid;
+
 /* Status inquiry functions */
 extern bool AutoVacuumingActive(void);
 extern bool IsAutoVacuumLauncherProcess(void);
@@ -35,6 +41,9 @@ extern void autovac_init(void);
 extern int	StartAutoVacLauncher(void);
 extern int	StartAutoVacWorker(void);
 
+/* autovacuum cost-delay balancer */
+extern void AutoVacuumUpdateDelay(void);
+
 #ifdef EXEC_BACKEND
 extern void AutoVacLauncherMain(int argc, char *argv[]);
 extern void AutoVacWorkerMain(int argc, char *argv[]);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index c47256a159..477284b7d1 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.35 2007/04/03 16:34:36 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.36 2007/04/16 18:30:04 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -61,6 +61,7 @@ typedef enum LWLockId
	BtreeVacuumLock,
	AddinShmemInitLock,
	AutovacuumLock,
+	AutovacuumScheduleLock,
	/* Individual lock IDs end here */
	FirstBufMappingLock,
	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 772cf52cdf..1fd4e264f0 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.97 2007/04/03 16:34:36 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.98 2007/04/16 18:30:04 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -115,6 +115,8 @@ typedef struct PROC_HDR
 {
	/* Head of list of free PGPROC structures */
	SHMEM_OFFSET freeProcs;
+	/* Head of list of autovacuum's free PGPROC structures */
+	SHMEM_OFFSET autovacFreeProcs;
	/* Current shared estimate of appropriate spins_per_delay value */
	int			spins_per_delay;
 } PROC_HDR;
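
The net effect of the two assign hooks added to guc.c above is that MaxBackends always equals MaxConnections + autovacuum_max_workers, and either hook rejects a new value that would let 4*MaxBackends overflow an int. A minimal standalone sketch of that arithmetic follows; it is illustrative only and not part of the patch (the sketch_ prefix, the main() driver, and the starting values are assumptions; in the server the GUC machinery invokes the hook and stores the accepted value into the GUC variable itself):

/*
 * Standalone sketch (illustrative only): keeping MaxBackends consistent
 * with MaxConnections + autovacuum_max_workers, as the paired GUC assign
 * hooks in the patch do.
 */
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>

static int	MaxConnections = 100;
static int	autovacuum_max_workers = 3;
static int	MaxBackends = 103;

static bool
sketch_assign_maxconnections(int newval, bool doit)
{
	if (doit)
	{
		/* reject settings that would let 4*MaxBackends overflow an int */
		if (newval + autovacuum_max_workers > INT_MAX / 4)
			return false;

		MaxConnections = newval;	/* in the server, the GUC machinery stores this */
		MaxBackends = newval + autovacuum_max_workers;
	}
	return true;
}

int
main(void)
{
	if (sketch_assign_maxconnections(200, true))
		printf("MaxConnections=%d, autovacuum_max_workers=%d, MaxBackends=%d\n",
			   MaxConnections, autovacuum_max_workers, MaxBackends);
	return 0;
}

Because max_connections and autovacuum_max_workers are both PGC_POSTMASTER settings, each hook recomputes MaxBackends from the other variable's current value, so the sum is correct once both settings have been applied at postmaster start.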