* The postgres vacuum cleaner.
*
* This file includes the "full" version of VACUUM, as well as control code
- * used by all three of full VACUUM, lazy VACUUM, and ANALYZE. See
+ * used by all three of full VACUUM, lazy VACUUM, and ANALYZE. See
* vacuumlazy.c and analyze.c for the rest of the code for the latter two.
*
*
- * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.205 2001/07/15 22:48:17 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.260 2003/09/24 18:54:01 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include <unistd.h>
+#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_database.h"
#include "catalog/pg_index.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
+#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
-#include "utils/temprel.h"
-
#include "pgstat.h"
-typedef struct VRelListData
-{
- Oid vrl_relid;
- struct VRelListData *vrl_next;
-} VRelListData;
-
-typedef VRelListData *VRelList;
-
typedef struct VacPageData
{
BlockNumber blkno; /* BlockNumber of this Page */
typedef struct VacPageListData
{
- BlockNumber empty_end_pages; /* Number of "empty" end-pages */
- int num_pages; /* Number of pages in pagedesc */
+ BlockNumber empty_end_pages; /* Number of "empty" end-pages */
+ int num_pages; /* Number of pages in pagedesc */
int num_allocated_pages; /* Number of allocated pages in
* pagedesc */
- VacPage *pagedesc; /* Descriptions of pages */
+ VacPage *pagedesc; /* Descriptions of pages */
} VacPageListData;
typedef VacPageListData *VacPageList;
typedef struct VRelStats
{
- BlockNumber rel_pages;
+ BlockNumber rel_pages;
double rel_tuples;
Size min_tlen;
Size max_tlen;
static MemoryContext vac_context = NULL;
-static int MESSAGE_LEVEL; /* message level */
+static int elevel = -1;
-static TransactionId XmaxRecent;
+static TransactionId OldestXmin;
+static TransactionId FreezeLimit;
/* non-export function prototypes */
-static void vacuum_init(void);
-static void vacuum_shutdown(void);
-static VRelList getrels(Name VacRelP, const char *stmttype);
-static void vacuum_rel(Oid relid, VacuumStmt *vacstmt);
-static void full_vacuum_rel(Relation onerel);
+static List *getrels(const RangeVar *vacrel, const char *stmttype);
+static void vac_update_dbstats(Oid dbid,
+ TransactionId vacuumXID,
+ TransactionId frozenXID);
+static void vac_truncate_clog(TransactionId vacuumXID,
+ TransactionId frozenXID);
+static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
+static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
- VacPageList vacuum_pages, VacPageList fraged_pages);
+ VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
- VacPageList vacuum_pages, VacPageList fraged_pages,
- int nindexes, Relation *Irel);
+ VacPageList vacuum_pages, VacPageList fraged_pages,
+ int nindexes, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
- VacPageList vacpagelist);
+ VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
- double num_tuples, int keep_tuples);
+ double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
+static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
- BlockNumber rel_pages);
+ BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
-static bool is_partial_index(Relation indrel);
static void *vac_bsearch(const void *key, const void *base,
- size_t nelem, size_t size,
- int (*compar) (const void *, const void *));
+ size_t nelem, size_t size,
+ int (*compar) (const void *, const void *));
static int vac_cmp_blk(const void *left, const void *right);
static int vac_cmp_offno(const void *left, const void *right);
static int vac_cmp_vtlinks(const void *left, const void *right);
vacuum(VacuumStmt *vacstmt)
{
const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
- NameData VacRel;
- Name VacRelName;
- VRelList vrl,
- cur;
+ MemoryContext anl_context = NULL;
+ TransactionId initialOldestXmin = InvalidTransactionId;
+ TransactionId initialFreezeLimit = InvalidTransactionId;
+ bool all_rels;
+ List *vrl,
+ *cur;
+
+ if (vacstmt->verbose)
+ elevel = INFO;
+ else
+ elevel = DEBUG2;
/*
* We cannot run VACUUM inside a user transaction block; if we were
* user's transaction too, which would certainly not be the desired
* behavior.
*/
- if (IsTransactionBlock())
- elog(ERROR, "%s cannot run inside a BEGIN/END block", stmttype);
+ if (vacstmt->vacuum)
+ PreventTransactionChain((void *) vacstmt, stmttype);
/*
* Send info about dead objects to the statistics collector
*/
- pgstat_vacuum_tabstat();
-
- if (vacstmt->verbose)
- MESSAGE_LEVEL = NOTICE;
- else
- MESSAGE_LEVEL = DEBUG;
+ if (vacstmt->vacuum)
+ pgstat_vacuum_tabstat();
/*
* Create special memory context for cross-transaction storage.
*
- * Since it is a child of QueryContext, it will go away eventually even
+ * Since it is a child of PortalContext, it will go away eventually even
* if we suffer an error; there's no need for special abort cleanup
* logic.
*/
- vac_context = AllocSetContextCreate(QueryContext,
+ vac_context = AllocSetContextCreate(PortalContext,
"Vacuum",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
- /* Convert vacrel, which is just a string, to a Name */
- if (vacstmt->vacrel)
- {
- namestrcpy(&VacRel, vacstmt->vacrel);
- VacRelName = &VacRel;
- }
- else
- VacRelName = NULL;
+ /*
+ * If we are running only ANALYZE, we don't need per-table
+ * transactions, but we still need a memory context with table
+ * lifetime.
+ */
+ if (vacstmt->analyze && !vacstmt->vacuum)
+ anl_context = AllocSetContextCreate(PortalContext,
+ "Analyze",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /* Assume we are processing everything unless one table is mentioned */
+ all_rels = (vacstmt->relation == NULL);
/* Build list of relations to process (note this lives in vac_context) */
- vrl = getrels(VacRelName, stmttype);
+ vrl = getrels(vacstmt->relation, stmttype);
/*
- * Start up the vacuum cleaner.
+ * Formerly, there was code here to prevent more than one VACUUM from
+ * executing concurrently in the same database. However, there's no
+ * good reason to prevent that, and manually removing lockfiles after
+ * a vacuum crash was a pain for dbadmins. So, forget about
+ * lockfiles, and just rely on the locks we grab on each target table
+ * to ensure that there aren't two VACUUMs running on the same table
+ * at the same time.
*/
- vacuum_init();
/*
- * Process each selected relation. We are careful to process
- * each relation in a separate transaction in order to avoid holding
- * too many locks at one time. Also, if we are doing VACUUM ANALYZE,
- * the ANALYZE part runs as a separate transaction from the VACUUM
- * to further reduce locking.
+ * The strangeness with committing and starting transactions here is
+ * due to wanting to run each table's VACUUM as a separate
+ * transaction, so that we don't hold locks unnecessarily long. Also,
+ * if we are doing VACUUM ANALYZE, the ANALYZE part runs as a separate
+ * transaction from the VACUUM to further reduce locking.
+ *
+ * vacuum_rel expects to be entered with no transaction active; it will
+ * start and commit its own transaction. But we are called by an SQL
+ * command, and so we are executing inside a transaction already. We
+ * commit the transaction started in PostgresMain() here, and start
+ * another one before exiting to match the commit waiting for us back
+ * in PostgresMain().
+ *
+ * In the case of an ANALYZE statement (no vacuum, just analyze) it's
+ * okay to run the whole thing in the outer transaction, and so we
+ * skip transaction start/stop operations.
*/
- for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
+ if (vacstmt->vacuum)
{
+ if (all_rels)
+ {
+ /*
+ * It's a database-wide VACUUM.
+ *
+ * Compute the initially applicable OldestXmin and FreezeLimit
+ * XIDs, so that we can record these values at the end of the
+ * VACUUM. Note that individual tables may well be processed
+ * with newer values, but we can guarantee that no
+ * (non-shared) relations are processed with older ones.
+ *
+ * It is okay to record non-shared values in pg_database, even
+ * though we may vacuum shared relations with older cutoffs,
+ * because only the minimum of the values present in
+ * pg_database matters. We can be sure that shared relations
+ * have at some time been vacuumed with cutoffs no worse than
+ * the global minimum; for, if there is a backend in some
+ * other DB with xmin = OLDXMIN that's determining the cutoff
+ * with which we vacuum shared relations, it is not possible
+ * for that database to have a cutoff newer than OLDXMIN
+ * recorded in pg_database.
+ */
+ vacuum_set_xid_limits(vacstmt, false,
+ &initialOldestXmin,
+ &initialFreezeLimit);
+ }
+
+ /* matches the StartTransaction in PostgresMain() */
+ CommitTransactionCommand();
+ }
+
+ /*
+ * Loop to process each selected relation.
+ */
+ foreach(cur, vrl)
+ {
+ Oid relid = lfirsto(cur);
+
if (vacstmt->vacuum)
- vacuum_rel(cur->vrl_relid, vacstmt);
+ {
+ if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
+ all_rels = false; /* forget about updating dbstats */
+ }
if (vacstmt->analyze)
- analyze_rel(cur->vrl_relid, vacstmt);
+ {
+ MemoryContext old_context = NULL;
+
+ /*
+ * If we vacuumed, use new transaction for analyze. Otherwise,
+ * we can use the outer transaction, but we still need to call
+ * analyze_rel in a memory context that will be cleaned up on
+ * return (else we leak memory while processing multiple
+ * tables).
+ */
+ if (vacstmt->vacuum)
+ {
+ StartTransactionCommand();
+ SetQuerySnapshot(); /* might be needed for functions
+ * in indexes */
+ }
+ else
+ old_context = MemoryContextSwitchTo(anl_context);
+
+ analyze_rel(relid, vacstmt);
+
+ if (vacstmt->vacuum)
+ CommitTransactionCommand();
+ else
+ {
+ MemoryContextSwitchTo(old_context);
+ MemoryContextResetAndDeleteChildren(anl_context);
+ }
+ }
}
/*
- * If we did a complete vacuum, then flush the init file that relcache.c
- * uses to save startup time. The next backend startup will rebuild the
- * init file with up-to-date information from pg_class. This lets the
- * optimizer see the stats that we've collected for certain critical
- * system indexes. See relcache.c for more details.
- *
- * Ignore any failure to unlink the file, since it might not be there if
- * no backend has been started since the last vacuum.
+ * Finish up processing.
*/
- if (vacstmt->vacrel == NULL)
- unlink(RELCACHE_INIT_FILENAME);
-
- /* clean up */
- vacuum_shutdown();
-}
+ if (vacstmt->vacuum)
+ {
+ /* here, we are not in a transaction */
-/*
- * vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
- *
- * Formerly, there was code here to prevent more than one VACUUM from
- * executing concurrently in the same database. However, there's no
- * good reason to prevent that, and manually removing lockfiles after
- * a vacuum crash was a pain for dbadmins. So, forget about lockfiles,
- * and just rely on the locks we grab on each target table
- * to ensure that there aren't two VACUUMs running on the same table
- * at the same time.
- *
- * The strangeness with committing and starting transactions in the
- * init and shutdown routines is due to the fact that the vacuum cleaner
- * is invoked via an SQL command, and so is already executing inside
- * a transaction. We need to leave ourselves in a predictable state
- * on entry and exit to the vacuum cleaner. We commit the transaction
- * started in PostgresMain() inside vacuum_init(), and start one in
- * vacuum_shutdown() to match the commit waiting for us back in
- * PostgresMain().
- */
-static void
-vacuum_init(void)
-{
- /* matches the StartTransaction in PostgresMain() */
- CommitTransactionCommand();
-}
+ /*
+ * This matches the CommitTransaction waiting for us in
+ * PostgresMain().
+ */
+ StartTransactionCommand();
-static void
-vacuum_shutdown(void)
-{
- /* on entry, we are not in a transaction */
+ /*
+ * If it was a database-wide VACUUM, print FSM usage statistics
+ * (we don't make you be superuser to see these).
+ */
+ if (vacstmt->relation == NULL)
+ PrintFreeSpaceMapStatistics(elevel);
- /* matches the CommitTransaction in PostgresMain() */
- StartTransactionCommand();
+ /*
+ * If we completed a database-wide VACUUM without skipping any
+ * relations, update the database's pg_database row with info
+ * about the transaction IDs used, and try to truncate pg_clog.
+ */
+ if (all_rels)
+ {
+ vac_update_dbstats(MyDatabaseId,
+ initialOldestXmin, initialFreezeLimit);
+ vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
+ }
+ }
/*
* Clean up working storage --- note we must do this after
*/
MemoryContextDelete(vac_context);
vac_context = NULL;
+
+ if (anl_context)
+ MemoryContextDelete(anl_context);
}
/*
- * Build a list of VRelListData nodes for each relation to be processed
+ * Build a list of Oids for each relation to be processed
*
* The list is built in vac_context so that it will survive across our
* per-relation transactions.
+ *
+ * vacrel: the single relation named in the command, or NULL to collect
+ * every pg_class entry whose relkind is RELKIND_RELATION.
+ * stmttype: the command name; only the removed (old) code referenced it,
+ * in its NOTICE messages. The new code does not use it.
+ *
+ * Returns the list of relation OIDs (NIL if the pg_class scan matched
+ * nothing). The relkind of an explicitly named relation is not checked
+ * here; the new code leaves that to vacuum_rel, which checks after it
+ * has locked the relation.
*/
-static VRelList
-getrels(Name VacRelP, const char *stmttype)
+static List *
+getrels(const RangeVar *vacrel, const char *stmttype)
{
- Relation rel;
- TupleDesc tupdesc;
- HeapScanDesc scan;
- HeapTuple tuple;
- VRelList vrl,
- cur;
- Datum d;
- char *rname;
- char rkind;
- bool n;
- ScanKeyData key;
-
- if (VacRelP)
+ List *vrl = NIL; /* result list, grown with lappendo */
+ MemoryContext oldcontext; /* caller's context, restored after each append */
+
+ if (vacrel)
{
- /*
- * we could use the cache here, but it is clearer to use scankeys
- * for both vacuum cases, bjm 2000/01/19
- */
- char *nontemp_relname;
+ /*
+ * Process specific relation: resolve the RangeVar to an OID.
+ * NOTE(review): the "false" argument presumably makes
+ * RangeVarGetRelid raise an error for an unknown name -- confirm.
+ */
+ Oid relid;
- /* We must re-map temp table names bjm 2000-04-06 */
- nontemp_relname = get_temp_rel_by_username(NameStr(*VacRelP));
- if (nontemp_relname == NULL)
- nontemp_relname = NameStr(*VacRelP);
+ relid = RangeVarGetRelid(vacrel, false);
- ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
- F_NAMEEQ,
- PointerGetDatum(nontemp_relname));
+ /*
+ * Make a relation list entry for this guy; switch to vac_context
+ * around the append so the list is built there, then switch back.
+ */
+ oldcontext = MemoryContextSwitchTo(vac_context);
+ vrl = lappendo(vrl, relid);
+ MemoryContextSwitchTo(oldcontext);
}
else
{
- /* find all plain relations listed in pg_class */
- ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
- F_CHAREQ, CharGetDatum(RELKIND_RELATION));
- }
-
- vrl = cur = (VRelList) NULL;
+ /*
+ * Process all plain relations listed in pg_class: scan pg_class
+ * with a single key, relkind = RELKIND_RELATION.
+ */
+ Relation pgclass;
+ HeapScanDesc scan;
+ HeapTuple tuple;
+ ScanKeyData key;
- rel = heap_openr(RelationRelationName, AccessShareLock);
- tupdesc = RelationGetDescr(rel);
+ ScanKeyEntryInitialize(&key, 0x0,
+ Anum_pg_class_relkind,
+ F_CHAREQ,
+ CharGetDatum(RELKIND_RELATION));
- scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);
+ pgclass = heap_openr(RelationRelationName, AccessShareLock);
- while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
- {
- d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
- rname = (char *) DatumGetName(d);
-
- d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
- rkind = DatumGetChar(d);
+ scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
- if (rkind != RELKIND_RELATION)
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
- elog(NOTICE, "%s: can not process indexes, views or special system tables",
- stmttype);
- continue;
+ /*
+ * Make a relation list entry for this guy, using the OID of
+ * the pg_class tuple; append while in vac_context as above.
+ */
+ oldcontext = MemoryContextSwitchTo(vac_context);
+ vrl = lappendo(vrl, HeapTupleGetOid(tuple));
+ MemoryContextSwitchTo(oldcontext);
}
- /* Make a relation list entry for this guy */
- if (vrl == (VRelList) NULL)
- vrl = cur = (VRelList)
- MemoryContextAlloc(vac_context, sizeof(VRelListData));
- else
- {
- cur->vrl_next = (VRelList)
- MemoryContextAlloc(vac_context, sizeof(VRelListData));
- cur = cur->vrl_next;
- }
+ heap_endscan(scan);
+ heap_close(pgclass, AccessShareLock);
+ }
+
+ return vrl;
+}
+
+/*
+ * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
+ *
+ * vacstmt: the command being executed; only its freeze flag is examined.
+ * sharedRel: passed straight through to GetOldestXmin().
+ * oldestXmin: output; receives the result of GetOldestXmin(sharedRel).
+ * freezeLimit: output; receives the freeze cutoff. That is *oldestXmin
+ * when the FREEZE option was given, otherwise the current transaction
+ * ID minus (MaxTransactionId >> 2). In either case the value is forced
+ * to be a normal XID and is never allowed to follow *oldestXmin.
+ *
+ * Emits a WARNING (and clamps the cutoff) when the computed freeze cutoff
+ * would follow *oldestXmin.
+ */
+void
+vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
+ TransactionId *oldestXmin,
+ TransactionId *freezeLimit)
+{
+ TransactionId limit; /* working value for the freeze cutoff */
+
+ *oldestXmin = GetOldestXmin(sharedRel);
- cur->vrl_relid = tuple->t_data->t_oid;
- cur->vrl_next = (VRelList) NULL;
+ Assert(TransactionIdIsNormal(*oldestXmin));
+
+ if (vacstmt->freeze)
+ {
+ /* FREEZE option: use oldest Xmin as freeze cutoff too */
+ limit = *oldestXmin;
+ }
+ else
+ {
+ /*
+ * Normal case: freeze cutoff is well in the past, to wit, about
+ * halfway to the wrap horizon
+ */
+ limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
}
- heap_endscan(scan);
- heap_close(rel, AccessShareLock);
+ /*
+ * Be careful not to generate a "permanent" XID: if the value chosen
+ * above fails TransactionIdIsNormal (for instance as a result of the
+ * subtraction), substitute FirstNormalTransactionId.
+ */
+ if (!TransactionIdIsNormal(limit))
+ limit = FirstNormalTransactionId;
- if (vrl == NULL)
- elog(NOTICE, "%s: table not found", stmttype);
+ /*
+ * Ensure sane relationship of limits: the freeze cutoff must not
+ * follow oldestXmin. If it does, warn and fall back to oldestXmin.
+ */
+ if (TransactionIdFollows(limit, *oldestXmin))
+ {
+ ereport(WARNING,
+ (errmsg("oldest Xmin is far in the past"),
+ errhint("Close open transactions soon to avoid wraparound problems.")));
+ limit = *oldestXmin;
+ }
- return vrl;
+ *freezeLimit = limit;
}
/* get the buffer cache tuple */
rtup.t_self = ctup->t_self;
ReleaseSysCache(ctup);
- heap_fetch(rd, SnapshotNow, &rtup, &buffer, NULL);
+ if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
+ elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+ relid);
/* overwrite the existing statistics in the tuple */
pgcform = (Form_pg_class) GETSTRUCT(&rtup);
pgcform->reltuples = num_tuples;
pgcform->relhasindex = hasindex;
- /* invalidate the tuple in the cache and write the buffer */
- RelationInvalidateHeapTuple(rd, &rtup);
+ /*
+ * If we have discovered that there are no indexes, then there's no
+ * primary key either. This could be done more thoroughly...
+ */
+ if (!hasindex)
+ pgcform->relhaspkey = false;
+
+ /*
+ * Invalidate the tuple in the catcaches; this also arranges to flush
+ * the relation's relcache entry. (If we fail to commit for some
+ * reason, no flush will occur, but no great harm is done since there
+ * are no critical state updates here.)
+ */
+ CacheInvalidateHeapTuple(rd, &rtup);
+
+ /* Write the buffer */
WriteBuffer(buffer);
heap_close(rd, RowExclusiveLock);
}
+/*
+ * vac_update_dbstats() -- update statistics for one database
+ *
+ * Update the whole-database statistics that are kept in its pg_database
+ * row.
+ *
+ * We violate no-overwrite semantics here by storing new values for the
+ * statistics columns directly into the tuple that's already on the page.
+ * As with vac_update_relstats, this avoids leaving dead tuples behind
+ * after a VACUUM; which is good since GetRawDatabaseInfo
+ * can get confused by finding dead tuples in pg_database.
+ *
+ * This routine is shared by full and lazy VACUUM. Note that it is only
+ * applied after a database-wide VACUUM operation.
+ *
+ * dbid: OID of the pg_database row to update.
+ * vacuumXID: value stored into the row's datvacuumxid field.
+ * frozenXID: value stored into the row's datfrozenxid field.
+ *
+ * Raises ERROR if no pg_database tuple with the given OID is found.
+ */
+static void
+vac_update_dbstats(Oid dbid,
+ TransactionId vacuumXID,
+ TransactionId frozenXID)
+{
+ Relation relation;
+ ScanKeyData entry[1];
+ HeapScanDesc scan;
+ HeapTuple tuple;
+ Form_pg_database dbform;
+
+ relation = heap_openr(DatabaseRelationName, RowExclusiveLock);
+
+ /*
+ * Must use a heap scan, since there's no syscache for pg_database.
+ * The single scan key matches on the tuple OID = dbid.
+ */
+ ScanKeyEntryInitialize(&entry[0], 0x0,
+ ObjectIdAttributeNumber, F_OIDEQ,
+ ObjectIdGetDatum(dbid));
+
+ scan = heap_beginscan(relation, SnapshotNow, 1, entry);
+
+ tuple = heap_getnext(scan, ForwardScanDirection); /* only the first match is used */
+
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "could not find tuple for database %u", dbid);
+
+ dbform = (Form_pg_database) GETSTRUCT(tuple);
+
+ /* overwrite the existing statistics in the tuple */
+ dbform->datvacuumxid = vacuumXID;
+ dbform->datfrozenxid = frozenXID;
+
+ /*
+ * invalidate the tuple in the cache and write the buffer
+ *
+ * NOTE(review): WriteNoReleaseBuffer is applied to the scan's current
+ * buffer (rs_cbuf); presumably it marks the page dirty while leaving
+ * the scan's hold on the buffer for heap_endscan to drop -- confirm.
+ */
+ CacheInvalidateHeapTuple(relation, tuple);
+ WriteNoReleaseBuffer(scan->rs_cbuf);
+
+ heap_endscan(scan);
+
+ heap_close(relation, RowExclusiveLock);
+}
+
+
+/*
+ * vac_truncate_clog() -- attempt to truncate the commit log
+ *
+ * Scan pg_database to determine the system-wide oldest datvacuumxid,
+ * and use it to truncate the transaction commit log (pg_clog).
+ * Also generate a warning if the system-wide oldest datfrozenxid
+ * seems to be in danger of wrapping around.
+ *
+ * The passed XIDs are simply the ones I just wrote into my pg_database
+ * entry. They're used to initialize the "min" calculations.
+ *
+ * This routine is shared by full and lazy VACUUM. Note that it is only
+ * applied after a database-wide VACUUM operation.
+ *
+ * vacuumXID: starting value for the minimum over all datvacuumxid.
+ * frozenXID: starting value for the minimum over all datfrozenxid.
+ *
+ * Both parameters are used as local accumulators and are lowered as
+ * older values are found during the scan. Nothing is returned; the
+ * visible effects are the TruncateCLOG call and the WARNING reports.
+ */
+static void
+vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
+{
+ TransactionId myXID; /* our own XID; reference point for the tests below */
+ Relation relation;
+ HeapScanDesc scan;
+ HeapTuple tuple;
+ int32 age;
+ bool vacuumAlreadyWrapped = false;
+ bool frozenAlreadyWrapped = false;
+
+ myXID = GetCurrentTransactionId();
+
+ relation = heap_openr(DatabaseRelationName, AccessShareLock);
+
+ scan = heap_beginscan(relation, SnapshotNow, 0, NULL); /* no scan keys: visit every row */
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
+
+ /* Ignore non-connectable databases (eg, template0) */
+ /* It's assumed that these have been frozen correctly */
+ if (!dbform->datallowconn)
+ continue;
+
+ if (TransactionIdIsNormal(dbform->datvacuumxid))
+ {
+ if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
+ vacuumAlreadyWrapped = true; /* stored XID compares as newer than ours */
+ else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
+ vacuumXID = dbform->datvacuumxid; /* new minimum */
+ }
+ if (TransactionIdIsNormal(dbform->datfrozenxid))
+ {
+ if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
+ frozenAlreadyWrapped = true; /* stored XID compares as newer than ours */
+ else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
+ frozenXID = dbform->datfrozenxid; /* new minimum */
+ }
+ }
+
+ heap_endscan(scan);
+
+ heap_close(relation, AccessShareLock);
+
+ /*
+ * Do not truncate CLOG if we seem to have suffered wraparound
+ * already; the computed minimum XID might be bogus.
+ *
+ * Note that this returns early, so the datfrozenxid warnings below
+ * are skipped as well in this case.
+ */
+ if (vacuumAlreadyWrapped)
+ {
+ ereport(WARNING,
+ (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
+ errdetail("You may have already suffered transaction-wraparound data loss.")));
+ return;
+ }
+
+ /* Truncate CLOG to the oldest vacuumxid */
+ TruncateCLOG(vacuumXID);
+
+ /*
+ * Give warning about impending wraparound problems: either some
+ * datfrozenxid already compares as newer than our XID, or the
+ * distance from the oldest datfrozenxid to our XID exceeds
+ * three-eighths of MaxTransactionId.
+ */
+ if (frozenAlreadyWrapped)
+ {
+ ereport(WARNING,
+ (errmsg("some databases have not been vacuumed in over 1 billion transactions"),
+ errhint("Better vacuum them soon, or you may have a wraparound failure.")));
+ }
+ else
+ {
+ age = (int32) (myXID - frozenXID); /* distance from oldest datfrozenxid to our XID */
+ if (age > (int32) ((MaxTransactionId >> 3) * 3))
+ ereport(WARNING,
+ (errmsg("some databases have not been vacuumed in %d transactions",
+ age),
+ errhint("Better vacuum them within %d transactions, "
+ "or you may have a wraparound failure.",
+ (int32) (MaxTransactionId >> 1) - age))); /* remaining margin: half of MaxTransactionId minus age */
+ }
+}
+
+
/****************************************************************************
* *
* Code common to both flavors of VACUUM *
/*
* vacuum_rel() -- vacuum one heap relation
*
+ * Returns TRUE if we actually processed the relation (or can ignore it
+ * for some reason), FALSE if we failed to process it due to permissions
+ * or other reasons. (A FALSE result really means that some data
+ * may have been left unvacuumed, so we can't update XID stats.)
+ *
* Doing one heap at a time incurs extra overhead, since we need to
* check that the heap exists again just before we vacuum it. The
* reason that we do this is so that vacuuming can be spread across
*
* At entry and exit, we are not inside a transaction.
*/
-static void
-vacuum_rel(Oid relid, VacuumStmt *vacstmt)
+static bool
+vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
{
LOCKMODE lmode;
Relation onerel;
LockRelId onerelid;
Oid toast_relid;
+ bool result;
/* Begin a transaction for vacuuming this relation */
StartTransactionCommand();
+ SetQuerySnapshot(); /* might be needed for functions in
+ * indexes */
/*
* Check for user-requested abort. Note we want this to be inside a
- * transaction, so xact.c doesn't issue useless NOTICE.
+ * transaction, so xact.c doesn't issue useless WARNING.
*/
CHECK_FOR_INTERRUPTS();
0, 0, 0))
{
CommitTransactionCommand();
- return;
+ return true; /* okay 'cause no data there */
}
/*
* Determine the type of lock we want --- hard exclusive lock for a
* FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
- * vacuum. Either way, we can be sure that no other backend is vacuuming
- * the same table.
+ * vacuum. Either way, we can be sure that no other backend is
+ * vacuuming the same table.
*/
lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
/*
- * Open the class, get an appropriate lock on it, and check permissions.
+ * Open the class, get an appropriate lock on it, and check
+ * permissions.
*
* We allow the user to vacuum a table if he is superuser, the table
* owner, or the database owner (but in the latter case, only if it's
- * not a shared relation). pg_ownercheck includes the superuser case.
+ * not a shared relation). pg_class_ownercheck includes the superuser
+ * case.
*
- * Note we choose to treat permissions failure as a NOTICE and keep
+ * Note we choose to treat permissions failure as a WARNING and keep
* trying to vacuum the rest of the DB --- is this appropriate?
*/
- onerel = heap_open(relid, lmode);
+ onerel = relation_open(relid, lmode);
- if (! (pg_ownercheck(GetUserId(), RelationGetRelationName(onerel),
- RELNAME) ||
- (is_dbadmin(MyDatabaseId) && !onerel->rd_rel->relisshared)))
+ if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
+ (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
{
- elog(NOTICE, "Skipping \"%s\" --- only table or database owner can VACUUM it",
- RelationGetRelationName(onerel));
- heap_close(onerel, lmode);
+ ereport(WARNING,
+ (errmsg("skipping \"%s\" --- only table or database owner can VACUUM it",
+ RelationGetRelationName(onerel))));
+ relation_close(onerel, lmode);
CommitTransactionCommand();
- return;
+ return false;
+ }
+
+ /*
+ * Check that it's a plain table; we used to do this in getrels() but
+ * seems safer to check after we've locked the relation.
+ */
+ if (onerel->rd_rel->relkind != expected_relkind)
+ {
+ ereport(WARNING,
+ (errmsg("skipping \"%s\" --- cannot VACUUM indexes, views or special system tables",
+ RelationGetRelationName(onerel))));
+ relation_close(onerel, lmode);
+ CommitTransactionCommand();
+ return false;
+ }
+
+ /*
+ * Silently ignore tables that are temp tables of other backends ---
+ * trying to vacuum these will lead to great unhappiness, since their
+ * contents are probably not up-to-date on disk. (We don't throw a
+ * warning here; it would just lead to chatter during a database-wide
+ * VACUUM.)
+ */
+ if (isOtherTempNamespace(RelationGetNamespace(onerel)))
+ {
+ relation_close(onerel, lmode);
+ CommitTransactionCommand();
+ return true; /* assume no long-lived data in temp
+ * tables */
}
/*
* Do the actual work --- either FULL or "lazy" vacuum
*/
if (vacstmt->full)
- full_vacuum_rel(onerel);
+ full_vacuum_rel(onerel, vacstmt);
else
lazy_vacuum_rel(onerel, vacstmt);
+ result = true; /* did the vacuum */
+
/* all done with this class, but hold lock until commit */
- heap_close(onerel, NoLock);
+ relation_close(onerel, NoLock);
/*
* Complete the transaction and free all temporary memory used.
/*
* If the relation has a secondary toast rel, vacuum that too while we
* still hold the session lock on the master table. Note however that
- * "analyze" will not get done on the toast table. This is good,
- * because the toaster always uses hardcoded index access and statistics
- * are totally unimportant for toast relations.
+ * "analyze" will not get done on the toast table. This is good,
+ * because the toaster always uses hardcoded index access and
+ * statistics are totally unimportant for toast relations.
*/
if (toast_relid != InvalidOid)
- vacuum_rel(toast_relid, vacstmt);
+ {
+ if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
+ result = false; /* failed to vacuum the TOAST table? */
+ }
/*
* Now release the session-level lock on the master table.
*/
UnlockRelationForSession(&onerelid, lmode);
+
+ return result;
}
* and locked the relation.
*/
static void
-full_vacuum_rel(Relation onerel)
+full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
VacPageListData vacuum_pages; /* List of pages to vacuum and/or
* clean indexes */
int nindexes,
i;
VRelStats *vacrelstats;
- bool reindex = false;
-
- if (IsIgnoringSystemIndexes() &&
- IsSystemRelationName(RelationGetRelationName(onerel)))
- reindex = true;
- GetXmaxRecent(&XmaxRecent);
+ vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
+ &OldestXmin, &FreezeLimit);
/*
* Set up statistics-gathering machinery.
/* Now open all indexes of the relation */
vac_open_indexes(onerel, &nindexes, &Irel);
- if (!Irel)
- reindex = false;
- else if (!RelationGetForm(onerel)->relhasindex)
- reindex = true;
if (nindexes > 0)
vacrelstats->hasindex = true;
-#ifdef NOT_USED
- /*
- * reindex in VACUUM is dangerous under WAL. ifdef out until it
- * becomes safe.
- */
- if (reindex)
- {
- vac_close_indexes(nindexes, Irel);
- Irel = (Relation *) NULL;
- activate_indexes_of_a_table(RelationGetRelid(onerel), false);
- }
-#endif /* NOT_USED */
-
/* Clean/scan index relation(s) */
if (Irel != (Relation *) NULL)
{
*/
i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
if (i < 0)
- elog(ERROR, "VACUUM (full_vacuum_rel): FlushRelationBuffers returned %d",
- i);
+ elog(ERROR, "FlushRelationBuffers returned %d", i);
}
}
-#ifdef NOT_USED
- if (reindex)
- activate_indexes_of_a_table(RelationGetRelid(onerel), true);
-#endif /* NOT_USED */
-
/* update shared free space map with final free space info */
vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
char *relname;
VacPage vacpage,
vacpagecopy;
- BlockNumber empty_pages,
- new_pages,
- changed_pages,
+ BlockNumber empty_pages,
empty_end_pages;
double num_tuples,
tups_vacuumed,
nkeep,
nunused;
- double free_size,
- usable_free_size;
+ double free_space,
+ usable_free_space;
Size min_tlen = MaxTupleSize;
Size max_tlen = 0;
int i;
vac_init_rusage(&ru0);
relname = RelationGetRelationName(onerel);
- elog(MESSAGE_LEVEL, "--Relation %s--", relname);
+ ereport(elevel,
+ (errmsg("vacuuming \"%s.%s\"",
+ get_namespace_name(RelationGetNamespace(onerel)),
+ relname)));
- empty_pages = new_pages = changed_pages = empty_end_pages = 0;
+ empty_pages = empty_end_pages = 0;
num_tuples = tups_vacuumed = nkeep = nunused = 0;
- free_size = 0;
+ free_space = 0;
nblocks = RelationGetNumberOfBlocks(onerel);
bool do_reap,
do_frag;
+ CHECK_FOR_INTERRUPTS();
+
buf = ReadBuffer(onerel, blkno);
page = BufferGetPage(buf);
if (PageIsNew(page))
{
- elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
- relname, blkno);
+ ereport(WARNING,
+ (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
+ relname, blkno)));
PageInit(page, BufferGetPageSize(buf), 0);
vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
- free_size += (vacpage->free - sizeof(ItemIdData));
- new_pages++;
+ free_space += vacpage->free;
+ empty_pages++;
empty_end_pages++;
vacpagecopy = copy_vac_page(vacpage);
vpage_insert(vacuum_pages, vacpagecopy);
if (PageIsEmpty(page))
{
vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
- free_size += (vacpage->free - sizeof(ItemIdData));
+ free_space += vacpage->free;
empty_pages++;
empty_end_pages++;
vacpagecopy = copy_vac_page(vacpage);
tupgone = false;
sv_infomask = tuple.t_data->t_infomask;
- switch (HeapTupleSatisfiesVacuum(tuple.t_data, XmaxRecent))
+ switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
{
case HEAPTUPLE_DEAD:
- tupgone = true; /* we can delete the tuple */
+ tupgone = true; /* we can delete the tuple */
break;
case HEAPTUPLE_LIVE:
+
+ /*
+ * Tuple is good. Consider whether to replace its
+ * xmin value with FrozenTransactionId.
+ */
+ if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
+ TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+ FreezeLimit))
+ {
+ HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
+ /* infomask should be okay already */
+ Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
+ pgchanged = true;
+ }
break;
case HEAPTUPLE_RECENTLY_DEAD:
+
/*
- * If tuple is recently deleted then we must not remove
- * it from relation.
+ * If tuple is recently deleted then we must not
+ * remove it from relation.
*/
nkeep += 1;
+
/*
* If we do shrinking and this tuple is updated one
* then remember it to construct updated tuple
}
break;
case HEAPTUPLE_INSERT_IN_PROGRESS:
+
/*
- * This should not happen, since we hold exclusive lock
- * on the relation; shouldn't we raise an error?
+ * This should not happen, since we hold exclusive
+ * lock on the relation; shouldn't we raise an error?
+ * (Actually, it can happen in system catalogs, since
+ * we tend to release write lock before commit there.)
*/
- elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
- relname, blkno, offnum, tuple.t_data->t_xmin);
+ ereport(NOTICE,
+ (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
+ relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
do_shrinking = false;
break;
case HEAPTUPLE_DELETE_IN_PROGRESS:
+
/*
- * This should not happen, since we hold exclusive lock
- * on the relation; shouldn't we raise an error?
+ * This should not happen, since we hold exclusive
+ * lock on the relation; shouldn't we raise an error?
+ * (Actually, it can happen in system catalogs, since
+ * we tend to release write lock before commit there.)
*/
- elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
- relname, blkno, offnum, tuple.t_data->t_xmax);
+ ereport(NOTICE,
+ (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
+ relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
do_shrinking = false;
break;
default:
- elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
+ elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
break;
}
/*
* Other checks...
*/
- if (!OidIsValid(tuple.t_data->t_oid))
- elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
- relname, blkno, offnum, (int) tupgone);
+ if (onerel->rd_rel->relhasoids &&
+ !OidIsValid(HeapTupleGetOid(&tuple)))
+ elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
+ relname, blkno, offnum);
if (tupgone)
{
if (tuple.t_len > max_tlen)
max_tlen = tuple.t_len;
}
- } /* scan along page */
+ } /* scan along page */
if (tempPage != (Page) NULL)
{
do_reap = (vacpage->offsets_free > 0);
}
- free_size += vacpage->free;
+ free_space += vacpage->free;
+
/*
* Add the page to fraged_pages if it has a useful amount of free
- * space. "Useful" means enough for a minimal-sized tuple.
- * But we don't know that accurately near the start of the relation,
- * so add pages unconditionally if they have >= BLCKSZ/10 free space.
+ * space. "Useful" means enough for a minimal-sized tuple. But we
+ * don't know that accurately near the start of the relation, so
+ * add pages unconditionally if they have >= BLCKSZ/10 free space.
*/
- do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ/10);
+ do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
if (do_reap || do_frag)
{
vpage_insert(fraged_pages, vacpagecopy);
}
+ /*
+ * Include the page in empty_end_pages if it will be empty after
+ * vacuuming; this is to keep us from using it as a move
+ * destination.
+ */
if (notup)
+ {
+ empty_pages++;
empty_end_pages++;
+ }
else
empty_end_pages = 0;
if (pgchanged)
- {
WriteBuffer(buf);
- changed_pages++;
- }
else
ReleaseBuffer(buf);
}
fraged_pages->empty_end_pages = empty_end_pages;
/*
- * Clear the fraged_pages list if we found we couldn't shrink.
- * Else, remove any "empty" end-pages from the list, and compute
- * usable free space = free space in remaining pages.
+ * Clear the fraged_pages list if we found we couldn't shrink. Else,
+ * remove any "empty" end-pages from the list, and compute usable free
+ * space = free space in remaining pages.
*/
if (do_shrinking)
{
Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
fraged_pages->num_pages -= empty_end_pages;
- usable_free_size = 0;
+ usable_free_space = 0;
for (i = 0; i < fraged_pages->num_pages; i++)
- usable_free_size += fraged_pages->pagedesc[i]->free;
+ usable_free_space += fraged_pages->pagedesc[i]->free;
}
else
{
fraged_pages->num_pages = 0;
- usable_free_size = 0;
+ usable_free_space = 0;
}
- if (usable_free_size > 0 && num_vtlinks > 0)
+ /* don't bother to save vtlinks if we will not call repair_frag */
+ if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
{
qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
vac_cmp_vtlinks);
pfree(vtlinks);
}
- elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
-Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
-Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u.\n\t%s",
- nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
- new_pages, num_tuples, tups_vacuumed,
- nkeep, vacrelstats->num_vtlinks,
- nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
- free_size, usable_free_size,
- empty_end_pages, fraged_pages->num_pages,
- vac_show_rusage(&ru0));
-
+ ereport(elevel,
+ (errmsg("\"%s\": found %.0f removable, %.0f nonremovable tuples in %u pages",
+ RelationGetRelationName(onerel),
+ tups_vacuumed, num_tuples, nblocks),
+ errdetail("%.0f dead tuples cannot be removed yet.\n"
+ "Nonremovable tuples range from %lu to %lu bytes long.\n"
+ "There were %.0f unused item pointers.\n"
+ "Total free space (including removable tuples) is %.0f bytes.\n"
+ "%u pages are or will become empty, including %u at the end of the table.\n"
+ "%u pages containing %.0f free bytes are potential move destinations.\n"
+ "%s",
+ nkeep,
+ (unsigned long) min_tlen, (unsigned long) max_tlen,
+ nunused,
+ free_space,
+ empty_pages, empty_end_pages,
+ fraged_pages->num_pages, usable_free_space,
+ vac_show_rusage(&ru0))));
}
CommandId myCID;
Buffer buf,
cur_buffer;
- BlockNumber nblocks,
+ BlockNumber nblocks,
blkno;
- BlockNumber last_move_dest_block = 0,
+ BlockNumber last_move_dest_block = 0,
last_vacuum_block;
Page page,
ToPage = NULL;
* We need a ResultRelInfo and an EState so we can use the regular
* executor's index-entry-making machinery.
*/
+ estate = CreateExecutorState();
+
resultRelInfo = makeNode(ResultRelInfo);
resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
resultRelInfo->ri_RelationDesc = onerel;
- resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */
+ resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */
ExecOpenIndices(resultRelInfo);
- estate = CreateExecutorState();
estate->es_result_relations = resultRelInfo;
estate->es_num_result_relations = 1;
estate->es_result_relation_info = resultRelInfo;
/*
* Scan pages backwards from the last nonempty page, trying to move
* tuples down to lower pages. Quit when we reach a page that we have
- * moved any tuples onto, or the first page if we haven't moved anything,
- * or when we find a page we cannot completely empty (this last condition
- * is handled by "break" statements within the loop).
+ * moved any tuples onto, or the first page if we haven't moved
+ * anything, or when we find a page we cannot completely empty (this
+ * last condition is handled by "break" statements within the loop).
*
* NB: this code depends on the vacuum_pages and fraged_pages lists being
* in order by blkno.
blkno > last_move_dest_block;
blkno--)
{
+ CHECK_FOR_INTERRUPTS();
+
/*
- * Forget fraged_pages pages at or after this one; they're no longer
- * useful as move targets, since we only want to move down. Note
- * that since we stop the outer loop at last_move_dest_block, pages
- * removed here cannot have had anything moved onto them already.
+ * Forget fraged_pages pages at or after this one; they're no
+ * longer useful as move targets, since we only want to move down.
+ * Note that since we stop the outer loop at last_move_dest_block,
+ * pages removed here cannot have had anything moved onto them
+ * already.
*
- * Also note that we don't change the stored fraged_pages list,
- * only our local variable num_fraged_pages; so the forgotten pages
- * are still available to be loaded into the free space map later.
+ * Also note that we don't change the stored fraged_pages list, only
+ * our local variable num_fraged_pages; so the forgotten pages are
+ * still available to be loaded into the free space map later.
*/
while (num_fraged_pages > 0 &&
- fraged_pages->pagedesc[num_fraged_pages-1]->blkno >= blkno)
+ fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
{
- Assert(fraged_pages->pagedesc[num_fraged_pages-1]->offsets_used == 0);
+ Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
--num_fraged_pages;
}
if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
{
- if ((TransactionId) tuple.t_data->t_cmin != myXID)
- elog(ERROR, "Invalid XID in t_cmin");
if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
elog(ERROR, "HEAP_MOVED_IN was not expected");
*/
if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
{
+ if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+ elog(ERROR, "invalid XVAC in tuple header");
if (keep_tuples == 0)
continue;
if (chain_tuple_moved) /* some chains was moved
* If this tuple is in the chain of tuples created in updates
* by "recent" transactions then we have to move all chain of
* tuples to another places.
+ *
+ * NOTE: this test is not 100% accurate: it is possible for a
+ * tuple to be an updated one with recent xmin, and yet not
+ * have a corresponding tuple in the vtlinks list. Presumably
+ * there was once a parent tuple with xmax matching the xmin,
+ * but it's possible that that tuple has been removed --- for
+ * example, if it had xmin = xmax then
+ * HeapTupleSatisfiesVacuum would deem it removable as soon as
+ * the xmin xact completes.
+ *
+ * To be on the safe side, we abandon the repair_frag process if
+ * we cannot find the parent tuple in vtlinks. This may be
+ * overly conservative; AFAICS it would be safe to move the
+ * chain.
*/
- if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
- !TransactionIdPrecedes(tuple.t_data->t_xmin, XmaxRecent)) ||
- (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
+ if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
+ !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+ OldestXmin)) ||
+ (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_MARKED_FOR_UPDATE)) &&
!(ItemPointerEquals(&(tuple.t_self),
&(tuple.t_data->t_ctid)))))
{
Buffer Cbuf = buf;
+ bool freeCbuf = false;
+ bool chain_move_failed = false;
Page Cpage;
ItemId Citemid;
ItemPointerData Ctid;
HeapTupleData tp = tuple;
Size tlen = tuple_len;
- VTupleMove vtmove = (VTupleMove)
- palloc(100 * sizeof(VTupleMoveData));
- int num_vtmove = 0;
- int free_vtmove = 100;
+ VTupleMove vtmove;
+ int num_vtmove;
+ int free_vtmove;
VacPage to_vacpage = NULL;
int to_item = 0;
- bool freeCbuf = false;
int ti;
- if (vacrelstats->vtlinks == NULL)
- elog(ERROR, "No one parent tuple was found");
if (cur_buffer != InvalidBuffer)
{
WriteBuffer(cur_buffer);
cur_buffer = InvalidBuffer;
}
+ /* Quick exit if we have no vtlinks to search in */
+ if (vacrelstats->vtlinks == NULL)
+ {
+ elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
+ break; /* out of walk-along-page loop */
+ }
+
+ vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
+ num_vtmove = 0;
+ free_vtmove = 100;
+
/*
* If this tuple is in the begin/middle of the chain then
* we have to move to the end of chain.
*/
- while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
+ while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_MARKED_FOR_UPDATE)) &&
!(ItemPointerEquals(&(tp.t_self),
&(tp.t_data->t_ctid))))
{
ItemPointerGetOffsetNumber(&Ctid));
if (!ItemIdIsUsed(Citemid))
{
-
/*
* This means that in the middle of chain there
- * was tuple updated by older (than XmaxRecent)
+ * was tuple updated by older (than OldestXmin)
* xaction and this tuple is already deleted by
* me. Actually, upper part of chain should be
* removed and seems that this should be handled
* in scan_heap(), but it's not implemented at the
* moment and so we just stop shrinking here.
*/
- ReleaseBuffer(Cbuf);
- pfree(vtmove);
- vtmove = NULL;
- elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue repair_frag");
- break;
+ elog(DEBUG2, "child itemid in update-chain marked as unused --- can't continue repair_frag");
+ chain_move_failed = true;
+ break; /* out of loop to move to chain end */
}
tp.t_datamcxt = NULL;
tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
tp.t_self = Ctid;
tlen = tp.t_len = ItemIdGetLength(Citemid);
}
- if (vtmove == NULL)
- break;
- /* first, can chain be moved ? */
+ if (chain_move_failed)
+ {
+ if (freeCbuf)
+ ReleaseBuffer(Cbuf);
+ pfree(vtmove);
+ break; /* out of walk-along-page loop */
+ }
+
+ /*
+ * Check if all items in chain can be moved
+ */
for (;;)
{
+ Buffer Pbuf;
+ Page Ppage;
+ ItemId Pitemid;
+ HeapTupleData Ptp;
+ VTupleLinkData vtld,
+ *vtlp;
+
if (to_vacpage == NULL ||
!enough_space(to_vacpage, tlen))
{
if (i == num_fraged_pages)
{
/* can't move item anywhere */
- for (i = 0; i < num_vtmove; i++)
- {
- Assert(vtmove[i].vacpage->offsets_used > 0);
- (vtmove[i].vacpage->offsets_used)--;
- }
- num_vtmove = 0;
- break;
+ chain_move_failed = true;
+ break; /* out of check-all-items loop */
}
to_item = i;
to_vacpage = fraged_pages->pagedesc[to_item];
}
to_vacpage->free -= MAXALIGN(tlen);
if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
- to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
+ to_vacpage->free -= sizeof(ItemIdData);
(to_vacpage->offsets_used)++;
if (free_vtmove == 0)
{
free_vtmove = 1000;
- vtmove = (VTupleMove) repalloc(vtmove,
- (free_vtmove + num_vtmove) *
- sizeof(VTupleMoveData));
+ vtmove = (VTupleMove)
+ repalloc(vtmove,
+ (free_vtmove + num_vtmove) *
+ sizeof(VTupleMoveData));
}
vtmove[num_vtmove].tid = tp.t_self;
vtmove[num_vtmove].vacpage = to_vacpage;
free_vtmove--;
num_vtmove++;
- /* All done ? */
+ /* At beginning of chain? */
if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
- TransactionIdPrecedes(tp.t_data->t_xmin, XmaxRecent))
+ TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
+ OldestXmin))
break;
- /* Well, try to find tuple with old row version */
- for (;;)
+ /* No, move to tuple with prior row version */
+ vtld.new_tid = tp.t_self;
+ vtlp = (VTupleLink)
+ vac_bsearch((void *) &vtld,
+ (void *) (vacrelstats->vtlinks),
+ vacrelstats->num_vtlinks,
+ sizeof(VTupleLinkData),
+ vac_cmp_vtlinks);
+ if (vtlp == NULL)
{
- Buffer Pbuf;
- Page Ppage;
- ItemId Pitemid;
- HeapTupleData Ptp;
- VTupleLinkData vtld,
- *vtlp;
-
- vtld.new_tid = tp.t_self;
- vtlp = (VTupleLink)
- vac_bsearch((void *) &vtld,
- (void *) (vacrelstats->vtlinks),
- vacrelstats->num_vtlinks,
- sizeof(VTupleLinkData),
- vac_cmp_vtlinks);
- if (vtlp == NULL)
- elog(ERROR, "Parent tuple was not found");
- tp.t_self = vtlp->this_tid;
- Pbuf = ReadBuffer(onerel,
+ /* see discussion above */
+ elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
+ chain_move_failed = true;
+ break; /* out of check-all-items loop */
+ }
+ tp.t_self = vtlp->this_tid;
+ Pbuf = ReadBuffer(onerel,
ItemPointerGetBlockNumber(&(tp.t_self)));
- Ppage = BufferGetPage(Pbuf);
- Pitemid = PageGetItemId(Ppage,
+ Ppage = BufferGetPage(Pbuf);
+ Pitemid = PageGetItemId(Ppage,
ItemPointerGetOffsetNumber(&(tp.t_self)));
- if (!ItemIdIsUsed(Pitemid))
- elog(ERROR, "Parent itemid marked as unused");
- Ptp.t_datamcxt = NULL;
- Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
- Assert(ItemPointerEquals(&(vtld.new_tid),
- &(Ptp.t_data->t_ctid)));
+ /* this can't happen since we saw tuple earlier: */
+ if (!ItemIdIsUsed(Pitemid))
+ elog(ERROR, "parent itemid marked as unused");
+ Ptp.t_datamcxt = NULL;
+ Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
- /*
- * Read above about cases when
- * !ItemIdIsUsed(Citemid) (child item is
- * removed)... Due to the fact that at the moment
- * we don't remove unuseful part of update-chain,
- * it's possible to get too old parent row here.
- * Like as in the case which caused this problem,
- * we stop shrinking here. I could try to find
- * real parent row but want not to do it because
- * of real solution will be implemented anyway,
- * latter, and we are too close to 6.5 release. -
- * vadim 06/11/99
- */
- if (!(TransactionIdEquals(Ptp.t_data->t_xmax,
- tp.t_data->t_xmin)))
- {
- if (freeCbuf)
- ReleaseBuffer(Cbuf);
- freeCbuf = false;
- ReleaseBuffer(Pbuf);
- for (i = 0; i < num_vtmove; i++)
- {
- Assert(vtmove[i].vacpage->offsets_used > 0);
- (vtmove[i].vacpage->offsets_used)--;
- }
- num_vtmove = 0;
- elog(NOTICE, "Too old parent tuple found - can't continue repair_frag");
- break;
- }
-#ifdef NOT_USED /* I'm not sure that this will wotk
- * properly... */
+ /* ctid should not have changed since we saved it */
+ Assert(ItemPointerEquals(&(vtld.new_tid),
+ &(Ptp.t_data->t_ctid)));
- /*
- * If this tuple is updated version of row and it
- * was created by the same transaction then no one
- * is interested in this tuple - mark it as
- * removed.
- */
- if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
- TransactionIdEquals(Ptp.t_data->t_xmin,
- Ptp.t_data->t_xmax))
- {
- TransactionIdStore(myXID,
- (TransactionId *) &(Ptp.t_data->t_cmin));
- Ptp.t_data->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
- Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
- WriteBuffer(Pbuf);
- continue;
- }
-#endif
- tp.t_datamcxt = Ptp.t_datamcxt;
- tp.t_data = Ptp.t_data;
- tlen = tp.t_len = ItemIdGetLength(Pitemid);
- if (freeCbuf)
- ReleaseBuffer(Cbuf);
- Cbuf = Pbuf;
- freeCbuf = true;
- break;
+ /*
+ * Read above about cases when !ItemIdIsUsed(Citemid)
+ * (child item is removed)... Due to the fact that at
+ * the moment we don't remove unuseful part of
+ * update-chain, it's possible to get too old parent
+ * row here. Like as in the case which caused this
+ * problem, we stop shrinking here. I could try to
+ * find real parent row but want not to do it because
+ * of real solution will be implemented anyway, later,
+ * and we are too close to 6.5 release. - vadim
+ * 06/11/99
+ */
+ if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
+ HeapTupleHeaderGetXmin(tp.t_data))))
+ {
+ ReleaseBuffer(Pbuf);
+ elog(DEBUG2, "too old parent tuple found --- can't continue repair_frag");
+ chain_move_failed = true;
+ break; /* out of check-all-items loop */
}
- if (num_vtmove == 0)
- break;
- }
+ tp.t_datamcxt = Ptp.t_datamcxt;
+ tp.t_data = Ptp.t_data;
+ tlen = tp.t_len = ItemIdGetLength(Pitemid);
+ if (freeCbuf)
+ ReleaseBuffer(Cbuf);
+ Cbuf = Pbuf;
+ freeCbuf = true;
+ } /* end of check-all-items loop */
+
if (freeCbuf)
ReleaseBuffer(Cbuf);
- if (num_vtmove == 0) /* chain can't be moved */
+ freeCbuf = false;
+
+ if (chain_move_failed)
{
+ /*
+ * Undo changes to offsets_used state. We don't
+ * bother cleaning up the amount-free state, since
+ * we're not going to do any further tuple motion.
+ */
+ for (i = 0; i < num_vtmove; i++)
+ {
+ Assert(vtmove[i].vacpage->offsets_used > 0);
+ (vtmove[i].vacpage->offsets_used)--;
+ }
pfree(vtmove);
- break;
+ break; /* out of walk-along-page loop */
}
+
+ /*
+ * Okay, move the whole tuple chain
+ */
ItemPointerSetInvalid(&Ctid);
for (ti = 0; ti < num_vtmove; ti++)
{
*/
heap_copytuple_with_tuple(&tuple, &newtup);
- RelationInvalidateHeapTuple(onerel, &tuple);
+ /*
+ * register invalidation of source tuple in catcaches.
+ */
+ CacheInvalidateHeapTuple(onerel, &tuple);
- /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
+ /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
START_CRIT_SECTION();
- TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
- tuple.t_data->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
+ tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+ HEAP_XMIN_INVALID |
+ HEAP_MOVED_IN);
tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
+ HeapTupleHeaderSetXvac(tuple.t_data, myXID);
/*
* If this page was not used before - clean it.
* Update the state of the copied tuple, and store it
* on the destination page.
*/
- TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
- newtup.t_data->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
+ newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+ HEAP_XMIN_INVALID |
+ HEAP_MOVED_OFF);
newtup.t_data->t_infomask |= HEAP_MOVED_IN;
- newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
- InvalidOffsetNumber, LP_USED);
+ HeapTupleHeaderSetXvac(newtup.t_data, myXID);
+ newoff = PageAddItem(ToPage,
+ (Item) newtup.t_data,
+ tuple_len,
+ InvalidOffsetNumber,
+ LP_USED);
if (newoff == InvalidOffsetNumber)
{
- elog(STOP, "moving chain: failed to add item with len = %lu to page %u",
+ elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
(unsigned long) tuple_len, destvacpage->blkno);
}
newitemid = PageGetItemId(ToPage, newoff);
newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
+ /* XLOG stuff */
+ if (!onerel->rd_istemp)
{
XLogRecPtr recptr =
log_heap_move(onerel, Cbuf, tuple.t_self,
PageSetLSN(ToPage, recptr);
PageSetSUI(ToPage, ThisStartUpID);
}
+ else
+ {
+ /*
+ * No XLOG record, but still need to flag that XID
+ * exists on disk
+ */
+ MyXactMadeTempRelUpdate = true;
+ }
+
END_CRIT_SECTION();
if (destvacpage->blkno > last_move_dest_block)
WriteBuffer(cur_buffer);
WriteBuffer(Cbuf);
- }
+ } /* end of move-the-tuple-chain loop */
+
cur_buffer = InvalidBuffer;
pfree(vtmove);
chain_tuple_moved = true;
+
+ /* advance to next tuple in walk-along-page loop */
continue;
- }
+ } /* end of is-tuple-in-chain test */
/* try to find new page for this tuple */
if (cur_buffer == InvalidBuffer ||
/* copy tuple */
heap_copytuple_with_tuple(&tuple, &newtup);
- RelationInvalidateHeapTuple(onerel, &tuple);
+ /*
+ * register invalidation of source tuple in catcaches.
+ *
+ * (Note: we do not need to register the copied tuple, because we
+ * are not changing the tuple contents and so there cannot be
+ * any need to flush negative catcache entries.)
+ */
+ CacheInvalidateHeapTuple(onerel, &tuple);
- /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
+ /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
START_CRIT_SECTION();
/*
- * Mark new tuple as moved_in by vacuum and store vacuum XID
- * in t_cmin !!!
+ * Mark new tuple as MOVED_IN by me.
*/
- TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
- newtup.t_data->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
+ newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+ HEAP_XMIN_INVALID |
+ HEAP_MOVED_OFF);
newtup.t_data->t_infomask |= HEAP_MOVED_IN;
+ HeapTupleHeaderSetXvac(newtup.t_data, myXID);
/* add tuple to the page */
newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
InvalidOffsetNumber, LP_USED);
if (newoff == InvalidOffsetNumber)
{
- elog(STOP, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
+ elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
(unsigned long) tuple_len,
cur_page->blkno, (unsigned long) cur_page->free,
cur_page->offsets_used, cur_page->offsets_free);
newtup.t_self = newtup.t_data->t_ctid;
/*
- * Mark old tuple as moved_off by vacuum and store vacuum XID
- * in t_cmin !!!
+ * Mark old tuple as MOVED_OFF by me.
*/
- TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
- tuple.t_data->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
+ tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+ HEAP_XMIN_INVALID |
+ HEAP_MOVED_IN);
tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
+ HeapTupleHeaderSetXvac(tuple.t_data, myXID);
+ /* XLOG stuff */
+ if (!onerel->rd_istemp)
{
XLogRecPtr recptr =
log_heap_move(onerel, buf, tuple.t_self,
PageSetLSN(ToPage, recptr);
PageSetSUI(ToPage, ThisStartUpID);
}
+ else
+ {
+ /*
+ * No XLOG record, but still need to flag that XID exists
+ * on disk
+ */
+ MyXactMadeTempRelUpdate = true;
+ }
+
END_CRIT_SECTION();
cur_page->offsets_used++;
}
} /* walk along page */
+ /*
+ * If we broke out of the walk-along-page loop early (ie, still
+ * have offnum <= maxoff), then we failed to move some tuple off
+ * this page. No point in shrinking any more, so clean up and
+ * exit the per-page loop.
+ */
if (offnum < maxoff && keep_tuples > 0)
{
OffsetNumber off;
+ /*
+ * Fix vacpage state for any unvisited tuples remaining on
+ * page
+ */
for (off = OffsetNumberNext(offnum);
off <= maxoff;
off = OffsetNumberNext(off))
tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
continue;
- if ((TransactionId) tuple.t_data->t_cmin != myXID)
- elog(ERROR, "Invalid XID in t_cmin (4)");
if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
- elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
+ elog(ERROR, "HEAP_MOVED_IN was not expected");
if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
{
+ if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+ elog(ERROR, "invalid XVAC in tuple header");
/* some chains was moved while */
if (chain_tuple_moved)
{ /* cleaning this page */
keep_tuples--;
}
}
+ else
+ elog(ERROR, "HEAP_MOVED_OFF was expected");
}
}
ReleaseBuffer(buf);
if (offnum <= maxoff)
- break; /* some item(s) left */
+ break; /* had to quit early, see above note */
} /* walk along relation */
/*
* We are not going to move any more tuples across pages, but we still
* need to apply vacuum_page to compact free space in the remaining
- * pages in vacuum_pages list. Note that some of these pages may also
- * be in the fraged_pages list, and may have had tuples moved onto them;
- * if so, we already did vacuum_page and needn't do it again.
+ * pages in vacuum_pages list. Note that some of these pages may also
+ * be in the fraged_pages list, and may have had tuples moved onto
+ * them; if so, we already did vacuum_page and needn't do it again.
*/
for (i = 0, curpage = vacuum_pages->pagedesc;
i < vacuumed_pages;
i++, curpage++)
{
+ CHECK_FOR_INTERRUPTS();
Assert((*curpage)->blkno < blkno);
if ((*curpage)->offsets_used == 0)
{
}
/*
- * Now scan all the pages that we moved tuples onto and update
- * tuple status bits. This is not really necessary, but will save time
- * for future transactions examining these tuples.
+ * Now scan all the pages that we moved tuples onto and update tuple
+ * status bits. This is not really necessary, but will save time for
+ * future transactions examining these tuples.
*
- * XXX Notice that this code fails to clear HEAP_MOVED_OFF tuples from
- * pages that were move source pages but not move dest pages. One also
- * wonders whether it wouldn't be better to skip this step and let the
- * tuple status updates happen someplace that's not holding an exclusive
- * lock on the relation.
+ * XXX Notice that this code fails to clear HEAP_MOVED_OFF tuples from
+ * pages that were move source pages but not move dest pages. One
+ * also wonders whether it wouldn't be better to skip this step and
+ * let the tuple status updates happen someplace that's not holding an
+ * exclusive lock on the relation.
*/
checked_moved = 0;
for (i = 0, curpage = fraged_pages->pagedesc;
i < num_fraged_pages;
i++, curpage++)
{
+ CHECK_FOR_INTERRUPTS();
Assert((*curpage)->blkno < blkno);
if ((*curpage)->blkno > last_move_dest_block)
break; /* no need to scan any further */
tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
{
- if ((TransactionId) tuple.t_data->t_cmin != myXID)
- elog(ERROR, "Invalid XID in t_cmin (2)");
+ if (!(tuple.t_data->t_infomask & HEAP_MOVED))
+ elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+ if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+ elog(ERROR, "invalid XVAC in tuple header");
if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
{
tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
+ tuple.t_data->t_infomask &= ~HEAP_MOVED;
num_tuples++;
}
- else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
- tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
else
- elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+ tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
}
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
}
Assert(num_moved == checked_moved);
- elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u.\n\t%s",
- RelationGetRelationName(onerel),
- nblocks, blkno, num_moved,
- vac_show_rusage(&ru0));
+ /*
+ * It'd be cleaner to make this report at the bottom of this routine,
+ * but then the rusage would double-count the second pass of index
+ * vacuuming. So do it here and ignore the relatively small amount of
+ * processing that occurs below.
+ */
+ ereport(elevel,
+ (errmsg("\"%s\": moved %u tuples, truncated %u to %u pages",
+ RelationGetRelationName(onerel),
+ num_moved, nblocks, blkno),
+ errdetail("%s",
+ vac_show_rusage(&ru0))));
/*
* Reflect the motion of system tuples to catalog cache here.
if (vacpage->blkno == (blkno - 1) &&
vacpage->offsets_free > 0)
{
- OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
- OffsetNumber *unused = unbuf;
+ OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
int uncnt;
buf = ReadBuffer(onerel, vacpage->blkno);
if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
{
- if ((TransactionId) tuple.t_data->t_cmin != myXID)
- elog(ERROR, "Invalid XID in t_cmin (3)");
if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
{
+ if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+ elog(ERROR, "invalid XVAC in tuple header");
itemid->lp_flags &= ~LP_USED;
num_tuples++;
}
else
- elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
+ elog(ERROR, "HEAP_MOVED_OFF was expected");
}
}
Assert(vacpage->offsets_free == num_tuples);
+
START_CRIT_SECTION();
+
uncnt = PageRepairFragmentation(page, unused);
+
+ /* XLOG stuff */
+ if (!onerel->rd_istemp)
{
XLogRecPtr recptr;
- recptr = log_heap_clean(onerel, buf, (char *) unused,
- (char *) (&(unused[uncnt])) - (char *) unused);
+ recptr = log_heap_clean(onerel, buf, unused, uncnt);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
}
+ else
+ {
+ /*
+ * No XLOG record, but still need to flag that XID exists
+ * on disk
+ */
+ MyXactMadeTempRelUpdate = true;
+ }
+
END_CRIT_SECTION();
+
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
WriteBuffer(buf);
}
*/
i = FlushRelationBuffers(onerel, blkno);
if (i < 0)
- elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
- i);
+ elog(ERROR, "FlushRelationBuffers returned %d", i);
/* truncate relation, if needed */
if (blkno < nblocks)
{
blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
- onerel->rd_nblocks = blkno; /* update relcache immediately */
+ onerel->rd_nblocks = blkno; /* update relcache immediately */
onerel->rd_targblock = InvalidBlockNumber;
vacrelstats->rel_pages = blkno; /* set new number of blocks */
}
ExecDropTupleTable(tupleTable, true);
ExecCloseIndices(resultRelInfo);
+
+ FreeExecutorState(estate);
}
/*
{
Buffer buf;
VacPage *vacpage;
- BlockNumber relblocks;
+ BlockNumber relblocks;
int nblocks;
int i;
for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
{
+ CHECK_FOR_INTERRUPTS();
if ((*vacpage)->offsets_free > 0)
{
buf = ReadBuffer(onerel, (*vacpage)->blkno);
i = FlushRelationBuffers(onerel, relblocks);
if (i < 0)
- elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
- i);
+ elog(ERROR, "FlushRelationBuffers returned %d", i);
/* truncate relation if there are some empty end-pages */
if (vacuum_pages->empty_end_pages > 0)
{
- elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
- RelationGetRelationName(onerel),
- vacrelstats->rel_pages, relblocks);
+ ereport(elevel,
+ (errmsg("\"%s\": truncated %u to %u pages",
+ RelationGetRelationName(onerel),
+ vacrelstats->rel_pages, relblocks)));
relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
- onerel->rd_nblocks = relblocks; /* update relcache immediately */
+ onerel->rd_nblocks = relblocks; /* update relcache immediately */
onerel->rd_targblock = InvalidBlockNumber;
vacrelstats->rel_pages = relblocks; /* set new number of
* blocks */
static void
vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
{
- OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
- OffsetNumber *unused = unbuf;
+ OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
int uncnt;
Page page = BufferGetPage(buffer);
ItemId itemid;
Assert(vacpage->offsets_used == 0);
START_CRIT_SECTION();
+
for (i = 0; i < vacpage->offsets_free; i++)
{
itemid = PageGetItemId(page, vacpage->offsets[i]);
itemid->lp_flags &= ~LP_USED;
}
+
uncnt = PageRepairFragmentation(page, unused);
+
+ /*
+ * XLOG stuff: for a non-temp relation, log the cleanup with
+ * log_heap_clean, handing it the unused[] array together with the
+ * count returned by PageRepairFragmentation, then stamp the page
+ * with the resulting record pointer and the current startup ID.
+ */
+ if (!onerel->rd_istemp)
{
XLogRecPtr recptr;
- recptr = log_heap_clean(onerel, buffer, (char *) unused,
- (char *) (&(unused[uncnt])) - (char *) unused);
+ recptr = log_heap_clean(onerel, buffer, unused, uncnt);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
}
+ else
+ {
+ /* No XLOG record, but still need to flag that XID exists on disk */
+ MyXactMadeTempRelUpdate = true;
+ }
+
END_CRIT_SECTION();
}
/*
* scan_index() -- scan one index relation to update statistic.
+ *
+ * We use this when we have no deletions to do.
+ *
+ * indrel is the index to process and num_tuples is the number of tuples
+ * the caller expects it to hold (the table's count). The index is run
+ * through index_bulk_delete with a callback that never selects a tuple,
+ * then through index_vacuum_cleanup. If that yields no stats we return
+ * without touching pg_class; otherwise the page and tuple counts are
+ * stored via vac_update_relstats, reported at elevel, and compared with
+ * num_tuples, raising a WARNING on a mismatch that a partial index
+ * cannot explain. The stats struct is pfree'd before returning.
*/
static void
scan_index(Relation indrel, double num_tuples)
{
- RetrieveIndexResult res;
- IndexScanDesc iscan;
- BlockNumber nipages;
- double nitups;
+ IndexBulkDeleteResult *stats;
+ IndexVacuumCleanupInfo vcinfo;
VacRUsage ru0;
vac_init_rusage(&ru0);
- /* walk through the entire index */
- iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
- nitups = 0;
+ /*
+ * Even though we're not planning to delete anything, we use the
+ * ambulkdelete call, because (a) the scan happens within the index AM
+ * for more speed, and (b) it may want to pass private statistics to
+ * the amvacuumcleanup call.
+ */
+ stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
+
+ /* Do post-VACUUM cleanup, even though we deleted nothing */
+ vcinfo.vacuum_full = true;
+ vcinfo.message_level = elevel;
- while ((res = index_getnext(iscan, ForwardScanDirection))
- != (RetrieveIndexResult) NULL)
- {
- nitups += 1;
- pfree(res);
- }
+ stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
- index_endscan(iscan);
+ if (!stats)
+ return;
/* now update statistics in pg_class */
- nipages = RelationGetNumberOfBlocks(indrel);
- vac_update_relstats(RelationGetRelid(indrel), nipages, nitups, false);
+ vac_update_relstats(RelationGetRelid(indrel),
+ stats->num_pages, stats->num_index_tuples,
+ false);
- elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f.\n\t%s",
- RelationGetRelationName(indrel), nipages, nitups,
- vac_show_rusage(&ru0));
+ ereport(elevel,
+ (errmsg("index \"%s\" now contains %.0f tuples in %u pages",
+ RelationGetRelationName(indrel),
+ stats->num_index_tuples,
+ stats->num_pages),
+ errdetail("%u index pages have been deleted, %u are currently reusable.\n"
+ "%s",
+ stats->pages_deleted, stats->pages_free,
+ vac_show_rusage(&ru0))));
/*
- * Check for tuple count mismatch. If the index is partial, then
- * it's OK for it to have fewer tuples than the heap; else we got trouble.
+ * Check for tuple count mismatch. If the index is partial, then it's
+ * OK for it to have fewer tuples than the heap; else we got trouble.
*/
- if (nitups != num_tuples)
+ if (stats->num_index_tuples != num_tuples)
{
- if (nitups > num_tuples ||
- ! is_partial_index(indrel))
- elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
-\n\tRecreate the index.",
- RelationGetRelationName(indrel), nitups, num_tuples);
+ if (stats->num_index_tuples > num_tuples ||
+ !vac_is_partial_index(indrel))
+ ereport(WARNING,
+ (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
+ RelationGetRelationName(indrel),
+ stats->num_index_tuples, num_tuples),
+ errhint("Rebuild the index with REINDEX.")));
}
+
+ pfree(stats);
}
/*
double num_tuples, int keep_tuples)
{
IndexBulkDeleteResult *stats;
+ IndexVacuumCleanupInfo vcinfo;
VacRUsage ru0;
vac_init_rusage(&ru0);
/* Do bulk deletion */
stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
+ /* Do post-VACUUM cleanup */
+ vcinfo.vacuum_full = true;
+ vcinfo.message_level = elevel;
+
+ stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
+
if (!stats)
return;
stats->num_pages, stats->num_index_tuples,
false);
- elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f: Deleted %.0f.\n\t%s",
- RelationGetRelationName(indrel), stats->num_pages,
- stats->num_index_tuples - keep_tuples, stats->tuples_removed,
- vac_show_rusage(&ru0));
+ ereport(elevel,
+ (errmsg("index \"%s\" now contains %.0f tuples in %u pages",
+ RelationGetRelationName(indrel),
+ stats->num_index_tuples,
+ stats->num_pages),
+ errdetail("%.0f index tuples were removed.\n"
+ "%u index pages have been deleted, %u are currently reusable.\n"
+ "%s",
+ stats->tuples_removed,
+ stats->pages_deleted, stats->pages_free,
+ vac_show_rusage(&ru0))));
/*
- * Check for tuple count mismatch. If the index is partial, then
- * it's OK for it to have fewer tuples than the heap; else we got trouble.
+ * Check for tuple count mismatch. If the index is partial, then it's
+ * OK for it to have fewer tuples than the heap; else we got trouble.
*/
if (stats->num_index_tuples != num_tuples + keep_tuples)
{
if (stats->num_index_tuples > num_tuples + keep_tuples ||
- ! is_partial_index(indrel))
- elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%.0f) IS NOT THE SAME AS HEAP' (%.0f).\
-\n\tRecreate the index.",
- RelationGetRelationName(indrel),
- stats->num_index_tuples, num_tuples);
+ !vac_is_partial_index(indrel))
+ ereport(WARNING,
+ (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
+ RelationGetRelationName(indrel),
+ stats->num_index_tuples, num_tuples + keep_tuples),
+ errhint("Rebuild the index with REINDEX.")));
}
pfree(stats);
static bool
tid_reaped(ItemPointer itemptr, void *state)
{
- VacPageList vacpagelist = (VacPageList) state;
+ VacPageList vacpagelist = (VacPageList) state;
OffsetNumber ioffno;
OffsetNumber *voff;
VacPage vp,
return true;
}
+/*
+ * Dummy version for scan_index.
+ *
+ * Ignores both arguments and always returns false, so the bulk-delete
+ * pass that scan_index requests never selects an index tuple for removal.
+ */
+static bool
+dummy_tid_reaped(ItemPointer itemptr, void *state)
+{
+ return false;
+}
+
/*
* Update the shared Free Space Map with the info we now have about
* free space in the relation, discarding any old info the map may have.
BlockNumber rel_pages)
{
int nPages = fraged_pages->num_pages;
+ VacPage *pagedesc = fraged_pages->pagedesc;
+ Size threshold;
+ PageFreeSpaceInfo *pageSpaces;
+ int outPages;
int i;
- BlockNumber *pages;
- Size *spaceAvail;
+
+ /*
+ * We only report pages with free space at least equal to the average
+ * request size --- this avoids cluttering FSM with uselessly-small
+ * bits of space. Although FSM would discard pages with little free
+ * space anyway, it's important to do this prefiltering because (a) it
+ * reduces the time spent holding the FSM lock in
+ * RecordRelationFreeSpace, and (b) FSM uses the number of pages
+ * reported as a statistic for guiding space management. If we didn't
+ * threshold our reports the same way vacuumlazy.c does, we'd be
+ * skewing that statistic.
+ */
+ threshold = GetAvgFSMRequestSize(&onerel->rd_node);
/* +1 to avoid palloc(0) */
- pages = (BlockNumber *) palloc((nPages + 1) * sizeof(BlockNumber));
- spaceAvail = (Size *) palloc((nPages + 1) * sizeof(Size));
+ pageSpaces = (PageFreeSpaceInfo *)
+ palloc((nPages + 1) * sizeof(PageFreeSpaceInfo));
+ outPages = 0;
for (i = 0; i < nPages; i++)
{
- pages[i] = fraged_pages->pagedesc[i]->blkno;
- spaceAvail[i] = fraged_pages->pagedesc[i]->free;
/*
- * fraged_pages may contain entries for pages that we later decided
- * to truncate from the relation; don't enter them into the map!
+ * fraged_pages may contain entries for pages that we later
+ * decided to truncate from the relation; don't enter them into
+ * the free space map!
*/
- if (pages[i] >= rel_pages)
- {
- nPages = i;
+ if (pagedesc[i]->blkno >= rel_pages)
break;
+
+ if (pagedesc[i]->free >= threshold)
+ {
+ pageSpaces[outPages].blkno = pagedesc[i]->blkno;
+ pageSpaces[outPages].avail = pagedesc[i]->free;
+ outPages++;
}
}
- MultiRecordFreeSpace(&onerel->rd_node,
- 0, MaxBlockNumber,
- nPages, pages, spaceAvail);
- pfree(pages);
- pfree(spaceAvail);
+ RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);
+
+ pfree(pageSpaces);
}
/* Copy a VacPage structure */
/* allocate a VacPageData entry */
newvacpage = (VacPage) palloc(sizeof(VacPageData) +
- vacpage->offsets_free * sizeof(OffsetNumber));
+ vacpage->offsets_free * sizeof(OffsetNumber));
/* fill it in */
if (vacpage->offsets_free > 0)
/*
* vac_bsearch: just like standard C library routine bsearch(),
* except that we first test to see whether the target key is outside
- * the range of the table entries. This case is handled relatively slowly
+ * the range of the table entries. This case is handled relatively slowly
* by the normal binary search algorithm (ie, no faster than any other key)
* but it occurs often enough in VACUUM to be worth optimizing.
*/
i = 0;
foreach(indexoidscan, indexoidlist)
{
- Oid indexoid = lfirsti(indexoidscan);
+ Oid indexoid = lfirsto(indexoidscan);
(*Irel)[i] = index_open(indexoid);
i++;
}
-static bool
-is_partial_index(Relation indrel)
+/*
+ * Is an index partial (ie, could it contain fewer tuples than the heap?)
+ *
+ * Returns true when the index's access method does not index nulls
+ * (rd_am->amindexnulls is false) or when the indpred attribute of the
+ * index's pg_index tuple is non-null, i.e. a predicate is present;
+ * returns false otherwise.
+ */
+bool
+vac_is_partial_index(Relation indrel)
{
- bool result;
- HeapTuple cachetuple;
- Form_pg_index indexStruct;
-
/*
- * If the index's AM doesn't support nulls, it's partial for our purposes
+ * If the index's AM doesn't support nulls, it's partial for our
+ * purposes
*/
- if (! indrel->rd_am->amindexnulls)
+ if (!indrel->rd_am->amindexnulls)
return true;
/* Otherwise, look to see if there's a partial-index predicate */
- cachetuple = SearchSysCache(INDEXRELID,
- ObjectIdGetDatum(RelationGetRelid(indrel)),
- 0, 0, 0);
- if (!HeapTupleIsValid(cachetuple))
- elog(ERROR, "is_partial_index: index %u not found",
- RelationGetRelid(indrel));
- indexStruct = (Form_pg_index) GETSTRUCT(cachetuple);
-
- result = (VARSIZE(&indexStruct->indpred) > VARHDRSZ);
+ if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
+ return true;
- ReleaseSysCache(cachetuple);
- return result;
+ return false;
}
snprintf(result, sizeof(result),
"CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
(int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
- (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
+ (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
(int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
- (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
+ (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
(int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
(int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);