*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.40 2007/03/28 22:17:12 alvherre Exp $
+ * $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.41 2007/04/16 18:29:52 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
#include "utils/syscache.h"
+static volatile sig_atomic_t got_SIGUSR1 = false;
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t avlauncher_shutdown_request = false;
* GUC parameters
*/
bool autovacuum_start_daemon = false;
+int autovacuum_max_workers;
int autovacuum_naptime;
int autovacuum_vac_thresh;
double autovacuum_vac_scale;
int autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
-/* Flag to tell if we are in the autovacuum daemon process */
+/* Flags to tell if we are in an autovacuum process */
static bool am_autovacuum_launcher = false;
static bool am_autovacuum_worker = false;
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
-/* struct to keep list of candidate databases for vacuum */
-typedef struct autovac_dbase
+/*
+ * struct to keep track of databases in launcher: one entry per database in
+ * the launcher's schedule (DatabaseList)
+ */
+typedef struct avl_dbase
{
- Oid ad_datid;
- char *ad_name;
- TransactionId ad_frozenxid;
- PgStat_StatDBEntry *ad_entry;
-} autovac_dbase;
+ Oid adl_datid; /* hash key -- must be first */
+ TimestampTz adl_next_worker; /* time at which the next worker is due for this database */
+ int adl_score; /* ordering score assigned by rebuild_database_list */
+} avl_dbase;
+
+/*
+ * struct to keep track of databases in worker; these are the elements of the
+ * list returned by get_database_list()
+ */
+typedef struct avw_dbase
+{
+ Oid adw_datid; /* OID of the database */
+ char *adw_name; /* presumably the database name -- not used in the code visible here */
+ TransactionId adw_frozenxid; /* compared against xidForceLimit to detect wraparound risk */
+ PgStat_StatDBEntry *adw_entry; /* pgstat entry for the database, NULL if it has none */
+} avw_dbase;
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
int at_vacuum_cost_limit;
} autovac_table;
+/*-------------
+ * This struct holds information about a single worker's whereabouts. We keep
+ * an array of these in shared memory, sized according to
+ * autovacuum_max_workers.
+ *
+ * wi_links entry into free list or running list (while the slot is
+ * free, wi_links.next holds the offset of the next free slot)
+ * wi_dboid OID of the database this worker is supposed to work on
+ * wi_tableoid OID of the table currently being vacuumed
+ * wi_workerpid PID of the running worker, 0 if not yet started
+ * wi_launchtime Time at which this worker was launched; the launcher uses
+ * it to detect a worker that takes too long to start up
+ * wi_cost_* Vacuum cost-based delay parameters current in this worker
+ *
+ * All fields are protected by AutovacuumLock, except for wi_tableoid which is
+ * protected by AutovacuumScheduleLock (which is read-only for everyone except
+ * that worker itself).
+ *-------------
+ */
+typedef struct WorkerInfoData
+{
+ SHM_QUEUE wi_links;
+ Oid wi_dboid;
+ Oid wi_tableoid;
+ int wi_workerpid;
+ TimestampTz wi_launchtime;
+ int wi_cost_delay;
+ int wi_cost_limit;
+ int wi_cost_limit_base;
+} WorkerInfoData;
+
+/* pointer to a WorkerInfoData slot */
+typedef struct WorkerInfoData *WorkerInfo;
+
+/*-------------
+ * The main autovacuum shmem struct. On shared memory we store this main
+ * struct and the array of WorkerInfo structs. This struct keeps:
+ *
+ * av_launcherpid the PID of the autovacuum launcher (set by the launcher
+ * at startup, reset to 0 on its normal exit)
+ * av_freeWorkers the WorkerInfo freelist: offset of the first free slot,
+ * or INVALID_OFFSET when there is none
+ * av_runningWorkers the WorkerInfo non-free queue
+ * av_startingWorker pointer to WorkerInfo currently being started (cleared by
+ * the worker itself as soon as it's up and running);
+ * INVALID_OFFSET when no worker is starting
+ * av_rebalance true when a worker determines that cost limits must be
+ * rebalanced
+ *
+ * This struct is protected by AutovacuumLock.
+ *-------------
+ */
typedef struct
{
- Oid process_db; /* OID of database to process */
- int worker_pid; /* PID of the worker process, if any */
+ pid_t av_launcherpid;
+ SHMEM_OFFSET av_freeWorkers;
+ SHM_QUEUE av_runningWorkers;
+ SHMEM_OFFSET av_startingWorker;
+ bool av_rebalance;
} AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem;
+/* the database list in the launcher, and the context that contains it */
+static Dllist *DatabaseList = NULL;
+static MemoryContext DatabaseListCxt = NULL;
+
+/* Pointer to my own WorkerInfo, valid on each worker */
+static WorkerInfo MyWorkerInfo = NULL;
+
+/* PID of launcher, valid only in worker while shutting down */
+int AutovacuumLauncherPid = 0;
+
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
static pid_t avworker_forkexec(void);
NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
-static void do_start_worker(void);
+static Oid do_start_worker(void);
+static uint64 launcher_determine_sleep(bool canlaunch, bool recursing);
+static void launch_worker(TimestampTz now);
+static List *get_database_list(void);
+static void rebuild_database_list(Oid newdb);
+static int db_comparator(const void *a, const void *b);
+static void autovac_balance_cost(void);
+
static void do_autovacuum(void);
-static List *autovac_get_database_list(void);
+static void FreeWorkerInfo(int code, Datum arg);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
PgStat_StatDBEntry *dbentry);
static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
static void avl_sighup_handler(SIGNAL_ARGS);
+static void avl_sigusr1_handler(SIGNAL_ARGS);
static void avlauncher_shutdown(SIGNAL_ARGS);
static void avl_quickdie(SIGNAL_ARGS);
/*
* Main loop for the autovacuum launcher process.
+ *
+ * The signalling between launcher and worker is as follows:
+ *
+ * When the worker has finished starting up, it stores its PID in wi_workerpid
+ * and sends a SIGUSR1 signal to the launcher. The launcher then knows that
+ * the postmaster is ready to start a new worker. We do it this way because
+ * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+ * yet processed the last one, in which case the second signal would be lost.
+ * This is only useful when two workers need to be started close to one
+ * another, which should be rare but it's possible.
+ *
+ * When a worker exits, it resets the WorkerInfo struct and puts it back into
+ * the free list. If there is no free worker slot, it will also signal the
+ * launcher, which then wakes up and can launch a new worker if it needs to.
+ * Note that we only need to do it when there's no free worker slot, because
+ * otherwise there is no need -- the launcher would be awakened normally per
+ * schedule.
+ *
+ * There is a potential problem if, for some reason, a worker starts and is not
+ * able to bootstrap itself correctly. To prevent this situation from starving
+ * the whole system, the launcher checks the launch time of the "starting
+ * worker". If it's too old (older than autovacuum_naptime seconds), it resets
+ * the worker entry and puts it back into the free list.
*/
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
- MemoryContext avlauncher_cxt;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
* Set up signal handlers. Since this is an auxiliary process, it has
* particular signal requirements -- no deadlock checker or sinval
* catchup, for example.
- *
- * XXX It may be a good idea to receive signals when an avworker process
- * finishes.
*/
pqsignal(SIGHUP, avl_sighup_handler);
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, SIG_IGN);
+ pqsignal(SIGUSR1, avl_sigusr1_handler);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
- avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
- "Autovacuum Launcher",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
- MemoryContextSwitchTo(avlauncher_cxt);
+ AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
+ "Autovacuum Launcher",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ MemoryContextSwitchTo(AutovacMemCxt);
/*
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
- MemoryContextSwitchTo(avlauncher_cxt);
+ MemoryContextSwitchTo(AutovacMemCxt);
FlushErrorState();
/* Flush any leaked data in the top-level context */
- MemoryContextResetAndDeleteChildren(avlauncher_cxt);
+ MemoryContextResetAndDeleteChildren(AutovacMemCxt);
+
+ /* don't leave dangling pointers to freed memory */
+ DatabaseListCxt = NULL;
+ DatabaseList = NULL;
/* Make sure pgstat also considers our stat data as gone */
pgstat_clear_snapshot();
ereport(LOG,
(errmsg("autovacuum launcher started")));
+ /* must unblock signals before calling rebuild_database_list */
PG_SETMASK(&UnBlockSig);
+ /* in emergency mode, just start a worker and go away */
+ if (!autovacuum_start_daemon)
+ {
+ do_start_worker();
+ proc_exit(0); /* done */
+ }
+
+ AutoVacuumShmem->av_launcherpid = MyProcPid;
+
/*
- * take a nap before executing the first iteration, unless we were
- * requested an emergency run.
+ * Create the initial database list. The invariant we want this list to
+ * keep is that it's ordered by decreasing next_time. As soon as an entry
+ * is updated to a higher time, it will be moved to the front (which is
+ * correct because the only operation is to add autovacuum_naptime to the
+ * entry, and time always increases).
*/
- if (autovacuum_start_daemon)
- pg_usleep(autovacuum_naptime * 1000000L);
+ rebuild_database_list(InvalidOid);
for (;;)
{
- int worker_pid;
+ uint64 micros;
+ bool can_launch;
+ TimestampTz current_time = 0;
/*
* Emergency bailout if postmaster has died. This is to avoid the
if (!PostmasterIsAlive(true))
exit(1);
+ micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers !=
+ INVALID_OFFSET, false);
+
+ /* Sleep for a while according to schedule */
+ pg_usleep(micros);
+
+ /* the normal shutdown case */
if (avlauncher_shutdown_request)
break;
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ /* rebalance in case the default cost parameters changed */
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+
+ /* rebuild the list in case the naptime changed */
+ rebuild_database_list(InvalidOid);
+ }
+
+ /* a worker started up or finished */
+ if (got_SIGUSR1)
+ {
+ got_SIGUSR1 = false;
+
+ /* rebalance cost limits, if needed */
+ if (AutoVacuumShmem->av_rebalance)
+ {
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ AutoVacuumShmem->av_rebalance = false;
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+ }
}
/*
- * if there's a worker already running, sleep until it
- * disappears.
+ * There are some conditions that we need to check before trying to
+ * start a worker. First, we need to make sure that there is a
+ * worker slot available. Second, we need to make sure that no other
+ * worker is still starting up.
*/
+
LWLockAcquire(AutovacuumLock, LW_SHARED);
- worker_pid = AutoVacuumShmem->worker_pid;
- LWLockRelease(AutovacuumLock);
- if (worker_pid != 0)
- {
- PGPROC *proc = BackendPidGetProc(worker_pid);
+ can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET);
- if (proc != NULL && proc->isAutovacuum)
- goto sleep;
+ if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
+ {
+ long secs;
+ int usecs;
+ WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
+
+ if (current_time == 0)
+ current_time = GetCurrentTimestamp();
+
+ /*
+ * We can't launch another worker when another one is still
+ * starting up, so just sleep for a bit more; that worker will wake
+ * us up again as soon as it's ready. We will only wait
+ * autovacuum_naptime seconds for this to happen however. Note
+ * that failure to connect to a particular database is not a
+ * problem here, because the worker removes itself from the
+ * startingWorker pointer before trying to connect; only low-level
+ * problems, like fork() failure, can get us here.
+ */
+ TimestampDifference(worker->wi_launchtime, current_time,
+ &secs, &usecs);
+
+ /* ignore microseconds, as they cannot make any difference */
+ if (secs > autovacuum_naptime)
+ {
+ LWLockRelease(AutovacuumLock);
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ /*
+ * No other process can put a worker in starting mode, so if
+ * startingWorker is still set (not INVALID_OFFSET) after
+ * exchanging our lock, we assume it's the same one we saw
+ * above (so we don't recheck the launch time).
+ */
+ if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
+ {
+ worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
+ worker->wi_dboid = InvalidOid;
+ worker->wi_tableoid = InvalidOid;
+ worker->wi_workerpid = 0;
+ worker->wi_launchtime = 0;
+ worker->wi_links.next = AutoVacuumShmem->av_freeWorkers;
+ AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker);
+ AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
+ }
+ }
else
{
/*
- * if the worker is not really running (or it's a process
- * that's not an autovacuum worker), remove the PID from shmem.
- * This should not happen, because either the worker exits
- * cleanly, in which case it'll remove the PID, or it dies, in
- * which case postmaster will cause a system reset cycle.
+ * maybe the postmaster neglected this start signal --
+ * resend it. Note: the constraints in
+ * launcher_determine_sleep keep us from delivering signals too
+ * quickly (at most once every 100ms).
*/
- LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
- worker_pid = 0;
- LWLockRelease(AutovacuumLock);
+ SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+ can_launch = false;
}
}
+ LWLockRelease(AutovacuumLock); /* either shared or exclusive */
- do_start_worker();
+ if (can_launch)
+ {
+ Dlelem *elem;
-sleep:
- /*
- * in emergency mode, exit immediately so that the postmaster can
- * request another run right away if needed.
- *
- * XXX -- maybe it would be better to handle this inside the launcher
- * itself.
- */
- if (!autovacuum_start_daemon)
- break;
+ elem = DLGetTail(DatabaseList);
- /* have pgstat read the file again next time */
- pgstat_clear_snapshot();
+ if (current_time == 0)
+ current_time = GetCurrentTimestamp();
+
+ if (elem != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+ long secs;
+ int usecs;
- /* now sleep until the next autovac iteration */
- pg_usleep(autovacuum_naptime * 1000000L);
+ TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
+
+ /* do we have to start a worker? */
+ if (secs <= 0 && usecs <= 0)
+ launch_worker(current_time);
+ }
+ else
+ {
+ /*
+ * Special case when the list is empty: start a worker right
+ * away. This covers the initial case, when no database is in
+ * pgstats (thus the list is empty). Note that the constraints
+ * in launcher_determine_sleep keep us from starting workers
+ * too quickly (at most once every autovacuum_naptime when the
+ * list is empty).
+ */
+ launch_worker(current_time);
+ }
+ }
}
/* Normal exit from the autovac launcher is here */
ereport(LOG,
(errmsg("autovacuum launcher shutting down")));
+ AutoVacuumShmem->av_launcherpid = 0;
proc_exit(0); /* done */
}
+/*
+ * Determine the time to sleep, in microseconds, based on the database list.
+ *
+ * The "canlaunch" parameter indicates whether we can start a worker right now;
+ * it is false, for example, when all the workers are busy.  In that case we
+ * simply sleep for autovacuum_naptime seconds, as we also do when the database
+ * list is empty.  Otherwise we sleep until the next_worker time of the entry
+ * at the tail of the database list.
+ *
+ * "recursing" must be false in external calls; it is set only by the single
+ * internal retry that follows a rebuild of the database list.
+ *
+ * The result is never less than 100ms.
+ */
+static uint64
+launcher_determine_sleep(bool canlaunch, bool recursing)
+{
+	long		secs;
+	int			usecs;
+	Dlelem	   *elem;
+
+	/*
+	 * We sleep until the next scheduled vacuum.  We trust that when the
+	 * database list was built, care was taken so that no entries have times in
+	 * the past; if the first entry has too close a next_worker value, or a
+	 * time in the past, we will sleep a small nominal time.
+	 */
+	if (!canlaunch)
+	{
+		secs = autovacuum_naptime;
+		usecs = 0;
+	}
+	else if ((elem = DLGetTail(DatabaseList)) != NULL)
+	{
+		avl_dbase  *avdb = DLE_VAL(elem);
+		TimestampTz	current_time = GetCurrentTimestamp();
+		TimestampTz	next_wakeup;
+
+		next_wakeup = avdb->adl_next_worker;
+		TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+	}
+	else
+	{
+		/* list is empty, sleep for whole autovacuum_naptime seconds */
+		secs = autovacuum_naptime;
+		usecs = 0;
+	}
+
+	/*
+	 * If the result is exactly zero, it means a database had an entry with
+	 * time in the past.  Rebuild the list so that the databases are evenly
+	 * distributed again, and recalculate the time to sleep.  This can happen
+	 * if there are more tables needing vacuum than workers, and they all take
+	 * longer to vacuum than autovacuum_naptime.
+	 *
+	 * We only recurse once.  rebuild_database_list should always return times
+	 * in the future, but it seems best not to trust too much on that.
+	 */
+	if (secs == 0L && usecs == 0 && !recursing)
+	{
+		rebuild_database_list(InvalidOid);
+		return launcher_determine_sleep(canlaunch, true);
+	}
+
+	/* 100ms is the smallest time we'll allow the launcher to sleep */
+	if (secs <= 0L && usecs <= 100000)
+	{
+		secs = 0L;
+		usecs = 100000;	/* 100 ms */
+	}
+
+	/*
+	 * Do the arithmetic in 64 bits: "long" may be only 32 bits wide, in which
+	 * case secs * 1000000 would overflow for any sleep longer than about 35
+	 * minutes.
+	 */
+	return (uint64) secs * 1000000 + (uint64) usecs;
+}
+
+/*
+ * Build an updated DatabaseList. It must only contain databases that appear
+ * in pgstats, and must be sorted by next_worker from highest to lowest,
+ * distributed regularly across the next autovacuum_naptime interval.
+ *
+ * Receives the Oid of the database that made this list be generated (we call
+ * this the "new" database, because when the database was already present on
+ * the list, we expect that this function is not called at all). The
+ * preexisting list, if any, will be used to preserve the order of the
+ * databases in the autovacuum_naptime period. The new database is put at the
+ * end of the interval. The actual values are not saved, which should not be
+ * much of a problem.
+ *
+ * newdb may be InvalidOid, in which case there is no "new" database and the
+ * list is simply rebuilt. On return, DatabaseList and DatabaseListCxt point
+ * to the new list and its memory context; the previous context, if any, has
+ * been deleted.
+ */
+static void
+rebuild_database_list(Oid newdb)
+{
+ List *dblist;
+ ListCell *cell;
+ MemoryContext newcxt;
+ MemoryContext oldcxt;
+ MemoryContext tmpcxt;
+ HASHCTL hctl;
+ int score;
+ int nelems;
+ HTAB *dbhash;
+
+ /* use fresh stats */
+ pgstat_clear_snapshot();
+
+ /*
+ * newcxt will own the new list. tmpcxt is a child of it that holds the
+ * scratch hash table; it is deleted before we return.
+ */
+ newcxt = AllocSetContextCreate(AutovacMemCxt,
+ "AV dblist",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ tmpcxt = AllocSetContextCreate(newcxt,
+ "tmp AV dblist",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(tmpcxt);
+
+ /*
+ * Implementing this is not as simple as it sounds, because we need to put
+ * the new database at the end of the list; next the databases that were
+ * already on the list, and finally (at the tail of the list) all the other
+ * databases that are not on the existing list.
+ *
+ * To do this, we build an empty hash table of scored databases. We will
+ * start with the lowest score (zero) for the new database, then increasing
+ * scores for the databases in the existing list, in order, and lastly
+ * increasing scores for all databases gotten via get_database_list() that
+ * are not already on the hash.
+ *
+ * Then we will put all the hash elements into an array, sort the array by
+ * score, and finally put the array elements into the new doubly linked
+ * list.
+ */
+ hctl.keysize = sizeof(Oid);
+ hctl.entrysize = sizeof(avl_dbase);
+ hctl.hash = oid_hash;
+ hctl.hcxt = tmpcxt;
+ dbhash = hash_create("db hash", 20, &hctl, /* magic number here FIXME */
+ HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+ /* start by inserting the new database */
+ score = 0;
+ if (OidIsValid(newdb))
+ {
+ avl_dbase *db;
+ PgStat_StatDBEntry *entry;
+
+ /* only consider this database if it has a pgstat entry */
+ entry = pgstat_fetch_stat_dbentry(newdb);
+ if (entry != NULL)
+ {
+ /* we assume it isn't found because the hash was just created */
+ db = hash_search(dbhash, &newdb, HASH_ENTER, NULL);
+
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+
+ /* Now insert the databases from the existing list */
+ if (DatabaseList != NULL)
+ {
+ Dlelem *elem;
+
+ elem = DLGetHead(DatabaseList);
+ while (elem != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+ avl_dbase *db;
+ bool found;
+ PgStat_StatDBEntry *entry;
+
+ /* advance now, since the loop body may "continue" */
+ elem = DLGetSucc(elem);
+
+ /*
+ * skip databases with no stat entries -- in particular, this
+ * gets rid of dropped databases
+ */
+ entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
+ if (entry == NULL)
+ continue;
+
+ db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found);
+
+ /* the new database may already be in the hash; keep its score */
+ if (!found)
+ {
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+ }
+
+ /* finally, insert all qualifying databases not previously inserted */
+ dblist = get_database_list();
+ foreach(cell, dblist)
+ {
+ avw_dbase *avdb = lfirst(cell);
+ avl_dbase *db;
+ bool found;
+ PgStat_StatDBEntry *entry;
+
+ /* only consider databases with a pgstat entry */
+ entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
+ if (entry == NULL)
+ continue;
+
+ db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found);
+ /* only update the score if the database was not already on the hash */
+ if (!found)
+ {
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+ /* every entry inserted above took exactly one score, so this is the count */
+ nelems = score;
+
+ /* from here on, the allocated memory belongs to the new list */
+ MemoryContextSwitchTo(newcxt);
+ DatabaseList = DLNewList();
+
+ if (nelems > 0)
+ {
+ TimestampTz current_time;
+ int millis_increment;
+ avl_dbase *dbary;
+ avl_dbase *db;
+ HASH_SEQ_STATUS seq;
+ int i;
+
+ /* put all the hash elements into an array */
+ dbary = palloc(nelems * sizeof(avl_dbase));
+
+ i = 0;
+ hash_seq_init(&seq, dbhash);
+ while ((db = hash_seq_search(&seq)) != NULL)
+ memcpy(&(dbary[i++]), db, sizeof(avl_dbase));
+
+ /*
+ * sort the array; db_comparator orders by decreasing score, so the
+ * new database (score zero) ends up last
+ */
+ qsort(dbary, nelems, sizeof(avl_dbase), db_comparator);
+
+ /* this is the time interval between databases in the schedule */
+ millis_increment = 1000.0 * autovacuum_naptime / nelems;
+ current_time = GetCurrentTimestamp();
+
+ /*
+ * move the elements from the array into the dllist, setting the
+ * next_worker while walking the array
+ */
+ for (i = 0; i < nelems; i++)
+ {
+ avl_dbase *db = &(dbary[i]);
+ Dlelem *elem;
+
+ current_time = TimestampTzPlusMilliseconds(current_time,
+ millis_increment);
+ db->adl_next_worker = current_time;
+
+ elem = DLNewElem(db);
+ /* later elements should go closer to the head of the list */
+ DLAddHead(DatabaseList, elem);
+ }
+ }
+
+ /* all done, clean up memory */
+ if (DatabaseListCxt != NULL)
+ MemoryContextDelete(DatabaseListCxt);
+ MemoryContextDelete(tmpcxt);
+ DatabaseListCxt = newcxt;
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * qsort comparator for avl_dbase entries, keyed on adl_score.  Entries with
+ * a higher score sort first (descending order); equal scores compare equal.
+ */
+static int
+db_comparator(const void *a, const void *b)
+{
+	const avl_dbase *lhs = (const avl_dbase *) a;
+	const avl_dbase *rhs = (const avl_dbase *) b;
+
+	if (lhs->adl_score < rhs->adl_score)
+		return 1;
+	if (lhs->adl_score > rhs->adl_score)
+		return -1;
+	return 0;
+}
+
/*
* do_start_worker
*
* Bare-bones procedure for starting an autovacuum worker from the launcher.
* It determines what database to work on, sets up shared memory stuff and
- * signals postmaster to start the worker.
+ * signals postmaster to start the worker. It fails gracefully if invoked when
+ * autovacuum_max_workers workers are already active.
+ *
+ * Return value is the OID of the database that the worker is going to process,
+ * or InvalidOid if no worker was actually started.
*/
-static void
+static Oid
do_start_worker(void)
{
List *dblist;
- bool for_xid_wrap;
- autovac_dbase *db;
- ListCell *cell;
+ ListCell *cell;
TransactionId xidForceLimit;
+ bool for_xid_wrap;
+ avw_dbase *avdb;
+ TimestampTz current_time;
+ bool skipit = false;
+
+ /* return quickly when there are no free workers */
+ LWLockAcquire(AutovacuumLock, LW_SHARED);
+ if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
+ {
+ LWLockRelease(AutovacuumLock);
+ return InvalidOid;
+ }
+ LWLockRelease(AutovacuumLock);
+
+ /* use fresh stats */
+ pgstat_clear_snapshot();
/* Get a list of databases */
- dblist = autovac_get_database_list();
+ dblist = get_database_list();
/*
* Determine the oldest datfrozenxid/relfrozenxid that we will allow
* isn't clear how to construct a metric that measures that and not cause
* starvation for less busy databases.
*/
- db = NULL;
+ avdb = NULL;
for_xid_wrap = false;
+ current_time = GetCurrentTimestamp();
foreach(cell, dblist)
{
- autovac_dbase *tmp = lfirst(cell);
+ avw_dbase *tmp = lfirst(cell);
+ Dlelem *elem;
/* Find pgstat entry if any */
- tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
+ tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
/* Check to see if this one is at risk of wraparound */
- if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
+ if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit))
{
- if (db == NULL ||
- TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
- db = tmp;
+ if (avdb == NULL ||
+ TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid))
+ avdb = tmp;
for_xid_wrap = true;
continue;
}
* Otherwise, skip a database with no pgstat entry; it means it
* hasn't seen any activity.
*/
- if (!tmp->ad_entry)
+ if (!tmp->adw_entry)
+ continue;
+
+ /*
+ * Also, skip a database that appears on the database list as having
+ * been processed recently (less than autovacuum_naptime seconds ago).
+ * We do this so that we don't select a database which we just
+ * selected, but that pgstat hasn't gotten around to updating the last
+ * autovacuum time yet.
+ */
+ skipit = false;
+ elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
+
+ while (elem != NULL)
+ {
+ avl_dbase *dbp = DLE_VAL(elem);
+
+ if (dbp->adl_datid == tmp->adw_datid)
+ {
+ TimestampTz curr_plus_naptime;
+ TimestampTz next = dbp->adl_next_worker;
+
+ curr_plus_naptime =
+ TimestampTzPlusMilliseconds(current_time,
+ autovacuum_naptime * 1000);
+
+ /*
+ * What we want here is to skip if next_worker falls between
+ * the current time and the current time plus naptime.
+ */
+ if (timestamp_cmp_internal(current_time, next) > 0)
+ skipit = false;
+ else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
+ skipit = false;
+ else
+ skipit = true;
+
+ break;
+ }
+ elem = DLGetPred(elem);
+ }
+ if (skipit)
continue;
/*
* Remember the db with oldest autovac time. (If we are here,
* both tmp->entry and db->entry must be non-null.)
*/
- if (db == NULL ||
- tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
- db = tmp;
+ if (avdb == NULL ||
+ tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
+ avdb = tmp;
}
/* Found a database -- process it */
- if (db != NULL)
+ if (avdb != NULL)
{
+ WorkerInfo worker;
+ SHMEM_OFFSET sworker;
+
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
- AutoVacuumShmem->process_db = db->ad_datid;
+
+ /*
+ * Get a worker entry from the freelist. We checked above, so there
+ * really should be a free slot -- complain very loudly if there isn't.
+ */
+ sworker = AutoVacuumShmem->av_freeWorkers;
+ if (sworker == INVALID_OFFSET)
+ elog(FATAL, "no free worker found");
+
+ worker = (WorkerInfo) MAKE_PTR(sworker);
+ AutoVacuumShmem->av_freeWorkers = worker->wi_links.next;
+
+ worker->wi_dboid = avdb->adw_datid;
+ worker->wi_workerpid = 0;
+ worker->wi_launchtime = GetCurrentTimestamp();
+
+ AutoVacuumShmem->av_startingWorker = sworker;
+
LWLockRelease(AutovacuumLock);
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+
+ return avdb->adw_datid;
+ }
+ else if (skipit)
+ {
+ /*
+ * If we skipped all databases on the list, rebuild it, because it
+ * probably contains a dropped database.
+ */
+ rebuild_database_list(InvalidOid);
+ }
+
+ return InvalidOid;
+}
+
+/*
+ * launch_worker
+ *
+ * Wrapper for starting a worker from the launcher. Besides actually starting
+ * it, update the database list to reflect the next time that another one will
+ * need to be started on the selected database. The actual database choice is
+ * left to do_start_worker.
+ *
+ * This routine is also expected to insert an entry into the database list if
+ * the selected database was previously absent from the list; it does so by
+ * rebuilding the list. Nothing is returned.
+ *
+ * "now" is the time to which autovacuum_naptime is added to compute the
+ * selected database's new next_worker value.
+ */
+static void
+launch_worker(TimestampTz now)
+{
+ Oid dbid;
+ Dlelem *elem;
+
+ dbid = do_start_worker();
+ if (OidIsValid(dbid))
+ {
+ /*
+ * Walk the database list and update the corresponding entry. If the
+ * database is not on the list, we'll recreate the list.
+ */
+ elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList);
+ while (elem != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+
+ if (avdb->adl_datid == dbid)
+ {
+ /*
+ * add autovacuum_naptime seconds to the current time, and use
+ * that as the new "next_worker" field for this database.
+ */
+ avdb->adl_next_worker =
+ TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+
+ DLMoveToFront(elem);
+ break;
+ }
+ elem = DLGetSucc(elem);
+ }
+
+ /*
+ * If the database was not present in the database list, we rebuild the
+ * list. It's possible that the database does not get into the list
+ * anyway, for example if it's a database that doesn't have a pgstat
+ * entry, but this is not a problem because we don't want to schedule
+ * workers regularly into those in any case.
+ *
+ * (elem is NULL here only if the loop above ran off the end of the
+ * list without finding the database.)
+ */
+ if (elem == NULL)
+ rebuild_database_list(dbid);
}
}
got_SIGHUP = true;
}
+/*
+ * SIGUSR1: a worker is up and running, or just finished
+ *
+ * Only sets a flag; the launcher's main loop tests and clears got_SIGUSR1.
+ */
+static void
+avl_sigusr1_handler(SIGNAL_ARGS)
+{
+ got_SIGUSR1 = true;
+}
+
static void
avlauncher_shutdown(SIGNAL_ARGS)
{
AutoVacWorkerMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
- Oid dbid;
+ Oid dbid = InvalidOid;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
/*
- * Get the database Id we're going to work on, and announce our PID
- * in the shared memory area. We remove the database OID immediately
- * from the shared memory area.
+ * Force statement_timeout to zero to avoid a timeout setting from
+ * preventing regular maintenance from being executed.
*/
- LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
- dbid = AutoVacuumShmem->process_db;
- AutoVacuumShmem->process_db = InvalidOid;
- AutoVacuumShmem->worker_pid = MyProcPid;
+ /*
+ * Get the info about the database we're going to work on.
+ */
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
+ dbid = MyWorkerInfo->wi_dboid;
+ MyWorkerInfo->wi_workerpid = MyProcPid;
+ /* insert into the running list */
+ SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers,
+ &MyWorkerInfo->wi_links);
+ /*
+ * remove from the "starting" pointer, so that the launcher can start a new
+ * worker if required
+ */
+ AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
LWLockRelease(AutovacuumLock);
+ on_shmem_exit(FreeWorkerInfo, 0);
+
+ /* wake up the launcher */
+ if (AutoVacuumShmem->av_launcherpid != 0)
+ kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
+
if (OidIsValid(dbid))
{
char *dbname;
/* Create the memory context where cross-transaction state is stored */
AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
- "Autovacuum context",
+ "AV worker",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
}
/*
- * Now remove our PID from shared memory, so that the launcher can start
- * another worker as soon as appropriate.
+ * FIXME -- we need to notify the launcher when we are gone. But this
+ * should be done after our PGPROC is released, in ProcKill.
*/
- LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
- AutoVacuumShmem->worker_pid = 0;
- LWLockRelease(AutovacuumLock);
/* All done, go away */
proc_exit(0);
}
/*
- * autovac_get_database_list
+ * Return a WorkerInfo to the free list */
+static void
+FreeWorkerInfo(int code, Datum arg)
+{
+	WorkerInfo	slot = MyWorkerInfo;
+
+	/* No slot is held by this process: nothing to hand back. */
+	if (slot == NULL)
+		return;
+
+	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+	/*
+	 * An empty free list means our exit is what makes a slot available
+	 * again, so the launcher should be woken to start another worker if it
+	 * wants one.  Here we merely remember the launcher's PID in local
+	 * memory; the signal itself is meant to be sent once the PGPROC is
+	 * recycled, since only then can the replacement worker really start.
+	 *
+	 * The launcher's PID could in principle change between this read and
+	 * the eventual kill.  That risk is accepted: ProcKill is expected to
+	 * run shortly after us, and PIDs are assumed not to be reused right
+	 * after a process exits.
+	 */
+	if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
+		AutovacuumLauncherPid = AutoVacuumShmem->av_launcherpid;
+
+	/*
+	 * Unlink the slot from whatever queue it is on, then push it onto the
+	 * head of the free list, which is chained through wi_links.next.
+	 */
+	SHMQueueDelete(&slot->wi_links);
+	slot->wi_links.next = AutoVacuumShmem->av_freeWorkers;
+	AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(slot);
+
+	/* Wipe the per-worker state so the next owner starts from scratch. */
+	slot->wi_dboid = InvalidOid;
+	slot->wi_tableoid = InvalidOid;
+	slot->wi_workerpid = 0;
+	slot->wi_launchtime = 0;
+	slot->wi_cost_delay = 0;
+	slot->wi_cost_limit = 0;
+	slot->wi_cost_limit_base = 0;
+
+	/* The slot no longer belongs to this process. */
+	MyWorkerInfo = NULL;
+
+	/*
+	 * With this worker inactive, ask for the cost limits of the surviving
+	 * workers to be rebalanced.
+	 */
+	AutoVacuumShmem->av_rebalance = true;
+
+	LWLockRelease(AutovacuumLock);
+}
+
+/*
+ * AutoVacuumUpdateDelay
+ *		Refresh this process's cost-based delay settings from its WorkerInfo.
+ *
+ * VacuumCostDelay and VacuumCostLimit are overwritten with the values stored
+ * in our WorkerInfo slot, so that when several workers run at once each one
+ * consumes only a fraction of the total available I/O.  A process that holds
+ * no WorkerInfo (MyWorkerInfo is NULL) is left untouched.
+ */
+void
+AutoVacuumUpdateDelay(void)
+{
+	if (MyWorkerInfo == NULL)
+		return;
+
+	VacuumCostDelay = MyWorkerInfo->wi_cost_delay;
+	VacuumCostLimit = MyWorkerInfo->wi_cost_limit;
+}
+
+/*
+ * autovac_balance_cost
+ *		Recalculate the cost limit setting for each active worker.
+ *
+ * Every worker on the running list that has a PID and positive cost
+ * parameters contributes wi_cost_limit_base / wi_cost_delay to a total.
+ * The configured budget (cost limit / cost delay, taken from the autovacuum
+ * settings when they are >= 0 and from the plain vacuum settings otherwise)
+ * is then shared out in proportion to each worker's base limit.  The result
+ * is never larger than the worker's base limit and never smaller than 1.
+ *
+ * Caller must hold the AutovacuumLock in exclusive mode.
+ */
+static void
+autovac_balance_cost(void)
+{
+	WorkerInfo	worker;
+	int			vac_cost_limit = (autovacuum_vac_cost_limit >= 0 ?
+								  autovacuum_vac_cost_limit : VacuumCostLimit);
+	int			vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ?
+								  autovacuum_vac_cost_delay : VacuumCostDelay);
+	double		cost_total;
+	double		cost_avail;
+
+	/* not set? nothing to do */
+	if (vac_cost_limit <= 0 || vac_cost_delay <= 0)
+		return;
+
+	/* calculate the total base cost limit of active workers */
+	cost_total = 0.0;
+	worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+									   &AutoVacuumShmem->av_runningWorkers,
+									   offsetof(WorkerInfoData, wi_links));
+	while (worker)
+	{
+		if (worker->wi_workerpid != 0 &&
+			worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
+			cost_total +=
+				(double) worker->wi_cost_limit_base / worker->wi_cost_delay;
+
+		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+										   &worker->wi_links,
+										   offsetof(WorkerInfoData, wi_links));
+	}
+	/* there are no cost limits -- nothing to do */
+	if (cost_total <= 0)
+		return;
+
+	/*
+	 * Adjust the cost limit of each active worker so that the total stays
+	 * within autovacuum_vacuum_cost_limit.
+	 */
+	cost_avail = (double) vac_cost_limit / vac_cost_delay;
+	worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+									   &AutoVacuumShmem->av_runningWorkers,
+									   offsetof(WorkerInfoData, wi_links));
+	while (worker)
+	{
+		if (worker->wi_workerpid != 0 &&
+			worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
+		{
+			int			limit = (int)
+				(cost_avail * worker->wi_cost_limit_base / cost_total);
+
+			/* never hand out more than the worker asked for */
+			limit = Min(limit, worker->wi_cost_limit_base);
+
+			/*
+			 * The cast above truncates, so a small share can come out as
+			 * zero.  Keep a lower bound of 1: a zero cost limit is not a
+			 * meaningful setting, and presumably any consumer that divides
+			 * by the limit would misbehave -- verify against the vacuum
+			 * delay code.
+			 */
+			if (limit < 1)
+				limit = 1;
+
+			worker->wi_cost_limit = limit;
+
+			elog(DEBUG2, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)",
+				 worker->wi_workerpid, worker->wi_dboid,
+				 worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay);
+		}
+
+		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+										   &worker->wi_links,
+										   offsetof(WorkerInfoData, wi_links));
+	}
+}
+
+/*
+ * get_database_list
*
* Return a list of all databases. Note we cannot use pg_database,
* because we aren't connected; we use the flat database file.
*/
static List *
-autovac_get_database_list(void)
+get_database_list(void)
{
char *filename;
List *dblist = NIL;
while (read_pg_database_line(db_file, thisname, &db_id,
&db_tablespace, &db_frozenxid))
{
- autovac_dbase *avdb;
+ avw_dbase *avdb;
- avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase));
+ avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
- avdb->ad_datid = db_id;
- avdb->ad_name = pstrdup(thisname);
- avdb->ad_frozenxid = db_frozenxid;
+ avdb->adw_datid = db_id;
+ avdb->adw_name = pstrdup(thisname);
+ avdb->adw_frozenxid = db_frozenxid;
/* this gets set later: */
- avdb->ad_entry = NULL;
+ avdb->adw_entry = NULL;
dblist = lappend(dblist, avdb);
}
* Add to the list of tables to vacuum, the OIDs of the tables that
* correspond to the saved OIDs of toast tables needing vacuum.
*/
- foreach (cell, toast_oids)
+ foreach(cell, toast_oids)
{
Oid toastoid = lfirst_oid(cell);
ListCell *cell2;
- foreach (cell2, table_toast_list)
+ foreach(cell2, table_toast_list)
{
av_relation *ar = lfirst(cell2);
Oid relid = lfirst_oid(cell);
autovac_table *tab;
char *relname;
+ WorkerInfo worker;
+ bool skipit;
CHECK_FOR_INTERRUPTS();
+ /*
+ * hold schedule lock from here until we're sure that this table
+ * still needs vacuuming. We also need the AutovacuumLock to walk
+ * the worker array, but we'll let go of that one quickly.
+ */
+ LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+ LWLockAcquire(AutovacuumLock, LW_SHARED);
+
+ /*
+ * Check whether the table is being vacuumed concurrently by another
+ * worker.
+ */
+ skipit = false;
+ worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+ &AutoVacuumShmem->av_runningWorkers,
+ offsetof(WorkerInfoData, wi_links));
+ while (worker)
+ {
+ /* ignore myself */
+ if (worker == MyWorkerInfo)
+ goto next_worker;
+
+ /* ignore workers in other databases */
+ if (worker->wi_dboid != MyDatabaseId)
+ goto next_worker;
+
+ if (worker->wi_tableoid == relid)
+ {
+ skipit = true;
+ break;
+ }
+
+next_worker:
+ worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+ &worker->wi_links,
+ offsetof(WorkerInfoData, wi_links));
+ }
+ LWLockRelease(AutovacuumLock);
+ if (skipit)
+ {
+ LWLockRelease(AutovacuumScheduleLock);
+ continue;
+ }
+
/*
* Check whether pgstat data still says we need to vacuum this table.
* It could have changed if something else processed the table while we
if (tab == NULL)
{
/* someone else vacuumed the table */
+ LWLockRelease(AutovacuumScheduleLock);
continue;
}
- /* Ok, good to go! */
- /* Set the vacuum cost parameters for this table */
+ /*
+ * Ok, good to go. Store the table in shared memory before releasing
+ * the lock so that other workers don't vacuum it concurrently.
+ */
+ MyWorkerInfo->wi_tableoid = relid;
+ LWLockRelease(AutovacuumScheduleLock);
+
+ /* Set the initial vacuum cost parameters for this table */
VacuumCostDelay = tab->at_vacuum_cost_delay;
VacuumCostLimit = tab->at_vacuum_cost_limit;
(tab->at_doanalyze ? " ANALYZE" : ""),
relname);
+ /*
+ * Advertise my cost delay parameters for the balancing algorithm, and
+ * do a balance
+ */
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ MyWorkerInfo->wi_cost_delay = tab->at_vacuum_cost_delay;
+ MyWorkerInfo->wi_cost_limit = tab->at_vacuum_cost_limit;
+ MyWorkerInfo->wi_cost_limit_base = tab->at_vacuum_cost_limit;
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+
+ /* have at it */
autovacuum_do_vac_analyze(tab->at_relid,
tab->at_dovacuum,
tab->at_doanalyze,
PgStat_StatDBEntry *shared;
PgStat_StatDBEntry *dbentry;
- /* We need fresh pgstat data for this */
+ /* use fresh stats */
pgstat_clear_snapshot();
shared = pgstat_fetch_stat_dbentry(InvalidOid);
/* fetch the relation's relcache entry */
classTup = SearchSysCacheCopy(RELOID,
- ObjectIdGetDatum(relid),
- 0, 0, 0);
+ ObjectIdGetDatum(relid),
+ 0, 0, 0);
if (!HeapTupleIsValid(classTup))
return NULL;
classForm = (Form_pg_class) GETSTRUCT(classTup);
Size
AutoVacuumShmemSize(void)
{
- return sizeof(AutoVacuumShmemStruct);
+ Size size;
+
+ /*
+ * Need the fixed struct and the array of WorkerInfoData.  The struct
+ * size is rounded up with MAXALIGN before the array is added, which
+ * matches the offset at which the initialization code places the
+ * array.  The array holds autovacuum_max_workers entries.
+ */
+ size = sizeof(AutoVacuumShmemStruct);
+ size = MAXALIGN(size);
+ size = add_size(size, mul_size(autovacuum_max_workers,
+ sizeof(WorkerInfoData)));
+ return size;
}
/*
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for autovacuum")));
- if (found)
- return; /* already initialized */
- MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
+ if (!IsUnderPostmaster)
+ {
+ WorkerInfo worker;
+ int i;
+
+ Assert(!found);
+
+ AutoVacuumShmem->av_launcherpid = 0;
+ AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET;
+ SHMQueueInit(&AutoVacuumShmem->av_runningWorkers);
+ AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
+
+ worker = (WorkerInfo) ((char *) AutoVacuumShmem +
+ MAXALIGN(sizeof(AutoVacuumShmemStruct)));
+
+ /* initialize the WorkerInfo free list */
+ for (i = 0; i < autovacuum_max_workers; i++)
+ {
+ worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers;
+ AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]);
+ }
+ }
+ else
+ Assert(found);
}