/*-------------------------------------------------------------------------
*
* vacuum.c
- * the postgres vacuum cleaner
+ * The postgres vacuum cleaner.
*
- * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * This file includes the "full" version of VACUUM, as well as control code
+ * used by all three of full VACUUM, lazy VACUUM, and ANALYZE. See
+ * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
+ *
+ *
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.184 2001/01/19 22:08:46 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.260 2003/09/24 18:54:01 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
-#include <sys/types.h>
-#include <sys/file.h>
-#include <sys/stat.h>
-#include <fcntl.h>
#include <unistd.h>
-#ifndef HAVE_GETRUSAGE
-#include "rusagestub.h"
-#else
-#include <sys/time.h>
-#include <sys/resource.h>
-#endif
-
+#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
-#include "catalog/index.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_index.h"
#include "commands/vacuum.h"
+#include "executor/executor.h"
#include "miscadmin.h"
-#include "nodes/execnodes.h"
+#include "storage/freespace.h"
#include "storage/sinval.h"
#include "storage/smgr.h"
-#include "tcop/tcopprot.h"
+#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
+#include "utils/lsyscache.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
-#include "utils/temprel.h"
+#include "pgstat.h"
+
+
+typedef struct VacPageData
+{
+ BlockNumber blkno; /* BlockNumber of this Page */
+ Size free; /* FreeSpace on this Page */
+ uint16 offsets_used; /* Number of OffNums used by vacuum */
+ uint16 offsets_free; /* Number of OffNums free or to be free */
+ OffsetNumber offsets[1]; /* Array of free OffNums */
+} VacPageData;
+
+typedef VacPageData *VacPage;
+
+typedef struct VacPageListData
+{
+ BlockNumber empty_end_pages; /* Number of "empty" end-pages */
+ int num_pages; /* Number of pages in pagedesc */
+ int num_allocated_pages; /* Number of allocated pages in
+ * pagedesc */
+ VacPage *pagedesc; /* Descriptions of pages */
+} VacPageListData;
+
+typedef VacPageListData *VacPageList;
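+
+/*
+ * Illustrative note (added commentary, not in the original sources): a
+ * VacPageList is just a growable array of VacPage pointers, so consumers
+ * typically walk it as
+ *
+ *		for (i = 0; i < vacpagelist->num_pages; i++)
+ *		{
+ *			VacPage vp = vacpagelist->pagedesc[i];
+ *
+ *			... use vp->blkno, vp->free, vp->offsets[0..offsets_free-1] ...
+ *		}
+ *
+ * Pages are appended in the order the heap scan visits them, so both
+ * vacuum_pages and fraged_pages end up sorted by block number, which
+ * repair_frag() relies on.
+ */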
+
+typedef struct VTupleLinkData
+{
+ ItemPointerData new_tid;
+ ItemPointerData this_tid;
+} VTupleLinkData;
+
+typedef VTupleLinkData *VTupleLink;
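+
+/*
+ * Explanatory note (added commentary): a VTupleLink records one step of an
+ * update chain; this_tid is a recently-dead tuple and new_tid is the
+ * t_ctid successor it points at. scan_heap() collects these links so that
+ * repair_frag() can recognize update chains and move them as a unit.
+ */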
+
+typedef struct VTupleMoveData
+{
+ ItemPointerData tid; /* tuple ID */
+ VacPage vacpage; /* where to move */
+ bool cleanVpd; /* clean vacpage before using */
+} VTupleMoveData;
+
+typedef VTupleMoveData *VTupleMove;
+
+typedef struct VRelStats
+{
+ BlockNumber rel_pages;
+ double rel_tuples;
+ Size min_tlen;
+ Size max_tlen;
+ bool hasindex;
+ int num_vtlinks;
+ VTupleLink vtlinks;
+} VRelStats;
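+
+/*
+ * Data-flow sketch (added commentary): scan_heap() fills in a VRelStats
+ * during its pass over the heap, roughly
+ *
+ *		vacrelstats->rel_tuples = num_tuples;
+ *		vacrelstats->rel_pages = nblocks;
+ *		vacrelstats->min_tlen = min_tlen;
+ *		vacrelstats->max_tlen = max_tlen;
+ *
+ * and repair_frag() later consults min_tlen (via enough_space()) when
+ * deciding whether a candidate page can receive a moved tuple.
+ */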
-extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
- char *unused, int unlen);
-extern XLogRecPtr log_heap_move(Relation reln,
- Buffer oldbuf, ItemPointerData from,
- Buffer newbuf, HeapTuple newtup);
static MemoryContext vac_context = NULL;
-static int MESSAGE_LEVEL; /* message level */
+static int elevel = -1;
+
+static TransactionId OldestXmin;
+static TransactionId FreezeLimit;
-static TransactionId XmaxRecent;
/* non-export function prototypes */
-static void vacuum_init(void);
-static void vacuum_shutdown(void);
-static void vac_vacuum(NameData *VacRelP, bool analyze, List *anal_cols2);
-static VRelList getrels(NameData *VacRelP);
-static void vacuum_rel(Oid relid);
-static void scan_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages);
-static void repair_frag(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages, int nindices, Relation *Irel);
-static void vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacpagelist);
+static List *getrels(const RangeVar *vacrel, const char *stmttype);
+static void vac_update_dbstats(Oid dbid,
+ TransactionId vacuumXID,
+ TransactionId frozenXID);
+static void vac_truncate_clog(TransactionId vacuumXID,
+ TransactionId frozenXID);
+static bool vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
+static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
+static void scan_heap(VRelStats *vacrelstats, Relation onerel,
+ VacPageList vacuum_pages, VacPageList fraged_pages);
+static void repair_frag(VRelStats *vacrelstats, Relation onerel,
+ VacPageList vacuum_pages, VacPageList fraged_pages,
+ int nindexes, Relation *Irel);
+static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
+ VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
-static void vacuum_index(VacPageList vacpagelist, Relation indrel, int num_tuples, int keep_tuples);
-static void scan_index(Relation indrel, int num_tuples);
-static void update_relstats(Oid relid, int num_pages, int num_tuples, bool hasindex, VRelStats *vacrelstats);
-static VacPage tid_reaped(ItemPointer itemptr, VacPageList vacpagelist);
-static void reap_page(VacPageList vacpagelist, VacPage vacpage);
+static void vacuum_index(VacPageList vacpagelist, Relation indrel,
+ double num_tuples, int keep_tuples);
+static void scan_index(Relation indrel, double num_tuples);
+static bool tid_reaped(ItemPointer itemptr, void *state);
+static bool dummy_tid_reaped(ItemPointer itemptr, void *state);
+static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
+ BlockNumber rel_pages);
+static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
-static void get_indices(Relation relation, int *nindices, Relation **Irel);
-static void close_indices(int nindices, Relation *Irel);
-static IndexInfo **get_index_desc(Relation onerel, int nindices,
- Relation *Irel);
-static void *vac_find_eq(void *bot, int nelem, int size, void *elm,
- int (*compar) (const void *, const void *));
+static void *vac_bsearch(const void *key, const void *base,
+ size_t nelem, size_t size,
+ int (*compar) (const void *, const void *));
static int vac_cmp_blk(const void *left, const void *right);
static int vac_cmp_offno(const void *left, const void *right);
static int vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);
-static char *show_rusage(struct rusage * ru0);
+/****************************************************************************
+ * *
+ * Code common to all flavors of VACUUM and ANALYZE *
+ * *
+ ****************************************************************************
+ */
+
+
+/*
+ * Primary entry point for VACUUM and ANALYZE commands.
+ */
void
-vacuum(char *vacrel, bool verbose, bool analyze, List *anal_cols)
+vacuum(VacuumStmt *vacstmt)
{
- NameData VacRel;
- Name VacRelName;
- MemoryContext old;
- List *le;
- List *anal_cols2 = NIL;
-
- if (anal_cols != NIL && !analyze)
- elog(ERROR, "Can't vacuum columns, only tables. You can 'vacuum analyze' columns.");
+ const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
+ MemoryContext anl_context = NULL;
+ TransactionId initialOldestXmin = InvalidTransactionId;
+ TransactionId initialFreezeLimit = InvalidTransactionId;
+ bool all_rels;
+ List *vrl,
+ *cur;
+
+ if (vacstmt->verbose)
+ elevel = INFO;
+ else
+ elevel = DEBUG2;
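+
+	/*
+	 * Added note: INFO-level messages are sent to the client by default,
+	 * so VERBOSE output is visible interactively, while DEBUG2 output is
+	 * normally suppressed unless the message levels are raised.
+	 */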
/*
 * We cannot run VACUUM inside a user transaction block; if we were
 * inside a transaction, then our commit- and start-transaction-command
 * calls would not have the intended effect! Furthermore, the forced
 * commit that occurs before truncating the relation's file would have
 * the effect of committing the rest of the user's transaction too,
 * which would certainly not be the desired behavior.
*/
- if (IsTransactionBlock())
- elog(ERROR, "VACUUM cannot run inside a BEGIN/END block");
+ if (vacstmt->vacuum)
+ PreventTransactionChain((void *) vacstmt, stmttype);
- if (verbose)
- MESSAGE_LEVEL = NOTICE;
- else
- MESSAGE_LEVEL = DEBUG;
+ /*
+ * Send info about dead objects to the statistics collector
+ */
+ if (vacstmt->vacuum)
+ pgstat_vacuum_tabstat();
/*
* Create special memory context for cross-transaction storage.
*
- * Since it is a child of QueryContext, it will go away eventually
- * even if we suffer an error; there's no need for special abort
- * cleanup logic.
+ * Since it is a child of PortalContext, it will go away eventually even
+ * if we suffer an error; there's no need for special abort cleanup
+ * logic.
*/
- vac_context = AllocSetContextCreate(QueryContext,
+ vac_context = AllocSetContextCreate(PortalContext,
"Vacuum",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
- /* vacrel gets de-allocated on xact commit, so copy it to safe storage */
- if (vacrel)
+ /*
+ * If we are running only ANALYZE, we don't need per-table
+ * transactions, but we still need a memory context with table
+ * lifetime.
+ */
+ if (vacstmt->analyze && !vacstmt->vacuum)
+ anl_context = AllocSetContextCreate(PortalContext,
+ "Analyze",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /* Assume we are processing everything unless one table is mentioned */
+ all_rels = (vacstmt->relation == NULL);
+
+ /* Build list of relations to process (note this lives in vac_context) */
+ vrl = getrels(vacstmt->relation, stmttype);
+
+ /*
+ * Formerly, there was code here to prevent more than one VACUUM from
+ * executing concurrently in the same database. However, there's no
+ * good reason to prevent that, and manually removing lockfiles after
+ * a vacuum crash was a pain for dbadmins. So, forget about
+ * lockfiles, and just rely on the locks we grab on each target table
+ * to ensure that there aren't two VACUUMs running on the same table
+ * at the same time.
+ */
+
+ /*
+ * The strangeness with committing and starting transactions here is
+ * due to wanting to run each table's VACUUM as a separate
+ * transaction, so that we don't hold locks unnecessarily long. Also,
+ * if we are doing VACUUM ANALYZE, the ANALYZE part runs as a separate
+ * transaction from the VACUUM to further reduce locking.
+ *
+ * vacuum_rel expects to be entered with no transaction active; it will
+ * start and commit its own transaction. But we are called by an SQL
+ * command, and so we are executing inside a transaction already. We
+ * commit the transaction started in PostgresMain() here, and start
+ * another one before exiting to match the commit waiting for us back
+ * in PostgresMain().
+ *
+ * In the case of an ANALYZE statement (no vacuum, just analyze) it's
+ * okay to run the whole thing in the outer transaction, and so we
+ * skip transaction start/stop operations.
+ */
+ if (vacstmt->vacuum)
{
- namestrcpy(&VacRel, vacrel);
- VacRelName = &VacRel;
+ if (all_rels)
+ {
+ /*
+ * It's a database-wide VACUUM.
+ *
+ * Compute the initially applicable OldestXmin and FreezeLimit
+ * XIDs, so that we can record these values at the end of the
+ * VACUUM. Note that individual tables may well be processed
+ * with newer values, but we can guarantee that no
+ * (non-shared) relations are processed with older ones.
+ *
+ * It is okay to record non-shared values in pg_database, even
+ * though we may vacuum shared relations with older cutoffs,
+ * because only the minimum of the values present in
+ * pg_database matters. We can be sure that shared relations
+ * have at some time been vacuumed with cutoffs no worse than
+ * the global minimum; for, if there is a backend in some
+ * other DB with xmin = OLDXMIN that's determining the cutoff
+ * with which we vacuum shared relations, it is not possible
+ * for that database to have a cutoff newer than OLDXMIN
+ * recorded in pg_database.
+ */
+ vacuum_set_xid_limits(vacstmt, false,
+ &initialOldestXmin,
+ &initialFreezeLimit);
+ }
+
+ /* matches the StartTransaction in PostgresMain() */
+ CommitTransactionCommand();
}
- else
- VacRelName = NULL;
- /* must also copy the column list, if any, to safe storage */
- old = MemoryContextSwitchTo(vac_context);
- foreach(le, anal_cols)
+ /*
+ * Loop to process each selected relation.
+ */
+ foreach(cur, vrl)
{
- char *col = (char *) lfirst(le);
+ Oid relid = lfirsto(cur);
+
+ if (vacstmt->vacuum)
+ {
+ if (!vacuum_rel(relid, vacstmt, RELKIND_RELATION))
+ all_rels = false; /* forget about updating dbstats */
+ }
+ if (vacstmt->analyze)
+ {
+ MemoryContext old_context = NULL;
- anal_cols2 = lappend(anal_cols2, pstrdup(col));
+ /*
+ * If we vacuumed, use new transaction for analyze. Otherwise,
+ * we can use the outer transaction, but we still need to call
+ * analyze_rel in a memory context that will be cleaned up on
+ * return (else we leak memory while processing multiple
+ * tables).
+ */
+ if (vacstmt->vacuum)
+ {
+ StartTransactionCommand();
+ SetQuerySnapshot(); /* might be needed for functions
+ * in indexes */
+ }
+ else
+ old_context = MemoryContextSwitchTo(anl_context);
+
+ analyze_rel(relid, vacstmt);
+
+ if (vacstmt->vacuum)
+ CommitTransactionCommand();
+ else
+ {
+ MemoryContextSwitchTo(old_context);
+ MemoryContextResetAndDeleteChildren(anl_context);
+ }
+ }
}
- MemoryContextSwitchTo(old);
/*
- * Start up the vacuum cleaner.
- *
- * NOTE: since this commits the current transaction, the memory holding
- * any passed-in parameters gets freed here. We must have already
- * copied pass-by-reference parameters to safe storage. Don't make me
- * fix this again!
+ * Finish up processing.
*/
- vacuum_init();
+ if (vacstmt->vacuum)
+ {
+ /* here, we are not in a transaction */
- /* vacuum the database */
- vac_vacuum(VacRelName, analyze, anal_cols2);
+ /*
+ * This matches the CommitTransaction waiting for us in
+ * PostgresMain().
+ */
+ StartTransactionCommand();
- /* clean up */
- vacuum_shutdown();
+ /*
+ * If it was a database-wide VACUUM, print FSM usage statistics
+ * (we don't make you be superuser to see these).
+ */
+ if (vacstmt->relation == NULL)
+ PrintFreeSpaceMapStatistics(elevel);
+
+ /*
+ * If we completed a database-wide VACUUM without skipping any
+ * relations, update the database's pg_database row with info
+ * about the transaction IDs used, and try to truncate pg_clog.
+ */
+ if (all_rels)
+ {
+ vac_update_dbstats(MyDatabaseId,
+ initialOldestXmin, initialFreezeLimit);
+ vac_truncate_clog(initialOldestXmin, initialFreezeLimit);
+ }
+ }
+
+ /*
+ * Clean up working storage --- note we must do this after
+ * StartTransactionCommand, else we might be trying to delete the
+ * active context!
+ */
+ MemoryContextDelete(vac_context);
+ vac_context = NULL;
+
+ if (anl_context)
+ MemoryContextDelete(anl_context);
}
/*
- * vacuum_init(), vacuum_shutdown() -- start up and shut down the vacuum cleaner.
+ * Build a list of Oids for each relation to be processed
*
- * Formerly, there was code here to prevent more than one VACUUM from
- * executing concurrently in the same database. However, there's no
- * good reason to prevent that, and manually removing lockfiles after
- * a vacuum crash was a pain for dbadmins. So, forget about lockfiles,
- * and just rely on the exclusive lock we grab on each target table
- * to ensure that there aren't two VACUUMs running on the same table
- * at the same time.
- *
- * The strangeness with committing and starting transactions in the
- * init and shutdown routines is due to the fact that the vacuum cleaner
- * is invoked via an SQL command, and so is already executing inside
- * a transaction. We need to leave ourselves in a predictable state
- * on entry and exit to the vacuum cleaner. We commit the transaction
- * started in PostgresMain() inside vacuum_init(), and start one in
- * vacuum_shutdown() to match the commit waiting for us back in
- * PostgresMain().
+ * The list is built in vac_context so that it will survive across our
+ * per-relation transactions.
*/
-static void
-vacuum_init()
+static List *
+getrels(const RangeVar *vacrel, const char *stmttype)
{
- /* matches the StartTransaction in PostgresMain() */
- CommitTransactionCommand();
+ List *vrl = NIL;
+ MemoryContext oldcontext;
+
+ if (vacrel)
+ {
+ /* Process specific relation */
+ Oid relid;
+
+ relid = RangeVarGetRelid(vacrel, false);
+
+ /* Make a relation list entry for this guy */
+ oldcontext = MemoryContextSwitchTo(vac_context);
+ vrl = lappendo(vrl, relid);
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ /* Process all plain relations listed in pg_class */
+ Relation pgclass;
+ HeapScanDesc scan;
+ HeapTuple tuple;
+ ScanKeyData key;
+
+ ScanKeyEntryInitialize(&key, 0x0,
+ Anum_pg_class_relkind,
+ F_CHAREQ,
+ CharGetDatum(RELKIND_RELATION));
+
+ pgclass = heap_openr(RelationRelationName, AccessShareLock);
+
+ scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ /* Make a relation list entry for this guy */
+ oldcontext = MemoryContextSwitchTo(vac_context);
+ vrl = lappendo(vrl, HeapTupleGetOid(tuple));
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ heap_endscan(scan);
+ heap_close(pgclass, AccessShareLock);
+ }
+
+ return vrl;
}
-static void
-vacuum_shutdown()
+/*
+ * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
+ */
+void
+vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
+ TransactionId *oldestXmin,
+ TransactionId *freezeLimit)
{
- /* on entry, we are not in a transaction */
+ TransactionId limit;
+
+ *oldestXmin = GetOldestXmin(sharedRel);
+
+ Assert(TransactionIdIsNormal(*oldestXmin));
+
+ if (vacstmt->freeze)
+ {
+ /* FREEZE option: use oldest Xmin as freeze cutoff too */
+ limit = *oldestXmin;
+ }
+ else
+ {
+ /*
+ * Normal case: freeze cutoff is well in the past, to wit, about
+ * halfway to the wrap horizon
+ */
+ limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
+ }
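+
+	/*
+	 * Added note on the arithmetic: with 32-bit XIDs, MaxTransactionId >> 2
+	 * is a bit over one billion, so the normal freeze cutoff trails the
+	 * current XID by about half of the two-billion-transaction wrap
+	 * horizon.
+	 */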
/*
- * Flush the init file that relcache.c uses to save startup time. The
- * next backend startup will rebuild the init file with up-to-date
- * information from pg_class. This lets the optimizer see the stats
- * that we've collected for certain critical system indexes. See
- * relcache.c for more details.
- *
- * Ignore any failure to unlink the file, since it might not be there if
- * no backend has been started since the last vacuum...
+ * Be careful not to generate a "permanent" XID
*/
- unlink(RELCACHE_INIT_FILENAME);
-
- /* matches the CommitTransaction in PostgresMain() */
- StartTransactionCommand();
+ if (!TransactionIdIsNormal(limit))
+ limit = FirstNormalTransactionId;
/*
- * Clean up working storage --- note we must do this after
- * StartTransactionCommand, else we might be trying to delete
- * the active context!
+ * Ensure sane relationship of limits
*/
- MemoryContextDelete(vac_context);
- vac_context = NULL;
+ if (TransactionIdFollows(limit, *oldestXmin))
+ {
+ ereport(WARNING,
+ (errmsg("oldest Xmin is far in the past"),
+ errhint("Close open transactions soon to avoid wraparound problems.")));
+ limit = *oldestXmin;
+ }
+
+ *freezeLimit = limit;
}
+
/*
- * vac_vacuum() -- vacuum the database.
+ * vac_update_relstats() -- update statistics for one relation
+ *
+ * Update the whole-relation statistics that are kept in its pg_class
+ * row. There are additional stats that will be updated if we are
+ * doing ANALYZE, but we always update these stats. This routine works
+ * for both index and heap relation entries in pg_class.
+ *
+ * We violate no-overwrite semantics here by storing new values for the
+ * statistics columns directly into the pg_class tuple that's already on
+ * the page. The reason for this is that if we updated these tuples in
+ * the usual way, vacuuming pg_class itself wouldn't work very well ---
+ * by the time we got done with a vacuum cycle, most of the tuples in
+ * pg_class would've been obsoleted. Of course, this only works for
+ * fixed-size never-null columns, but these are.
*
- * This routine builds a list of relations to vacuum, and then calls
- * code that vacuums them one at a time. We are careful to vacuum each
- * relation in a separate transaction in order to avoid holding too many
- * locks at one time.
+ * This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
+ * ANALYZE.
*/
-static void
-vac_vacuum(NameData *VacRelP, bool analyze, List *anal_cols2)
+void
+vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
+ bool hasindex)
{
- VRelList vrl,
- cur;
+ Relation rd;
+ HeapTupleData rtup;
+ HeapTuple ctup;
+ Form_pg_class pgcform;
+ Buffer buffer;
- /* get list of relations */
- vrl = getrels(VacRelP);
+ /*
+ * update number of tuples and number of pages in pg_class
+ */
+ rd = heap_openr(RelationRelationName, RowExclusiveLock);
- /* vacuum each heap relation */
- for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
- {
- vacuum_rel(cur->vrl_relid);
- /* analyze separately so locking is minimized */
- if (analyze)
- analyze_rel(cur->vrl_relid, anal_cols2, MESSAGE_LEVEL);
- }
+ ctup = SearchSysCache(RELOID,
+ ObjectIdGetDatum(relid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(ctup))
+ elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+ relid);
+
+ /* get the buffer cache tuple */
+ rtup.t_self = ctup->t_self;
+ ReleaseSysCache(ctup);
+ if (!heap_fetch(rd, SnapshotNow, &rtup, &buffer, false, NULL))
+ elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
+ relid);
+
+ /* overwrite the existing statistics in the tuple */
+ pgcform = (Form_pg_class) GETSTRUCT(&rtup);
+ pgcform->relpages = (int32) num_pages;
+ pgcform->reltuples = num_tuples;
+ pgcform->relhasindex = hasindex;
+
+ /*
+ * If we have discovered that there are no indexes, then there's no
+ * primary key either. This could be done more thoroughly...
+ */
+ if (!hasindex)
+ pgcform->relhaspkey = false;
+
+ /*
+ * Invalidate the tuple in the catcaches; this also arranges to flush
+ * the relation's relcache entry. (If we fail to commit for some
+ * reason, no flush will occur, but no great harm is done since there
+ * are no noncritical state updates here.)
+ */
+ CacheInvalidateHeapTuple(rd, &rtup);
+
+ /* Write the buffer */
+ WriteBuffer(buffer);
+
+ heap_close(rd, RowExclusiveLock);
}
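+
+/*
+ * Usage sketch (added commentary): full_vacuum_rel() below ends with a
+ * call like
+ *
+ *		vac_update_relstats(RelationGetRelid(onerel),
+ *							vacrelstats->rel_pages,
+ *							vacrelstats->rel_tuples,
+ *							vacrelstats->hasindex);
+ *
+ * and lazy VACUUM and stand-alone ANALYZE report their own counts the
+ * same way.
+ */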
-static VRelList
-getrels(NameData *VacRelP)
+
+/*
+ * vac_update_dbstats() -- update statistics for one database
+ *
+ * Update the whole-database statistics that are kept in its pg_database
+ * row.
+ *
+ * We violate no-overwrite semantics here by storing new values for the
+ * statistics columns directly into the tuple that's already on the page.
+ * As with vac_update_relstats, this avoids leaving dead tuples behind
+ * after a VACUUM, which is good since GetRawDatabaseInfo
+ * can get confused by finding dead tuples in pg_database.
+ *
+ * This routine is shared by full and lazy VACUUM. Note that it is only
+ * applied after a database-wide VACUUM operation.
+ */
+static void
+vac_update_dbstats(Oid dbid,
+ TransactionId vacuumXID,
+ TransactionId frozenXID)
{
- Relation rel;
- TupleDesc tupdesc;
+ Relation relation;
+ ScanKeyData entry[1];
HeapScanDesc scan;
HeapTuple tuple;
- VRelList vrl,
- cur;
- Datum d;
- char *rname;
- char rkind;
- bool n;
- bool found = false;
- ScanKeyData key;
+ Form_pg_database dbform;
- StartTransactionCommand();
+ relation = heap_openr(DatabaseRelationName, RowExclusiveLock);
- if (NameStr(*VacRelP))
- {
+ /* Must use a heap scan, since there's no syscache for pg_database */
+ ScanKeyEntryInitialize(&entry[0], 0x0,
+ ObjectIdAttributeNumber, F_OIDEQ,
+ ObjectIdGetDatum(dbid));
- /*
- * we could use the cache here, but it is clearer to use scankeys
- * for both vacuum cases, bjm 2000/01/19
- */
- char *nontemp_relname;
+ scan = heap_beginscan(relation, SnapshotNow, 1, entry);
- /* We must re-map temp table names bjm 2000-04-06 */
- nontemp_relname = get_temp_rel_by_username(NameStr(*VacRelP));
- if (nontemp_relname == NULL)
- nontemp_relname = NameStr(*VacRelP);
+ tuple = heap_getnext(scan, ForwardScanDirection);
- ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relname,
- F_NAMEEQ,
- PointerGetDatum(nontemp_relname));
- }
- else
- {
- ScanKeyEntryInitialize(&key, 0x0, Anum_pg_class_relkind,
- F_CHAREQ, CharGetDatum('r'));
- }
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "could not find tuple for database %u", dbid);
- vrl = cur = (VRelList) NULL;
+ dbform = (Form_pg_database) GETSTRUCT(tuple);
- rel = heap_openr(RelationRelationName, AccessShareLock);
- tupdesc = RelationGetDescr(rel);
+ /* overwrite the existing statistics in the tuple */
+ dbform->datvacuumxid = vacuumXID;
+ dbform->datfrozenxid = frozenXID;
- scan = heap_beginscan(rel, false, SnapshotNow, 1, &key);
+ /* invalidate the tuple in the cache and write the buffer */
+ CacheInvalidateHeapTuple(relation, tuple);
+ WriteNoReleaseBuffer(scan->rs_cbuf);
- while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
- {
- found = true;
+ heap_endscan(scan);
- d = heap_getattr(tuple, Anum_pg_class_relname, tupdesc, &n);
- rname = (char *) d;
+ heap_close(relation, RowExclusiveLock);
+}
- d = heap_getattr(tuple, Anum_pg_class_relkind, tupdesc, &n);
- rkind = DatumGetChar(d);
+/*
+ * vac_truncate_clog() -- attempt to truncate the commit log
+ *
+ * Scan pg_database to determine the system-wide oldest datvacuumxid,
+ * and use it to truncate the transaction commit log (pg_clog).
+ * Also generate a warning if the system-wide oldest datfrozenxid
+ * seems to be in danger of wrapping around.
+ *
+ * The passed XIDs are simply the ones I just wrote into my pg_database
+ * entry. They're used to initialize the "min" calculations.
+ *
+ * This routine is shared by full and lazy VACUUM. Note that it is only
+ * applied after a database-wide VACUUM operation.
+ */
+static void
+vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
+{
+ TransactionId myXID;
+ Relation relation;
+ HeapScanDesc scan;
+ HeapTuple tuple;
+ int32 age;
+ bool vacuumAlreadyWrapped = false;
+ bool frozenAlreadyWrapped = false;
- if (rkind != RELKIND_RELATION)
- {
- elog(NOTICE, "Vacuum: can not process indices, views and certain system tables");
+ myXID = GetCurrentTransactionId();
+
+ relation = heap_openr(DatabaseRelationName, AccessShareLock);
+
+ scan = heap_beginscan(relation, SnapshotNow, 0, NULL);
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
+
+ /* Ignore non-connectable databases (eg, template0) */
+ /* It's assumed that these have been frozen correctly */
+ if (!dbform->datallowconn)
continue;
- }
- /* get a relation list entry for this guy */
- if (vrl == (VRelList) NULL)
- vrl = cur = (VRelList)
- MemoryContextAlloc(vac_context, sizeof(VRelListData));
- else
+ if (TransactionIdIsNormal(dbform->datvacuumxid))
{
- cur->vrl_next = (VRelList)
- MemoryContextAlloc(vac_context, sizeof(VRelListData));
- cur = cur->vrl_next;
+ if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
+ vacuumAlreadyWrapped = true;
+ else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
+ vacuumXID = dbform->datvacuumxid;
+ }
+ if (TransactionIdIsNormal(dbform->datfrozenxid))
+ {
+ if (TransactionIdPrecedes(myXID, dbform->datfrozenxid))
+ frozenAlreadyWrapped = true;
+ else if (TransactionIdPrecedes(dbform->datfrozenxid, frozenXID))
+ frozenXID = dbform->datfrozenxid;
}
-
- cur->vrl_relid = tuple->t_data->t_oid;
- cur->vrl_next = (VRelList) NULL;
}
heap_endscan(scan);
- heap_close(rel, AccessShareLock);
- if (!found)
- elog(NOTICE, "Vacuum: table not found");
+ heap_close(relation, AccessShareLock);
- CommitTransactionCommand();
+ /*
+ * Do not truncate CLOG if we seem to have suffered wraparound
+ * already; the computed minimum XID might be bogus.
+ */
+ if (vacuumAlreadyWrapped)
+ {
+ ereport(WARNING,
+ (errmsg("some databases have not been vacuumed in over 2 billion transactions"),
+ errdetail("You may have already suffered transaction-wraparound data loss.")));
+ return;
+ }
- return vrl;
+ /* Truncate CLOG to the oldest vacuumxid */
+ TruncateCLOG(vacuumXID);
+
+ /* Give warning about impending wraparound problems */
+ if (frozenAlreadyWrapped)
+ {
+ ereport(WARNING,
+ (errmsg("some databases have not been vacuumed in over 1 billion transactions"),
+ errhint("Better vacuum them soon, or you may have a wraparound failure.")));
+ }
+ else
+ {
+ age = (int32) (myXID - frozenXID);
+ if (age > (int32) ((MaxTransactionId >> 3) * 3))
+ ereport(WARNING,
+ (errmsg("some databases have not been vacuumed in %d transactions",
+ age),
+ errhint("Better vacuum them within %d transactions, "
+ "or you may have a wraparound failure.",
+ (int32) (MaxTransactionId >> 1) - age)));
+ }
}
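+
+/*
+ * Added note on the thresholds above: with 32-bit XIDs,
+ * (MaxTransactionId >> 3) * 3 is roughly 1.6 billion transactions, and
+ * the suggested margin, (MaxTransactionId >> 1) - age, is then roughly
+ * half a billion transactions before the two-billion wrap horizon.
+ */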
+
+/****************************************************************************
+ * *
+ * Code common to both flavors of VACUUM *
+ * *
+ ****************************************************************************
+ */
+
+
/*
* vacuum_rel() -- vacuum one heap relation
*
- * This routine vacuums a single heap, cleans out its indices, and
- * updates its num_pages and num_tuples statistics.
+ * Returns TRUE if we actually processed the relation (or can ignore it
+ * for some reason), FALSE if we failed to process it due to permissions
+ * or other reasons. (A FALSE result really means that some data
+ * may have been left unvacuumed, so we can't update XID stats.)
*
* Doing one heap at a time incurs extra overhead, since we need to
 * check that the heap exists again just before we vacuum it. The
 * reason that we do this is so that vacuuming can be spread across
 * many small transactions. Otherwise, two-phase locking would require
 * us to lock the entire database during one pass of the vacuum cleaner.
*
* At entry and exit, we are not inside a transaction.
*/
-static void
-vacuum_rel(Oid relid)
+static bool
+vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
{
+ LOCKMODE lmode;
Relation onerel;
LockRelId onerelid;
- VacPageListData vacuum_pages; /* List of pages to vacuum and/or clean
- * indices */
- VacPageListData fraged_pages; /* List of pages with space enough for
- * re-using */
- Relation *Irel;
- int32 nindices,
- i;
- VRelStats *vacrelstats;
- bool reindex = false;
Oid toast_relid;
+ bool result;
/* Begin a transaction for vacuuming this relation */
StartTransactionCommand();
+ SetQuerySnapshot(); /* might be needed for functions in
+ * indexes */
/*
* Check for user-requested abort. Note we want this to be inside a
- * transaction, so xact.c doesn't issue useless NOTICE.
+ * transaction, so xact.c doesn't issue useless WARNING.
*/
CHECK_FOR_INTERRUPTS();
0, 0, 0))
{
CommitTransactionCommand();
- return;
+ return true; /* okay 'cause no data there */
}
/*
- * Open the class, get an exclusive lock on it, and check permissions.
+ * Determine the type of lock we want --- hard exclusive lock for a
+ * FULL vacuum, but just ShareUpdateExclusiveLock for concurrent
+ * vacuum. Either way, we can be sure that no other backend is
+ * vacuuming the same table.
+ */
+ lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;
+
+ /*
+ * Open the class, get an appropriate lock on it, and check
+ * permissions.
+ *
+ * We allow the user to vacuum a table if he is superuser, the table
+ * owner, or the database owner (but in the latter case, only if it's
+ * not a shared relation). pg_class_ownercheck includes the superuser
+ * case.
*
- * Note we choose to treat permissions failure as a NOTICE and keep
+ * Note we choose to treat permissions failure as a WARNING and keep
* trying to vacuum the rest of the DB --- is this appropriate?
*/
- onerel = heap_open(relid, AccessExclusiveLock);
+ onerel = relation_open(relid, lmode);
- if (!pg_ownercheck(GetUserId(), RelationGetRelationName(onerel),
- RELNAME))
+ if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
+ (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
{
- elog(NOTICE, "Skipping \"%s\" --- only table owner can VACUUM it",
- RelationGetRelationName(onerel));
- heap_close(onerel, AccessExclusiveLock);
+ ereport(WARNING,
+ (errmsg("skipping \"%s\" --- only table or database owner can VACUUM it",
+ RelationGetRelationName(onerel))));
+ relation_close(onerel, lmode);
CommitTransactionCommand();
- return;
+ return false;
}
/*
- * Get a session-level exclusive lock too. This will protect our
- * exclusive access to the relation across multiple transactions,
- * so that we can vacuum the relation's TOAST table (if any) secure
- * in the knowledge that no one is diddling the parent relation.
- *
- * NOTE: this cannot block, even if someone else is waiting for access,
- * because the lock manager knows that both lock requests are from the
- * same process.
+ * Check that it's a plain table; we used to do this in getrels(), but
+ * it seems safer to check after we've locked the relation.
+ */
+ if (onerel->rd_rel->relkind != expected_relkind)
+ {
+ ereport(WARNING,
+ (errmsg("skipping \"%s\" --- cannot VACUUM indexes, views or special system tables",
+ RelationGetRelationName(onerel))));
+ relation_close(onerel, lmode);
+ CommitTransactionCommand();
+ return false;
+ }
+
+ /*
+ * Silently ignore tables that are temp tables of other backends ---
+ * trying to vacuum these will lead to great unhappiness, since their
+ * contents are probably not up-to-date on disk. (We don't throw a
+ * warning here; it would just lead to chatter during a database-wide
+ * VACUUM.)
+ */
+ if (isOtherTempNamespace(RelationGetNamespace(onerel)))
+ {
+ relation_close(onerel, lmode);
+ CommitTransactionCommand();
+ return true; /* assume no long-lived data in temp
+ * tables */
+ }
+
+ /*
+ * Get a session-level lock too. This will protect our access to the
+ * relation across multiple transactions, so that we can vacuum the
+ * relation's TOAST table (if any) secure in the knowledge that no one
+ * is deleting the parent relation.
+ *
+ * NOTE: this cannot block, even if someone else is waiting for access,
+ * because the lock manager knows that both lock requests are from the
+ * same process.
+ */
+ onerelid = onerel->rd_lockInfo.lockRelId;
+ LockRelationForSession(&onerelid, lmode);
+
+ /*
+ * Remember the relation's TOAST relation for later
+ */
+ toast_relid = onerel->rd_rel->reltoastrelid;
+
+ /*
+ * Do the actual work --- either FULL or "lazy" vacuum
+ */
+ if (vacstmt->full)
+ full_vacuum_rel(onerel, vacstmt);
+ else
+ lazy_vacuum_rel(onerel, vacstmt);
+
+ result = true; /* did the vacuum */
+
+ /* all done with this class, but hold lock until commit */
+ relation_close(onerel, NoLock);
+
+ /*
+ * Complete the transaction and free all temporary memory used.
+ */
+ CommitTransactionCommand();
+
+ /*
+ * If the relation has a secondary toast rel, vacuum that too while we
+ * still hold the session lock on the master table. Note however that
+ * "analyze" will not get done on the toast table. This is good,
+ * because the toaster always uses hardcoded index access and
+ * statistics are totally unimportant for toast relations.
*/
- onerelid = onerel->rd_lockInfo.lockRelId;
- LockRelationForSession(&onerelid, AccessExclusiveLock);
+ if (toast_relid != InvalidOid)
+ {
+ if (!vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE))
+ result = false; /* failed to vacuum the TOAST table? */
+ }
/*
- * Remember the relation's TOAST relation for later
+ * Now release the session-level lock on the master table.
*/
- toast_relid = onerel->rd_rel->reltoastrelid;
+ UnlockRelationForSession(&onerelid, lmode);
+
+ return result;
+}
+
+
+/****************************************************************************
+ * *
+ * Code for VACUUM FULL (only) *
+ * *
+ ****************************************************************************
+ */
+
+
+/*
+ * full_vacuum_rel() -- perform FULL VACUUM for one heap relation
+ *
+ * This routine vacuums a single heap, cleans out its indexes, and
+ * updates its num_pages and num_tuples statistics.
+ *
+ * At entry, we have already established a transaction and opened
+ * and locked the relation.
+ */
+static void
+full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
+{
+ VacPageListData vacuum_pages; /* List of pages to vacuum and/or
+ * clean indexes */
+ VacPageListData fraged_pages; /* List of pages with space enough
+ * for re-using */
+ Relation *Irel;
+ int nindexes,
+ i;
+ VRelStats *vacrelstats;
+
+ vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
+ &OldestXmin, &FreezeLimit);
/*
* Set up statistics-gathering machinery.
*/
vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
- vacrelstats->relid = relid;
- vacrelstats->num_pages = vacrelstats->num_tuples = 0;
+ vacrelstats->rel_pages = 0;
+ vacrelstats->rel_tuples = 0;
vacrelstats->hasindex = false;
- GetXmaxRecent(&XmaxRecent);
-
- /* scan it */
- reindex = false;
+ /* scan the heap */
vacuum_pages.num_pages = fraged_pages.num_pages = 0;
scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
- if (IsIgnoringSystemIndexes() &&
- IsSystemRelationName(RelationGetRelationName(onerel)))
- reindex = true;
-
- /* Now open indices */
- nindices = 0;
- Irel = (Relation *) NULL;
- get_indices(onerel, &nindices, &Irel);
- if (!Irel)
- reindex = false;
- else if (!RelationGetForm(onerel)->relhasindex)
- reindex = true;
- if (nindices > 0)
+
+ /* Now open all indexes of the relation */
+ vac_open_indexes(onerel, &nindexes, &Irel);
+ if (nindexes > 0)
vacrelstats->hasindex = true;
- else
- vacrelstats->hasindex = false;
- if (reindex)
- {
- for (i = 0; i < nindices; i++)
- index_close(Irel[i]);
- Irel = (Relation *) NULL;
- activate_indexes_of_a_table(relid, false);
- }
/* Clean/scan index relation(s) */
if (Irel != (Relation *) NULL)
{
if (vacuum_pages.num_pages > 0)
{
- for (i = 0; i < nindices; i++)
+ for (i = 0; i < nindexes; i++)
vacuum_index(&vacuum_pages, Irel[i],
- vacrelstats->num_tuples, 0);
+ vacrelstats->rel_tuples, 0);
}
else
{
- /* just scan indices to update statistic */
- for (i = 0; i < nindices; i++)
- scan_index(Irel[i], vacrelstats->num_tuples);
+ /* just scan indexes to update statistics */
+ for (i = 0; i < nindexes; i++)
+ scan_index(Irel[i], vacrelstats->rel_tuples);
}
}
{
/* Try to shrink heap */
repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
- nindices, Irel);
+ nindexes, Irel);
+ vac_close_indexes(nindexes, Irel);
}
else
{
- if (Irel != (Relation *) NULL)
- close_indices(nindices, Irel);
+ vac_close_indexes(nindexes, Irel);
if (vacuum_pages.num_pages > 0)
{
/* Clean pages from vacuum_pages list */
* tuples have correct on-row commit status on disk (see
* bufmgr.c's comments for FlushRelationBuffers()).
*/
- i = FlushRelationBuffers(onerel, vacrelstats->num_pages);
+ i = FlushRelationBuffers(onerel, vacrelstats->rel_pages);
if (i < 0)
- elog(ERROR, "VACUUM (vacuum_rel): FlushRelationBuffers returned %d",
- i);
+ elog(ERROR, "FlushRelationBuffers returned %d", i);
}
}
- if (reindex)
- activate_indexes_of_a_table(relid, true);
- /* all done with this class, but hold lock until commit */
- heap_close(onerel, NoLock);
+ /* update shared free space map with final free space info */
+ vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);
/* update statistics in pg_class */
- update_relstats(vacrelstats->relid, vacrelstats->num_pages,
- vacrelstats->num_tuples, vacrelstats->hasindex,
- vacrelstats);
-
- /*
- * Complete the transaction and free all temporary memory used.
- */
- CommitTransactionCommand();
-
- /*
- * If the relation has a secondary toast one, vacuum that too
- * while we still hold the session lock on the master table.
- * We don't need to propagate "analyze" to it, because the toaster
- * always uses hardcoded index access and statistics are
- * totally unimportant for toast relations
- */
- if (toast_relid != InvalidOid)
- vacuum_rel(toast_relid);
-
- /*
- * Now release the session-level lock on the master table.
- */
- UnlockRelationForSession(&onerelid, AccessExclusiveLock);
+ vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
+ vacrelstats->rel_tuples, vacrelstats->hasindex);
}
+
/*
* scan_heap() -- scan an open heap relation
*
- * This routine sets commit times, constructs vacuum_pages list of
- * empty/uninitialized pages and pages with dead tuples and
- * ~LP_USED line pointers, constructs fraged_pages list of pages
- * appropriate for purposes of shrinking and maintains statistics
- * on the number of live tuples in a heap.
+ * This routine sets commit status bits, constructs vacuum_pages (list
+ * of pages we need to compact free space on and/or clean indexes of
+ * deleted tuples), constructs fraged_pages (list of pages with free
+ * space that tuples could be moved into), and calculates statistics
+ * on the number of live tuples in the heap.
*/
static void
scan_heap(VRelStats *vacrelstats, Relation onerel,
- VacPageList vacuum_pages, VacPageList fraged_pages)
+ VacPageList vacuum_pages, VacPageList fraged_pages)
{
BlockNumber nblocks,
blkno;
ItemId itemid;
Buffer buf;
HeapTupleData tuple;
- Page page,
- tempPage = NULL;
OffsetNumber offnum,
maxoff;
bool pgchanged,
tupgone,
- dobufrel,
notup;
char *relname;
VacPage vacpage,
- vp;
- uint32 tups_vacuumed,
- num_tuples,
- nkeep,
- nunused,
- ncrash,
- empty_pages,
- new_pages,
- changed_pages,
+ vacpagecopy;
+ BlockNumber empty_pages,
empty_end_pages;
- Size free_size,
- usable_free_size;
+ double num_tuples,
+ tups_vacuumed,
+ nkeep,
+ nunused;
+ double free_space,
+ usable_free_space;
Size min_tlen = MaxTupleSize;
Size max_tlen = 0;
- int32 i;
+ int i;
bool do_shrinking = true;
VTupleLink vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
int num_vtlinks = 0;
int free_vtlinks = 100;
- struct rusage ru0;
+ VacRUsage ru0;
- getrusage(RUSAGE_SELF, &ru0);
+ vac_init_rusage(&ru0);
relname = RelationGetRelationName(onerel);
- elog(MESSAGE_LEVEL, "--Relation %s--", relname);
+ ereport(elevel,
+ (errmsg("vacuuming \"%s.%s\"",
+ get_namespace_name(RelationGetNamespace(onerel)),
+ relname)));
- tups_vacuumed = num_tuples = nkeep = nunused = ncrash = empty_pages =
- new_pages = changed_pages = empty_end_pages = 0;
- free_size = usable_free_size = 0;
+ empty_pages = empty_end_pages = 0;
+ num_tuples = tups_vacuumed = nkeep = nunused = 0;
+ free_space = 0;
nblocks = RelationGetNumberOfBlocks(onerel);
+ /*
+ * We initially create each VacPage item in a maximal-sized workspace,
+ * then copy the workspace into a just-large-enough copy.
+ */
vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
- vacpage->offsets_used = 0;
for (blkno = 0; blkno < nblocks; blkno++)
{
+ Page page,
+ tempPage = NULL;
+ bool do_reap,
+ do_frag;
+
+ CHECK_FOR_INTERRUPTS();
+
buf = ReadBuffer(onerel, blkno);
page = BufferGetPage(buf);
+
vacpage->blkno = blkno;
+ vacpage->offsets_used = 0;
vacpage->offsets_free = 0;
if (PageIsNew(page))
{
- elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
- relname, blkno);
+ ereport(WARNING,
+ (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
+ relname, blkno)));
PageInit(page, BufferGetPageSize(buf), 0);
vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
- free_size += (vacpage->free - sizeof(ItemIdData));
- new_pages++;
+ free_space += vacpage->free;
+ empty_pages++;
empty_end_pages++;
- reap_page(vacuum_pages, vacpage);
+ vacpagecopy = copy_vac_page(vacpage);
+ vpage_insert(vacuum_pages, vacpagecopy);
+ vpage_insert(fraged_pages, vacpagecopy);
WriteBuffer(buf);
continue;
}
if (PageIsEmpty(page))
{
vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
- free_size += (vacpage->free - sizeof(ItemIdData));
+ free_space += vacpage->free;
empty_pages++;
empty_end_pages++;
- reap_page(vacuum_pages, vacpage);
+ vacpagecopy = copy_vac_page(vacpage);
+ vpage_insert(vacuum_pages, vacpagecopy);
+ vpage_insert(fraged_pages, vacpagecopy);
ReleaseBuffer(buf);
continue;
}
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
+ uint16 sv_infomask;
+
itemid = PageGetItemId(page, offnum);
/*
- * Collect un-used items too - it's possible to have indices
+ * Collect un-used items too - it's possible to have indexes
* pointing here after crash.
*/
if (!ItemIdIsUsed(itemid))
{
vacpage->offsets[vacpage->offsets_free++] = offnum;
- nunused++;
+ nunused += 1;
continue;
}
tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
tuple.t_len = ItemIdGetLength(itemid);
ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
tupgone = false;
+ sv_infomask = tuple.t_data->t_infomask;
- if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+ switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
{
- if (tuple.t_data->t_infomask & HEAP_XMIN_INVALID)
- tupgone = true;
- else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
- {
- if (TransactionIdDidCommit((TransactionId)
- tuple.t_data->t_cmin))
- {
- tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
- pgchanged = true;
- tupgone = true;
- }
- else
- {
- tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
- pgchanged = true;
- }
- }
- else if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
- {
- if (!TransactionIdDidCommit((TransactionId)
- tuple.t_data->t_cmin))
- {
- tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
- pgchanged = true;
- tupgone = true;
- }
- else
- {
- tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
- pgchanged = true;
- }
- }
- else
- {
- if (TransactionIdDidAbort(tuple.t_data->t_xmin))
- tupgone = true;
- else if (TransactionIdDidCommit(tuple.t_data->t_xmin))
- {
- tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
- pgchanged = true;
- }
- else if (!TransactionIdIsInProgress(tuple.t_data->t_xmin))
- {
-
- /*
- * Not Aborted, Not Committed, Not in Progress -
- * so it's from crashed process. - vadim 11/26/96
- */
- ncrash++;
- tupgone = true;
- }
- else
- {
- elog(NOTICE, "Rel %s: TID %u/%u: InsertTransactionInProgress %u - can't shrink relation",
- relname, blkno, offnum, tuple.t_data->t_xmin);
- do_shrinking = false;
- }
- }
- }
+ case HEAPTUPLE_DEAD:
+ tupgone = true; /* we can delete the tuple */
+ break;
+ case HEAPTUPLE_LIVE:
- /*
- * here we are concerned about tuples with xmin committed and
- * xmax unknown or committed
- */
- if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED &&
- !(tuple.t_data->t_infomask & HEAP_XMAX_INVALID))
- {
- if (tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED)
- {
- if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
- {
- tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
- tuple.t_data->t_infomask &=
- ~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
- pgchanged = true;
- }
- else
- tupgone = true;
- }
- else if (TransactionIdDidAbort(tuple.t_data->t_xmax))
- {
- tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
- pgchanged = true;
- }
- else if (TransactionIdDidCommit(tuple.t_data->t_xmax))
- {
- if (tuple.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
+ /*
+ * Tuple is good. Consider whether to replace its
+ * xmin value with FrozenTransactionId.
+ */
+ if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
+ TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+ FreezeLimit))
{
- tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
- tuple.t_data->t_infomask &=
- ~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
+ HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
+ /* infomask should be okay already */
+ Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
pgchanged = true;
}
- else
- tupgone = true;
- }
- else if (!TransactionIdIsInProgress(tuple.t_data->t_xmax))
- {
+ break;
+ case HEAPTUPLE_RECENTLY_DEAD:
/*
- * Not Aborted, Not Committed, Not in Progress - so it
- * from crashed process. - vadim 06/02/97
+ * If the tuple is recently deleted then we must not
+ * remove it from the relation.
*/
- tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
- tuple.t_data->t_infomask &=
- ~(HEAP_XMAX_COMMITTED | HEAP_MARKED_FOR_UPDATE);
- pgchanged = true;
- }
- else
- {
- elog(NOTICE, "Rel %s: TID %u/%u: DeleteTransactionInProgress %u - can't shrink relation",
- relname, blkno, offnum, tuple.t_data->t_xmax);
- do_shrinking = false;
- }
-
- /*
- * If tuple is recently deleted then we must not remove it
- * from relation.
- */
- if (tupgone && (tuple.t_data->t_infomask & HEAP_XMIN_INVALID) == 0 && tuple.t_data->t_xmax >= XmaxRecent)
- {
- tupgone = false;
- nkeep++;
- if (!(tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED))
- {
- tuple.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- pgchanged = true;
- }
+ nkeep += 1;
/*
* If we are shrinking and this tuple is an updated one,
* then remember it so we can construct updated-tuple
* dependencies.
*/
- if (do_shrinking && !(ItemPointerEquals(&(tuple.t_self),
- &(tuple.t_data->t_ctid))))
+ if (do_shrinking &&
+ !(ItemPointerEquals(&(tuple.t_self),
+ &(tuple.t_data->t_ctid))))
{
if (free_vtlinks == 0)
{
free_vtlinks--;
num_vtlinks++;
}
- }
+ break;
+ case HEAPTUPLE_INSERT_IN_PROGRESS:
+
+ /*
+ * This should not happen, since we hold exclusive
+ * lock on the relation; shouldn't we raise an error?
+ * (Actually, it can happen in system catalogs, since
+ * we tend to release write lock before commit there.)
+ */
+ ereport(NOTICE,
+ (errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
+ relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
+ do_shrinking = false;
+ break;
+ case HEAPTUPLE_DELETE_IN_PROGRESS:
+
+ /*
+ * This should not happen, since we hold exclusive
+ * lock on the relation; shouldn't we raise an error?
+ * (Actually, it can happen in system catalogs, since
+ * we tend to release write lock before commit there.)
+ */
+ ereport(NOTICE,
+ (errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
+ relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
+ do_shrinking = false;
+ break;
+ default:
+ elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+ break;
}
+ /* check for hint-bit update by HeapTupleSatisfiesVacuum */
+ if (sv_infomask != tuple.t_data->t_infomask)
+ pgchanged = true;
+
/*
* Other checks...
*/
- if (!OidIsValid(tuple.t_data->t_oid))
- {
- elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
- relname, blkno, offnum, tupgone);
- }
+ if (onerel->rd_rel->relhasoids &&
+ !OidIsValid(HeapTupleGetOid(&tuple)))
+ elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
+ relname, blkno, offnum);
if (tupgone)
{
* dead tuples removed. Below we will apply
* PageRepairFragmentation to the copy, so that we can
* determine how much space will be available after
- * removal of dead tuples. But note we are NOT changing
+ * removal of dead tuples. But note we are NOT changing
* the real page yet...
*/
if (tempPage == (Page) NULL)
pageSize = PageGetPageSize(page);
tempPage = (Page) palloc(pageSize);
- memmove(tempPage, page, pageSize);
+ memcpy(tempPage, page, pageSize);
}
/* mark it unused on the temp page */
- lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
+ lpp = PageGetItemId(tempPage, offnum);
lpp->lp_flags &= ~LP_USED;
vacpage->offsets[vacpage->offsets_free++] = offnum;
- tups_vacuumed++;
+ tups_vacuumed += 1;
}
else
{
- num_tuples++;
+ num_tuples += 1;
notup = false;
if (tuple.t_len < min_tlen)
min_tlen = tuple.t_len;
if (tuple.t_len > max_tlen)
max_tlen = tuple.t_len;
}
- }
-
- if (pgchanged)
- {
- WriteBuffer(buf);
- dobufrel = false;
- changed_pages++;
- }
- else
- dobufrel = true;
+ } /* scan along page */
if (tempPage != (Page) NULL)
- { /* Some tuples are gone */
+ {
+ /* Some tuples are removable; figure free space after removal */
PageRepairFragmentation(tempPage, NULL);
vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
- free_size += vacpage->free;
- reap_page(vacuum_pages, vacpage);
pfree(tempPage);
- tempPage = (Page) NULL;
+ do_reap = true;
}
- else if (vacpage->offsets_free > 0)
- { /* there are only ~LP_USED line pointers */
+ else
+ {
+ /* Just use current available space */
vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
- free_size += vacpage->free;
- reap_page(vacuum_pages, vacpage);
+ /* Need to reap the page if it has ~LP_USED line pointers */
+ do_reap = (vacpage->offsets_free > 0);
}
- if (dobufrel)
- ReleaseBuffer(buf);
+
+ free_space += vacpage->free;
+
+ /*
+ * Add the page to fraged_pages if it has a useful amount of free
+ * space. "Useful" means enough for a minimal-sized tuple. But we
+ * don't know that accurately near the start of the relation, so
+ * add pages unconditionally if they have >= BLCKSZ/10 free space.
+ */
+ do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);
+
+ if (do_reap || do_frag)
+ {
+ vacpagecopy = copy_vac_page(vacpage);
+ if (do_reap)
+ vpage_insert(vacuum_pages, vacpagecopy);
+ if (do_frag)
+ vpage_insert(fraged_pages, vacpagecopy);
+ }
+
+ /*
+ * Include the page in empty_end_pages if it will be empty after
+ * vacuuming; this is to keep us from using it as a move
+ * destination.
+ */
if (notup)
+ {
+ empty_pages++;
empty_end_pages++;
+ }
else
empty_end_pages = 0;
+
+ if (pgchanged)
+ WriteBuffer(buf);
+ else
+ ReleaseBuffer(buf);
}
pfree(vacpage);
/* save stats in the rel list for use later */
- vacrelstats->num_tuples = num_tuples;
- vacrelstats->num_pages = nblocks;
-/* vacrelstats->natts = attr_cnt;*/
+ vacrelstats->rel_tuples = num_tuples;
+ vacrelstats->rel_pages = nblocks;
if (num_tuples == 0)
min_tlen = max_tlen = 0;
vacrelstats->min_tlen = min_tlen;
fraged_pages->empty_end_pages = empty_end_pages;
/*
- * Try to make fraged_pages keeping in mind that we can't use free
- * space of "empty" end-pages and last page if it reaped.
+ * Clear the fraged_pages list if we found we couldn't shrink. Else,
+ * remove any "empty" end-pages from the list, and compute usable free
+ * space = free space in remaining pages.
*/
- if (do_shrinking && vacuum_pages->num_pages - empty_end_pages > 0)
+ if (do_shrinking)
{
- int nusf; /* blocks usefull for re-using */
-
- nusf = vacuum_pages->num_pages - empty_end_pages;
- if ((vacuum_pages->pagedesc[nusf - 1])->blkno == nblocks - empty_end_pages - 1)
- nusf--;
-
- for (i = 0; i < nusf; i++)
- {
- vp = vacuum_pages->pagedesc[i];
- if (enough_space(vp, min_tlen))
- {
- vpage_insert(fraged_pages, vp);
- usable_free_size += vp->free;
- }
- }
+ Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
+ fraged_pages->num_pages -= empty_end_pages;
+ usable_free_space = 0;
+ for (i = 0; i < fraged_pages->num_pages; i++)
+ usable_free_space += fraged_pages->pagedesc[i]->free;
+ }
+ else
+ {
+ fraged_pages->num_pages = 0;
+ usable_free_space = 0;
}
- if (usable_free_size > 0 && num_vtlinks > 0)
+ /* don't bother to save vtlinks if we will not call repair_frag */
+ if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
{
qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
vac_cmp_vtlinks);
pfree(vtlinks);
}
- elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
-Tup %u: Vac %u, Keep/VTL %u/%u, Crash %u, UnUsed %u, MinLen %lu, MaxLen %lu; \
-Re-using: Free/Avail. Space %lu/%lu; EndEmpty/Avail. Pages %u/%u. %s",
- nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
- new_pages, num_tuples, tups_vacuumed,
- nkeep, vacrelstats->num_vtlinks, ncrash,
- nunused, (unsigned long)min_tlen, (unsigned long)max_tlen,
- (unsigned long)free_size, (unsigned long)usable_free_size,
- empty_end_pages, fraged_pages->num_pages,
- show_rusage(&ru0));
-
+ ereport(elevel,
+ (errmsg("\"%s\": found %.0f removable, %.0f nonremovable tuples in %u pages",
+ RelationGetRelationName(onerel),
+ tups_vacuumed, num_tuples, nblocks),
+ errdetail("%.0f dead tuples cannot be removed yet.\n"
+ "Nonremovable tuples range from %lu to %lu bytes long.\n"
+ "There were %.0f unused item pointers.\n"
+ "Total free space (including removable tuples) is %.0f bytes.\n"
+ "%u pages are or will become empty, including %u at the end of the table.\n"
+ "%u pages containing %.0f free bytes are potential move destinations.\n"
+ "%s",
+ nkeep,
+ (unsigned long) min_tlen, (unsigned long) max_tlen,
+ nunused,
+ free_space,
+ empty_pages, empty_end_pages,
+ fraged_pages->num_pages, usable_free_space,
+ vac_show_rusage(&ru0))));
}

/*
 * repair_frag() -- try to repair relation's fragmentation
*
 * This routine marks dead tuples as unused and tries to re-use dead space
- * by moving tuples (and inserting indices if needed). It constructs
- * Nvacpagelist list of free-ed pages (moved tuples) and clean indices
+ * by moving tuples (and inserting indexes if needed). It constructs
+ * Nvacpagelist list of free-ed pages (moved tuples) and clean indexes
 * for them after committing (in a hackish manner - without losing locks
 * and freeing memory!) the current transaction. It truncates the
 * relation if some end-blocks have gone away.
*/
static void
repair_frag(VRelStats *vacrelstats, Relation onerel,
- VacPageList vacuum_pages, VacPageList fraged_pages,
- int nindices, Relation *Irel)
+ VacPageList vacuum_pages, VacPageList fraged_pages,
+ int nindexes, Relation *Irel)
{
TransactionId myXID;
CommandId myCID;
Buffer buf,
cur_buffer;
- int nblocks,
+ BlockNumber nblocks,
blkno;
+ BlockNumber last_move_dest_block = 0,
+ last_vacuum_block;
Page page,
ToPage = NULL;
- OffsetNumber offnum = 0,
- maxoff = 0,
+ OffsetNumber offnum,
+ maxoff,
newoff,
max_offset;
ItemId itemid,
HeapTupleData tuple,
newtup;
TupleDesc tupdesc;
- IndexInfo **indexInfo = NULL;
- Datum idatum[INDEX_MAX_KEYS];
- char inulls[INDEX_MAX_KEYS];
- InsertIndexResult iresult;
+ ResultRelInfo *resultRelInfo;
+ EState *estate;
+ TupleTable tupleTable;
+ TupleTableSlot *slot;
VacPageListData Nvacpagelist;
VacPage cur_page = NULL,
last_vacuum_page,
vacpage,
*curpage;
int cur_item = 0;
- int last_move_dest_block = -1,
- last_vacuum_block,
- i = 0;
+ int i;
Size tuple_len;
int num_moved,
num_fraged_pages,
bool isempty,
dowrite,
chain_tuple_moved;
- struct rusage ru0;
+ VacRUsage ru0;
- getrusage(RUSAGE_SELF, &ru0);
+ vac_init_rusage(&ru0);
myXID = GetCurrentTransactionId();
myCID = GetCurrentCommandId();
tupdesc = RelationGetDescr(onerel);
- if (Irel != (Relation *) NULL) /* preparation for index' inserts */
- indexInfo = get_index_desc(onerel, nindices, Irel);
+ /*
+ * We need a ResultRelInfo and an EState so we can use the regular
+ * executor's index-entry-making machinery.
+ */
+ estate = CreateExecutorState();
+
+ resultRelInfo = makeNode(ResultRelInfo);
+ resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
+ resultRelInfo->ri_RelationDesc = onerel;
+ resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */
+
+ ExecOpenIndices(resultRelInfo);
+
+ estate->es_result_relations = resultRelInfo;
+ estate->es_num_result_relations = 1;
+ estate->es_result_relation_info = resultRelInfo;
+
+ /* Set up a dummy tuple table too */
+ tupleTable = ExecCreateTupleTable(1);
+ slot = ExecAllocTableSlot(tupleTable);
+ ExecSetSlotDescriptor(slot, tupdesc, false);
Nvacpagelist.num_pages = 0;
num_fraged_pages = fraged_pages->num_pages;
- Assert(vacuum_pages->num_pages > vacuum_pages->empty_end_pages);
+ Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
- last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
- last_vacuum_block = last_vacuum_page->blkno;
+ if (vacuumed_pages > 0)
+ {
+ /* get last reaped page from vacuum_pages */
+ last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
+ last_vacuum_block = last_vacuum_page->blkno;
+ }
+ else
+ {
+ last_vacuum_page = NULL;
+ last_vacuum_block = InvalidBlockNumber;
+ }
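+
+ /*
+ * (With no partially-reaped pages, last_vacuum_block is
+ * InvalidBlockNumber, which can never match a real block number in
+ * the scan below.)
+ */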
cur_buffer = InvalidBuffer;
num_moved = 0;
/*
* Scan pages backwards from the last nonempty page, trying to move
* tuples down to lower pages. Quit when we reach a page that we have
- * moved any tuples onto. Note that if a page is still in the
- * fraged_pages list (list of candidate move-target pages) when we
- * reach it, we will remove it from the list. This ensures we never
- * move a tuple up to a higher page number.
+ * moved any tuples onto, or the first page if we haven't moved
+ * anything, or when we find a page we cannot completely empty (this
+ * last condition is handled by "break" statements within the loop).
*
* NB: this code depends on the vacuum_pages and fraged_pages lists being
- * in order, and on fraged_pages being a subset of vacuum_pages.
+ * in order by blkno.
*/
- nblocks = vacrelstats->num_pages;
+ nblocks = vacrelstats->rel_pages;
for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
blkno > last_move_dest_block;
blkno--)
{
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Forget fraged_pages pages at or after this one; they're no
+ * longer useful as move targets, since we only want to move down.
+ * Note that since we stop the outer loop at last_move_dest_block,
+ * pages removed here cannot have had anything moved onto them
+ * already.
+ *
+ * Also note that we don't change the stored fraged_pages list, only
+ * our local variable num_fraged_pages; so the forgotten pages are
+ * still available to be loaded into the free space map later.
+ */
+ while (num_fraged_pages > 0 &&
+ fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
+ {
+ Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
+ --num_fraged_pages;
+ }
+
+ /*
+ * Process this page of relation.
+ */
buf = ReadBuffer(onerel, blkno);
page = BufferGetPage(buf);
isempty = PageIsEmpty(page);
dowrite = false;
- if (blkno == last_vacuum_block) /* it's reaped page */
+
+ /* Is the page in the vacuum_pages list? */
+ if (blkno == last_vacuum_block)
{
- if (last_vacuum_page->offsets_free > 0) /* there are dead tuples */
- { /* on this page - clean */
+ if (last_vacuum_page->offsets_free > 0)
+ {
+ /* there are dead tuples on this page - clean them */
Assert(!isempty);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
vacuum_page(onerel, buf, last_vacuum_page);
else
{
last_vacuum_page = NULL;
- last_vacuum_block = -1;
- }
- if (num_fraged_pages > 0 &&
- fraged_pages->pagedesc[num_fraged_pages - 1]->blkno ==
- (BlockNumber) blkno)
- {
- /* page is in fraged_pages too; remove it */
- --num_fraged_pages;
+ last_vacuum_block = InvalidBlockNumber;
}
if (isempty)
{
if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
{
- if ((TransactionId) tuple.t_data->t_cmin != myXID)
- elog(ERROR, "Invalid XID in t_cmin");
if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
elog(ERROR, "HEAP_MOVED_IN was not expected");
/*
* If this (chain) tuple is moved by me already then I
- * have to check is it in vacpage or not - i.e. is it moved
- * while cleaning this page or some previous one.
+ * have to check whether it is in vacpage or not - i.e. whether it
+ * was moved while cleaning this page or some previous one.
*/
if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
{
+ if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+ elog(ERROR, "invalid XVAC in tuple header");
if (keep_tuples == 0)
continue;
if (chain_tuple_moved) /* some chains was moved
* If this tuple is in the chain of tuples created in updates
* by "recent" transactions then we have to move all chain of
* tuples to another places.
+ *
+ * NOTE: this test is not 100% accurate: it is possible for a
+ * tuple to be an updated one with recent xmin, and yet not
+ * have a corresponding tuple in the vtlinks list. Presumably
+ * there was once a parent tuple with xmax matching the xmin,
+ * but it's possible that that tuple has been removed --- for
+ * example, if it had xmin = xmax then
+ * HeapTupleSatisfiesVacuum would deem it removable as soon as
+ * the xmin xact completes.
+ *
+ * To be on the safe side, we abandon the repair_frag process if
+ * we cannot find the parent tuple in vtlinks. This may be
+ * overly conservative; AFAICS it would be safe to move the
+ * chain.
*/
- if ((tuple.t_data->t_infomask & HEAP_UPDATED &&
- tuple.t_data->t_xmin >= XmaxRecent) ||
- (!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID) &&
- !(ItemPointerEquals(&(tuple.t_self), &(tuple.t_data->t_ctid)))))
+ if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
+ !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
+ OldestXmin)) ||
+ (!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_MARKED_FOR_UPDATE)) &&
+ !(ItemPointerEquals(&(tuple.t_self),
+ &(tuple.t_data->t_ctid)))))
{
Buffer Cbuf = buf;
+ bool freeCbuf = false;
+ bool chain_move_failed = false;
Page Cpage;
ItemId Citemid;
ItemPointerData Ctid;
HeapTupleData tp = tuple;
Size tlen = tuple_len;
- VTupleMove vtmove = (VTupleMove)
- palloc(100 * sizeof(VTupleMoveData));
- int num_vtmove = 0;
- int free_vtmove = 100;
+ VTupleMove vtmove;
+ int num_vtmove;
+ int free_vtmove;
VacPage to_vacpage = NULL;
int to_item = 0;
- bool freeCbuf = false;
int ti;
- if (vacrelstats->vtlinks == NULL)
- elog(ERROR, "No one parent tuple was found");
if (cur_buffer != InvalidBuffer)
{
WriteBuffer(cur_buffer);
cur_buffer = InvalidBuffer;
}
+ /* Quick exit if we have no vtlinks to search in */
+ if (vacrelstats->vtlinks == NULL)
+ {
+ elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
+ break; /* out of walk-along-page loop */
+ }
+
+ vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
+ num_vtmove = 0;
+ free_vtmove = 100;
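+ /*
+ * (vtmove starts with room for 100 entries; it is enlarged via
+ * repalloc further down if the chain turns out to be longer.)
+ */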
+
/*
* If this tuple is in the begin/middle of the chain then
* we have to move to the end of chain.
*/
- while (!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) &&
- !(ItemPointerEquals(&(tp.t_self), &(tp.t_data->t_ctid))))
+ while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_MARKED_FOR_UPDATE)) &&
+ !(ItemPointerEquals(&(tp.t_self),
+ &(tp.t_data->t_ctid))))
{
Ctid = tp.t_data->t_ctid;
if (freeCbuf)
ItemPointerGetOffsetNumber(&Ctid));
if (!ItemIdIsUsed(Citemid))
{
-
/*
* This means that in the middle of chain there
- * was tuple updated by older (than XmaxRecent)
+ * was tuple updated by older (than OldestXmin)
* xaction and this tuple is already deleted by
* me. Actually, upper part of chain should be
* removed and seems that this should be handled
- * in scan_heap(), but it's not implemented at
- * the moment and so we just stop shrinking here.
+ * in scan_heap(), but it's not implemented at the
+ * moment and so we just stop shrinking here.
*/
- ReleaseBuffer(Cbuf);
- pfree(vtmove);
- vtmove = NULL;
- elog(NOTICE, "Child itemid in update-chain marked as unused - can't continue repair_frag");
- break;
+ elog(DEBUG2, "child itemid in update-chain marked as unused --- can't continue repair_frag");
+ chain_move_failed = true;
+ break; /* out of loop to move to chain end */
}
tp.t_datamcxt = NULL;
tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
tp.t_self = Ctid;
tlen = tp.t_len = ItemIdGetLength(Citemid);
}
- if (vtmove == NULL)
- break;
- /* first, can chain be moved ? */
+ if (chain_move_failed)
+ {
+ if (freeCbuf)
+ ReleaseBuffer(Cbuf);
+ pfree(vtmove);
+ break; /* out of walk-along-page loop */
+ }
+
+ /*
+ * Check if all items in chain can be moved
+ */
for (;;)
{
+ Buffer Pbuf;
+ Page Ppage;
+ ItemId Pitemid;
+ HeapTupleData Ptp;
+ VTupleLinkData vtld,
+ *vtlp;
+
if (to_vacpage == NULL ||
!enough_space(to_vacpage, tlen))
{
-
- /*
- * if to_vacpage no longer has enough free space to be
- * useful, remove it from fraged_pages list
- */
- if (to_vacpage != NULL &&
- !enough_space(to_vacpage, vacrelstats->min_tlen))
- {
- Assert(num_fraged_pages > to_item);
- memmove(fraged_pages->pagedesc + to_item,
- fraged_pages->pagedesc + to_item + 1,
- sizeof(VacPage) * (num_fraged_pages - to_item - 1));
- num_fraged_pages--;
- }
for (i = 0; i < num_fraged_pages; i++)
{
if (enough_space(fraged_pages->pagedesc[i], tlen))
break;
}
- /* can't move item anywhere */
if (i == num_fraged_pages)
{
- for (i = 0; i < num_vtmove; i++)
- {
- Assert(vtmove[i].vacpage->offsets_used > 0);
- (vtmove[i].vacpage->offsets_used)--;
- }
- num_vtmove = 0;
- break;
+ /* can't move item anywhere */
+ chain_move_failed = true;
+ break; /* out of check-all-items loop */
}
to_item = i;
to_vacpage = fraged_pages->pagedesc[to_item];
}
to_vacpage->free -= MAXALIGN(tlen);
if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
- to_vacpage->free -= MAXALIGN(sizeof(ItemIdData));
+ to_vacpage->free -= sizeof(ItemIdData);
(to_vacpage->offsets_used)++;
if (free_vtmove == 0)
{
free_vtmove = 1000;
- vtmove = (VTupleMove) repalloc(vtmove,
- (free_vtmove + num_vtmove) *
- sizeof(VTupleMoveData));
+ vtmove = (VTupleMove)
+ repalloc(vtmove,
+ (free_vtmove + num_vtmove) *
+ sizeof(VTupleMoveData));
}
vtmove[num_vtmove].tid = tp.t_self;
vtmove[num_vtmove].vacpage = to_vacpage;
free_vtmove--;
num_vtmove++;
- /* All done ? */
+ /* At beginning of chain? */
if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
- tp.t_data->t_xmin < XmaxRecent)
+ TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
+ OldestXmin))
break;
- /* Well, try to find tuple with old row version */
- for (;;)
+ /* No, move to tuple with prior row version */
+ vtld.new_tid = tp.t_self;
+ vtlp = (VTupleLink)
+ vac_bsearch((void *) &vtld,
+ (void *) (vacrelstats->vtlinks),
+ vacrelstats->num_vtlinks,
+ sizeof(VTupleLinkData),
+ vac_cmp_vtlinks);
+ if (vtlp == NULL)
{
- Buffer Pbuf;
- Page Ppage;
- ItemId Pitemid;
- HeapTupleData Ptp;
- VTupleLinkData vtld,
- *vtlp;
-
- vtld.new_tid = tp.t_self;
- vtlp = (VTupleLink)
- vac_find_eq((void *) (vacrelstats->vtlinks),
- vacrelstats->num_vtlinks,
- sizeof(VTupleLinkData),
- (void *) &vtld,
- vac_cmp_vtlinks);
- if (vtlp == NULL)
- elog(ERROR, "Parent tuple was not found");
- tp.t_self = vtlp->this_tid;
- Pbuf = ReadBuffer(onerel,
+ /* see discussion above */
+ elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
+ chain_move_failed = true;
+ break; /* out of check-all-items loop */
+ }
+ tp.t_self = vtlp->this_tid;
+ Pbuf = ReadBuffer(onerel,
ItemPointerGetBlockNumber(&(tp.t_self)));
- Ppage = BufferGetPage(Pbuf);
- Pitemid = PageGetItemId(Ppage,
+ Ppage = BufferGetPage(Pbuf);
+ Pitemid = PageGetItemId(Ppage,
ItemPointerGetOffsetNumber(&(tp.t_self)));
- if (!ItemIdIsUsed(Pitemid))
- elog(ERROR, "Parent itemid marked as unused");
- Ptp.t_datamcxt = NULL;
- Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
- Assert(ItemPointerEquals(&(vtld.new_tid),
- &(Ptp.t_data->t_ctid)));
+ /* this can't happen since we saw tuple earlier: */
+ if (!ItemIdIsUsed(Pitemid))
+ elog(ERROR, "parent itemid marked as unused");
+ Ptp.t_datamcxt = NULL;
+ Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
- /*
- * Read above about cases when
- * !ItemIdIsUsed(Citemid) (child item is
- * removed)... Due to the fact that at the moment
- * we don't remove unuseful part of update-chain,
- * it's possible to get too old parent row here.
- * Like as in the case which caused this problem,
- * we stop shrinking here. I could try to find
- * real parent row but want not to do it because
- * of real solution will be implemented anyway,
- * latter, and we are too close to 6.5 release. -
- * vadim 06/11/99
- */
- if (Ptp.t_data->t_xmax != tp.t_data->t_xmin)
- {
- if (freeCbuf)
- ReleaseBuffer(Cbuf);
- freeCbuf = false;
- ReleaseBuffer(Pbuf);
- for (i = 0; i < num_vtmove; i++)
- {
- Assert(vtmove[i].vacpage->offsets_used > 0);
- (vtmove[i].vacpage->offsets_used)--;
- }
- num_vtmove = 0;
- elog(NOTICE, "Too old parent tuple found - can't continue repair_frag");
- break;
- }
-#ifdef NOT_USED /* I'm not sure that this will wotk
- * properly... */
+ /* ctid should not have changed since we saved it */
+ Assert(ItemPointerEquals(&(vtld.new_tid),
+ &(Ptp.t_data->t_ctid)));
- /*
- * If this tuple is updated version of row and it
- * was created by the same transaction then no one
- * is interested in this tuple - mark it as
- * removed.
- */
- if (Ptp.t_data->t_infomask & HEAP_UPDATED &&
- Ptp.t_data->t_xmin == Ptp.t_data->t_xmax)
- {
- TransactionIdStore(myXID,
- (TransactionId *) &(Ptp.t_data->t_cmin));
- Ptp.t_data->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
- Ptp.t_data->t_infomask |= HEAP_MOVED_OFF;
- WriteBuffer(Pbuf);
- continue;
- }
-#endif
- tp.t_datamcxt = Ptp.t_datamcxt;
- tp.t_data = Ptp.t_data;
- tlen = tp.t_len = ItemIdGetLength(Pitemid);
- if (freeCbuf)
- ReleaseBuffer(Cbuf);
- Cbuf = Pbuf;
- freeCbuf = true;
- break;
+ /*
+ * Read above about cases when !ItemIdIsUsed(Citemid)
+ * (child item is removed)... Due to the fact that at
+ * the moment we don't remove unuseful part of
+ * update-chain, it's possible to get too old parent
+ * row here. Like as in the case which caused this
+ * problem, we stop shrinking here. I could try to
+ * find real parent row but want not to do it because
+ * of real solution will be implemented anyway, later,
+ * and we are too close to 6.5 release. - vadim
+ * 06/11/99
+ */
+ if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
+ HeapTupleHeaderGetXmin(tp.t_data))))
+ {
+ ReleaseBuffer(Pbuf);
+ elog(DEBUG2, "too old parent tuple found --- can't continue repair_frag");
+ chain_move_failed = true;
+ break; /* out of check-all-items loop */
}
- if (num_vtmove == 0)
- break;
- }
+ tp.t_datamcxt = Ptp.t_datamcxt;
+ tp.t_data = Ptp.t_data;
+ tlen = tp.t_len = ItemIdGetLength(Pitemid);
+ if (freeCbuf)
+ ReleaseBuffer(Cbuf);
+ Cbuf = Pbuf;
+ freeCbuf = true;
+ } /* end of check-all-items loop */
+
if (freeCbuf)
ReleaseBuffer(Cbuf);
- if (num_vtmove == 0) /* chain can't be moved */
+ freeCbuf = false;
+
+ if (chain_move_failed)
{
+ /*
+ * Undo changes to offsets_used state. We don't
+ * bother cleaning up the amount-free state, since
+ * we're not going to do any further tuple motion.
+ */
+ for (i = 0; i < num_vtmove; i++)
+ {
+ Assert(vtmove[i].vacpage->offsets_used > 0);
+ (vtmove[i].vacpage->offsets_used)--;
+ }
pfree(vtmove);
- break;
+ break; /* out of walk-along-page loop */
}
+
+ /*
+ * Okay, move the whole tuple chain
+ */
ItemPointerSetInvalid(&Ctid);
for (ti = 0; ti < num_vtmove; ti++)
{
- VacPage destvacpage = vtmove[ti].vacpage;
+ VacPage destvacpage = vtmove[ti].vacpage;
/* Get page to move from */
tuple.t_self = vtmove[ti].tid;
*/
heap_copytuple_with_tuple(&tuple, &newtup);
- RelationInvalidateHeapTuple(onerel, &tuple);
+ /*
+ * register invalidation of source tuple in catcaches.
+ */
+ CacheInvalidateHeapTuple(onerel, &tuple);
- /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
+ /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
START_CRIT_SECTION();
- TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
- tuple.t_data->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
+ tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+ HEAP_XMIN_INVALID |
+ HEAP_MOVED_IN);
tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
+ HeapTupleHeaderSetXvac(tuple.t_data, myXID);
/*
* If this page was not used before - clean it.
*
* NOTE: a nasty bug used to lurk here. It is possible
* for the source and destination pages to be the same
- * (since this tuple-chain member can be on a page lower
- * than the one we're currently processing in the outer
- * loop). If that's true, then after vacuum_page() the
- * source tuple will have been moved, and tuple.t_data
- * will be pointing at garbage. Therefore we must do
- * everything that uses tuple.t_data BEFORE this step!!
+ * (since this tuple-chain member can be on a page
+ * lower than the one we're currently processing in
+ * the outer loop). If that's true, then after
+ * vacuum_page() the source tuple will have been
+ * moved, and tuple.t_data will be pointing at
+ * garbage. Therefore we must do everything that uses
+ * tuple.t_data BEFORE this step!!
*
* This path is different from the other callers of
- * vacuum_page, because we have already incremented the
- * vacpage's offsets_used field to account for the
+ * vacuum_page, because we have already incremented
+ * the vacpage's offsets_used field to account for the
* tuple(s) we expect to move onto the page. Therefore
- * vacuum_page's check for offsets_used == 0 is
- * wrong. But since that's a good debugging check for
- * all other callers, we work around it here rather
- * than remove it.
+ * vacuum_page's check for offsets_used == 0 is wrong.
+ * But since that's a good debugging check for all
+ * other callers, we work around it here rather than
+ * remove it.
*/
if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
{
* Update the state of the copied tuple, and store it
* on the destination page.
*/
- TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
- newtup.t_data->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
+ newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+ HEAP_XMIN_INVALID |
+ HEAP_MOVED_OFF);
newtup.t_data->t_infomask |= HEAP_MOVED_IN;
- newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
- InvalidOffsetNumber, LP_USED);
+ HeapTupleHeaderSetXvac(newtup.t_data, myXID);
+ newoff = PageAddItem(ToPage,
+ (Item) newtup.t_data,
+ tuple_len,
+ InvalidOffsetNumber,
+ LP_USED);
if (newoff == InvalidOffsetNumber)
{
- elog(STOP, "moving chain: failed to add item with len = %lu to page %u",
- (unsigned long)tuple_len, destvacpage->blkno);
+ elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
+ (unsigned long) tuple_len, destvacpage->blkno);
}
newitemid = PageGetItemId(ToPage, newoff);
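+
+ /*
+ * PageAddItem copied the tuple onto the destination page; free
+ * our private copy and point newtup at the on-page version.
+ */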
pfree(newtup.t_data);
newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
+ /* XLOG stuff */
+ if (!onerel->rd_istemp)
{
- XLogRecPtr recptr =
- log_heap_move(onerel, Cbuf, tuple.t_self,
- cur_buffer, &newtup);
+ XLogRecPtr recptr =
+ log_heap_move(onerel, Cbuf, tuple.t_self,
+ cur_buffer, &newtup);
if (Cbuf != cur_buffer)
{
PageSetLSN(ToPage, recptr);
PageSetSUI(ToPage, ThisStartUpID);
}
+ else
+ {
+ /*
+ * No XLOG record, but still need to flag that XID
+ * exists on disk
+ */
+ MyXactMadeTempRelUpdate = true;
+ }
+
END_CRIT_SECTION();
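+
+ /*
+ * Remember the highest block we have moved a tuple onto; the
+ * outer loop stops when it reaches this block, so tuples are
+ * only ever moved toward the start of the relation.
+ */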
- if (((int) destvacpage->blkno) > last_move_dest_block)
+ if (destvacpage->blkno > last_move_dest_block)
last_move_dest_block = destvacpage->blkno;
/*
* Set new tuple's t_ctid pointing to itself for last
- * tuple in chain, and to next tuple in chain otherwise.
+ * tuple in chain, and to next tuple in chain
+ * otherwise.
*/
if (!ItemPointerIsValid(&Ctid))
newtup.t_data->t_ctid = newtup.t_self;
if (cur_buffer != Cbuf)
LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
- if (Irel != (Relation *) NULL)
+ /* Create index entries for the moved tuple */
+ if (resultRelInfo->ri_NumIndices > 0)
{
- /*
- * XXX using CurrentMemoryContext here means
- * intra-vacuum memory leak for functional indexes.
- * Should fix someday.
- *
- * XXX This code fails to handle partial indexes!
- * Probably should change it to use ExecOpenIndices.
- */
- for (i = 0; i < nindices; i++)
- {
- FormIndexDatum(indexInfo[i],
- &newtup,
- tupdesc,
- CurrentMemoryContext,
- idatum,
- inulls);
- iresult = index_insert(Irel[i],
- idatum,
- inulls,
- &newtup.t_self,
- onerel);
- if (iresult)
- pfree(iresult);
- }
+ ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
+ ExecInsertIndexTuples(slot, &(newtup.t_self),
+ estate, true);
}
+
WriteBuffer(cur_buffer);
WriteBuffer(Cbuf);
- }
+ } /* end of move-the-tuple-chain loop */
+
cur_buffer = InvalidBuffer;
pfree(vtmove);
chain_tuple_moved = true;
+
+ /* advance to next tuple in walk-along-page loop */
continue;
- }
+ } /* end of is-tuple-in-chain test */
/* try to find new page for this tuple */
if (cur_buffer == InvalidBuffer ||
{
WriteBuffer(cur_buffer);
cur_buffer = InvalidBuffer;
-
- /*
- * If previous target page is now too full to add *any*
- * tuple to it, remove it from fraged_pages.
- */
- if (!enough_space(cur_page, vacrelstats->min_tlen))
- {
- Assert(num_fraged_pages > cur_item);
- memmove(fraged_pages->pagedesc + cur_item,
- fraged_pages->pagedesc + cur_item + 1,
- sizeof(VacPage) * (num_fraged_pages - cur_item - 1));
- num_fraged_pages--;
- }
}
for (i = 0; i < num_fraged_pages; i++)
{
/* copy tuple */
heap_copytuple_with_tuple(&tuple, &newtup);
- RelationInvalidateHeapTuple(onerel, &tuple);
+ /*
+ * register invalidation of source tuple in catcaches.
+ *
+ * (Note: we do not need to register the copied tuple, because we
+ * are not changing the tuple contents and so there cannot be
+ * any need to flush negative catcache entries.)
+ */
+ CacheInvalidateHeapTuple(onerel, &tuple);
- /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
+ /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
START_CRIT_SECTION();
/*
- * Mark new tuple as moved_in by vacuum and store vacuum XID
- * in t_cmin !!!
+ * Mark new tuple as MOVED_IN by me.
*/
- TransactionIdStore(myXID, (TransactionId *) &(newtup.t_data->t_cmin));
- newtup.t_data->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_OFF);
+ newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+ HEAP_XMIN_INVALID |
+ HEAP_MOVED_OFF);
newtup.t_data->t_infomask |= HEAP_MOVED_IN;
+ HeapTupleHeaderSetXvac(newtup.t_data, myXID);
/* add tuple to the page */
newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
InvalidOffsetNumber, LP_USED);
if (newoff == InvalidOffsetNumber)
{
- elog(STOP, "\
-failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
- (unsigned long)tuple_len, cur_page->blkno, (unsigned long)cur_page->free,
- cur_page->offsets_used, cur_page->offsets_free);
+ elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
+ (unsigned long) tuple_len,
+ cur_page->blkno, (unsigned long) cur_page->free,
+ cur_page->offsets_used, cur_page->offsets_free);
}
newitemid = PageGetItemId(ToPage, newoff);
pfree(newtup.t_data);
newtup.t_self = newtup.t_data->t_ctid;
/*
- * Mark old tuple as moved_off by vacuum and store vacuum XID
- * in t_cmin !!!
+ * Mark old tuple as MOVED_OFF by me.
*/
- TransactionIdStore(myXID, (TransactionId *) &(tuple.t_data->t_cmin));
- tuple.t_data->t_infomask &=
- ~(HEAP_XMIN_COMMITTED | HEAP_XMIN_INVALID | HEAP_MOVED_IN);
+ tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
+ HEAP_XMIN_INVALID |
+ HEAP_MOVED_IN);
tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
+ HeapTupleHeaderSetXvac(tuple.t_data, myXID);
+ /* XLOG stuff */
+ if (!onerel->rd_istemp)
{
- XLogRecPtr recptr =
- log_heap_move(onerel, buf, tuple.t_self,
- cur_buffer, &newtup);
+ XLogRecPtr recptr =
+ log_heap_move(onerel, buf, tuple.t_self,
+ cur_buffer, &newtup);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
PageSetLSN(ToPage, recptr);
PageSetSUI(ToPage, ThisStartUpID);
}
+ else
+ {
+ /*
+ * No XLOG record, but still need to flag that XID exists
+ * on disk
+ */
+ MyXactMadeTempRelUpdate = true;
+ }
+
END_CRIT_SECTION();
cur_page->offsets_used++;
num_moved++;
cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
- if (((int) cur_page->blkno) > last_move_dest_block)
+ if (cur_page->blkno > last_move_dest_block)
last_move_dest_block = cur_page->blkno;
vacpage->offsets[vacpage->offsets_free++] = offnum;
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
/* insert index' tuples if needed */
- if (Irel != (Relation *) NULL)
+ if (resultRelInfo->ri_NumIndices > 0)
{
- /*
- * XXX using CurrentMemoryContext here means
- * intra-vacuum memory leak for functional indexes.
- * Should fix someday.
- *
- * XXX This code fails to handle partial indexes!
- * Probably should change it to use ExecOpenIndices.
- */
- for (i = 0; i < nindices; i++)
- {
- FormIndexDatum(indexInfo[i],
- &newtup,
- tupdesc,
- CurrentMemoryContext,
- idatum,
- inulls);
- iresult = index_insert(Irel[i],
- idatum,
- inulls,
- &newtup.t_self,
- onerel);
- if (iresult)
- pfree(iresult);
- }
+ ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
+ ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
}
-
} /* walk along page */
+ /*
+ * If we broke out of the walk-along-page loop early (ie, still
+ * have offnum <= maxoff), then we failed to move some tuple off
+ * this page. No point in shrinking any more, so clean up and
+ * exit the per-page loop.
+ */
if (offnum < maxoff && keep_tuples > 0)
{
OffsetNumber off;
+ /*
+ * Fix vacpage state for any unvisited tuples remaining on
+ * page
+ */
for (off = OffsetNumberNext(offnum);
off <= maxoff;
off = OffsetNumberNext(off))
tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
continue;
- if ((TransactionId) tuple.t_data->t_cmin != myXID)
- elog(ERROR, "Invalid XID in t_cmin (4)");
if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
- elog(ERROR, "HEAP_MOVED_IN was not expected (2)");
+ elog(ERROR, "HEAP_MOVED_IN was not expected");
if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
{
+ if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+ elog(ERROR, "invalid XVAC in tuple header");
/* some chains was moved while */
if (chain_tuple_moved)
{ /* cleaning this page */
keep_tuples--;
}
}
+ else
+ elog(ERROR, "HEAP_MOVED_OFF was expected");
}
}
qsort((char *) (vacpage->offsets), vacpage->offsets_free,
sizeof(OffsetNumber), vac_cmp_offno);
}
- reap_page(&Nvacpagelist, vacpage);
+ vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
WriteBuffer(buf);
}
else if (dowrite)
ReleaseBuffer(buf);
if (offnum <= maxoff)
- break; /* some item(s) left */
+ break; /* had to quit early, see above note */
} /* walk along relation */
* relation. Ideally we should do Commit/StartTransactionCommand
* here, relying on the session-level table lock to protect our
* exclusive access to the relation. However, that would require
- * a lot of extra code to close and re-open the relation, indices,
- * etc. For now, a quick hack: record status of current transaction
- * as committed, and continue.
+ * a lot of extra code to close and re-open the relation, indexes,
+ * etc. For now, a quick hack: record status of current
+ * transaction as committed, and continue.
*/
RecordTransactionCommit();
}
/*
- * Clean uncleaned reaped pages from vacuum_pages list list and set
- * xmin committed for inserted tuples
+ * We are not going to move any more tuples across pages, but we still
+ * need to apply vacuum_page to compact free space in the remaining
+ * pages in vacuum_pages list. Note that some of these pages may also
+ * be in the fraged_pages list, and may have had tuples moved onto
+ * them; if so, we already did vacuum_page and needn't do it again.
*/
- checked_moved = 0;
- for (i = 0, curpage = vacuum_pages->pagedesc; i < vacuumed_pages; i++, curpage++)
+ for (i = 0, curpage = vacuum_pages->pagedesc;
+ i < vacuumed_pages;
+ i++, curpage++)
{
- Assert((*curpage)->blkno < (BlockNumber) blkno);
- buf = ReadBuffer(onerel, (*curpage)->blkno);
- LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
- page = BufferGetPage(buf);
- if ((*curpage)->offsets_used == 0) /* this page was not used */
+ CHECK_FOR_INTERRUPTS();
+ Assert((*curpage)->blkno < blkno);
+ if ((*curpage)->offsets_used == 0)
{
+ /* this page was not used as a move target, so must clean it */
+ buf = ReadBuffer(onerel, (*curpage)->blkno);
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+ page = BufferGetPage(buf);
if (!PageIsEmpty(page))
vacuum_page(onerel, buf, *curpage);
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ WriteBuffer(buf);
}
- else
-/* this page was used */
+ }
+
+ /*
+ * Now scan all the pages that we moved tuples onto and update tuple
+ * status bits. This is not really necessary, but will save time for
+ * future transactions examining these tuples.
+ *
+ * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
+ * pages that were move source pages but not move dest pages. One
+ * also wonders whether it wouldn't be better to skip this step and
+ * let the tuple status updates happen someplace that's not holding an
+ * exclusive lock on the relation.
+ */
+ checked_moved = 0;
+ for (i = 0, curpage = fraged_pages->pagedesc;
+ i < num_fraged_pages;
+ i++, curpage++)
+ {
+ CHECK_FOR_INTERRUPTS();
+ Assert((*curpage)->blkno < blkno);
+ if ((*curpage)->blkno > last_move_dest_block)
+ break; /* no need to scan any further */
+ if ((*curpage)->offsets_used == 0)
+ continue; /* this page was never used as a move dest */
+ buf = ReadBuffer(onerel, (*curpage)->blkno);
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+ page = BufferGetPage(buf);
+ num_tuples = 0;
+ max_offset = PageGetMaxOffsetNumber(page);
+ for (newoff = FirstOffsetNumber;
+ newoff <= max_offset;
+ newoff = OffsetNumberNext(newoff))
{
- num_tuples = 0;
- max_offset = PageGetMaxOffsetNumber(page);
- for (newoff = FirstOffsetNumber;
- newoff <= max_offset;
- newoff = OffsetNumberNext(newoff))
+ itemid = PageGetItemId(page, newoff);
+ if (!ItemIdIsUsed(itemid))
+ continue;
+ tuple.t_datamcxt = NULL;
+ tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+ if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
{
- itemid = PageGetItemId(page, newoff);
- if (!ItemIdIsUsed(itemid))
- continue;
- tuple.t_datamcxt = NULL;
- tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
- if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+ if (!(tuple.t_data->t_infomask & HEAP_MOVED))
+ elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+ if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+ elog(ERROR, "invalid XVAC in tuple header");
+ if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
{
- if ((TransactionId) tuple.t_data->t_cmin != myXID)
- elog(ERROR, "Invalid XID in t_cmin (2)");
- if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
- {
- tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
- num_tuples++;
- }
- else if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
- tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
- else
- elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
+ tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
+ tuple.t_data->t_infomask &= ~HEAP_MOVED;
+ num_tuples++;
}
+ else
+ tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
}
- Assert((*curpage)->offsets_used == num_tuples);
- checked_moved += num_tuples;
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
WriteBuffer(buf);
+ Assert((*curpage)->offsets_used == num_tuples);
+ checked_moved += num_tuples;
}
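+
+ /* Every moved tuple should have been accounted for just above. */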
Assert(num_moved == checked_moved);
- elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u; Tuple(s) moved: %u. %s",
- RelationGetRelationName(onerel),
- nblocks, blkno, num_moved,
- show_rusage(&ru0));
+ /*
+ * It'd be cleaner to make this report at the bottom of this routine,
+ * but then the rusage would double-count the second pass of index
+ * vacuuming. So do it here and ignore the relatively small amount of
+ * processing that occurs below.
+ */
+ ereport(elevel,
+ (errmsg("\"%s\": moved %u tuples, truncated %u to %u pages",
+ RelationGetRelationName(onerel),
+ num_moved, nblocks, blkno),
+ errdetail("%s",
+ vac_show_rusage(&ru0))));
- /*
+ /*
* Reflect the motion of system tuples to catalog cache here.
*/
CommandCounterIncrement();
if (Nvacpagelist.num_pages > 0)
{
- /* vacuum indices again if needed */
+ /* vacuum indexes again if needed */
if (Irel != (Relation *) NULL)
{
- VacPage *vpleft,
+ VacPage *vpleft,
*vpright,
vpsave;
/* re-sort Nvacpagelist.pagedesc */
for (vpleft = Nvacpagelist.pagedesc,
- vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
+ vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
vpleft < vpright; vpleft++, vpright--)
{
vpsave = *vpleft;
*vpright = vpsave;
}
Assert(keep_tuples >= 0);
- for (i = 0; i < nindices; i++)
+ for (i = 0; i < nindexes; i++)
vacuum_index(&Nvacpagelist, Irel[i],
- vacrelstats->num_tuples, keep_tuples);
+ vacrelstats->rel_tuples, keep_tuples);
}
/* clean moved tuples from last page in Nvacpagelist list */
- if (vacpage->blkno == (BlockNumber) (blkno - 1) &&
+ if (vacpage->blkno == (blkno - 1) &&
vacpage->offsets_free > 0)
{
- char unbuf[BLCKSZ];
- OffsetNumber *unused = (OffsetNumber*)unbuf;
- int uncnt;
+ OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
+ int uncnt;
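+
+ /*
+ * (unused[] is sized to hold an offset for every line pointer a
+ * page could possibly have.)
+ */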
buf = ReadBuffer(onerel, vacpage->blkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
- START_CRIT_SECTION();
page = BufferGetPage(buf);
num_tuples = 0;
+ maxoff = PageGetMaxOffsetNumber(page);
for (offnum = FirstOffsetNumber;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
{
- if ((TransactionId) tuple.t_data->t_cmin != myXID)
- elog(ERROR, "Invalid XID in t_cmin (3)");
if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
{
+ if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
+ elog(ERROR, "invalid XVAC in tuple header");
itemid->lp_flags &= ~LP_USED;
num_tuples++;
}
else
- elog(ERROR, "HEAP_MOVED_OFF was expected (2)");
+ elog(ERROR, "HEAP_MOVED_OFF was expected");
}
}
Assert(vacpage->offsets_free == num_tuples);
+
+ START_CRIT_SECTION();
+
uncnt = PageRepairFragmentation(page, unused);
+
+ /* XLOG stuff */
+ if (!onerel->rd_istemp)
{
XLogRecPtr recptr;
- recptr = log_heap_clean(onerel, buf, (char*)unused,
- (char*)(&(unused[uncnt])) - (char*)unused);
+
+ recptr = log_heap_clean(onerel, buf, unused, uncnt);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
}
+ else
+ {
+ /*
+ * No XLOG record, but still need to flag that XID exists
+ * on disk
+ */
+ MyXactMadeTempRelUpdate = true;
+ }
+
END_CRIT_SECTION();
+
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
WriteBuffer(buf);
}
/*
* Flush dirty pages out to disk. We do this unconditionally, even if
- * we don't need to truncate, because we want to ensure that all tuples
- * have correct on-row commit status on disk (see bufmgr.c's comments
- * for FlushRelationBuffers()).
+ * we don't need to truncate, because we want to ensure that all
+ * tuples have correct on-row commit status on disk (see bufmgr.c's
+ * comments for FlushRelationBuffers()).
*/
i = FlushRelationBuffers(onerel, blkno);
if (i < 0)
- elog(ERROR, "VACUUM (repair_frag): FlushRelationBuffers returned %d",
- i);
+ elog(ERROR, "FlushRelationBuffers returned %d", i);
/* truncate relation, if needed */
if (blkno < nblocks)
{
blkno = smgrtruncate(DEFAULT_SMGR, onerel, blkno);
- Assert(blkno >= 0);
- vacrelstats->num_pages = blkno; /* set new number of blocks */
- }
-
- if (Irel != (Relation *) NULL) /* pfree index' allocations */
- {
- close_indices(nindices, Irel);
- pfree(indexInfo);
+ onerel->rd_nblocks = blkno; /* update relcache immediately */
+ onerel->rd_targblock = InvalidBlockNumber;
+ vacrelstats->rel_pages = blkno; /* set new number of blocks */
}
+ /* clean up */
pfree(vacpage);
if (vacrelstats->vtlinks != NULL)
pfree(vacrelstats->vtlinks);
+
+ ExecDropTupleTable(tupleTable, true);
+
+ ExecCloseIndices(resultRelInfo);
+
+ FreeExecutorState(estate);
}
/*
{
Buffer buf;
VacPage *vacpage;
+ BlockNumber relblocks;
int nblocks;
int i;
nblocks = vacuum_pages->num_pages;
- nblocks -= vacuum_pages->empty_end_pages; /* nothing to do with
- * them */
+ nblocks -= vacuum_pages->empty_end_pages; /* nothing to do with them */
for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
{
+ CHECK_FOR_INTERRUPTS();
if ((*vacpage)->offsets_free > 0)
{
buf = ReadBuffer(onerel, (*vacpage)->blkno);
/*
* Flush dirty pages out to disk. We do this unconditionally, even if
- * we don't need to truncate, because we want to ensure that all tuples
- * have correct on-row commit status on disk (see bufmgr.c's comments
- * for FlushRelationBuffers()).
+ * we don't need to truncate, because we want to ensure that all
+ * tuples have correct on-row commit status on disk (see bufmgr.c's
+ * comments for FlushRelationBuffers()).
*/
- Assert(vacrelstats->num_pages >= vacuum_pages->empty_end_pages);
- nblocks = vacrelstats->num_pages - vacuum_pages->empty_end_pages;
+ Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
+ relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
- i = FlushRelationBuffers(onerel, nblocks);
+ i = FlushRelationBuffers(onerel, relblocks);
if (i < 0)
- elog(ERROR, "VACUUM (vacuum_heap): FlushRelationBuffers returned %d",
- i);
+ elog(ERROR, "FlushRelationBuffers returned %d", i);
/* truncate relation if there are some empty end-pages */
if (vacuum_pages->empty_end_pages > 0)
{
- elog(MESSAGE_LEVEL, "Rel %s: Pages: %u --> %u.",
- RelationGetRelationName(onerel),
- vacrelstats->num_pages, nblocks);
- nblocks = smgrtruncate(DEFAULT_SMGR, onerel, nblocks);
- Assert(nblocks >= 0);
- vacrelstats->num_pages = nblocks; /* set new number of blocks */
+ ereport(elevel,
+ (errmsg("\"%s\": truncated %u to %u pages",
+ RelationGetRelationName(onerel),
+ vacrelstats->rel_pages, relblocks)));
+ relblocks = smgrtruncate(DEFAULT_SMGR, onerel, relblocks);
+ onerel->rd_nblocks = relblocks; /* update relcache immediately */
+ onerel->rd_targblock = InvalidBlockNumber;
+ vacrelstats->rel_pages = relblocks; /* set new number of blocks */
}
}
static void
vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
{
- char unbuf[BLCKSZ];
- OffsetNumber *unused = (OffsetNumber*)unbuf;
- int uncnt;
- Page page = BufferGetPage(buffer);
- ItemId itemid;
- int i;
+ OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
+ int uncnt;
+ Page page = BufferGetPage(buffer);
+ ItemId itemid;
+ int i;
/* There shouldn't be any tuples moved onto the page yet! */
Assert(vacpage->offsets_used == 0);
START_CRIT_SECTION();
+
for (i = 0; i < vacpage->offsets_free; i++)
{
- itemid = &(((PageHeader) page)->pd_linp[vacpage->offsets[i] - 1]);
+ itemid = PageGetItemId(page, vacpage->offsets[i]);
itemid->lp_flags &= ~LP_USED;
}
+
uncnt = PageRepairFragmentation(page, unused);
+
+ /* XLOG stuff */
+ if (!onerel->rd_istemp)
{
XLogRecPtr recptr;
- recptr = log_heap_clean(onerel, buffer, (char*)unused,
- (char*)(&(unused[uncnt])) - (char*)unused);
+
+ recptr = log_heap_clean(onerel, buffer, unused, uncnt);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
}
+ else
+ {
+ /* No XLOG record, but still need to flag that XID exists on disk */
+ MyXactMadeTempRelUpdate = true;
+ }
+
END_CRIT_SECTION();
}
/*
- * _scan_index() -- scan one index relation to update statistic.
+ * scan_index() -- scan one index relation to update statistics.
*
+ * We use this when we have no deletions to do.
*/
static void
-scan_index(Relation indrel, int num_tuples)
+scan_index(Relation indrel, double num_tuples)
{
- RetrieveIndexResult res;
- IndexScanDesc iscan;
- int nitups;
- int nipages;
- struct rusage ru0;
+ IndexBulkDeleteResult *stats;
+ IndexVacuumCleanupInfo vcinfo;
+ VacRUsage ru0;
+
+ vac_init_rusage(&ru0);
- getrusage(RUSAGE_SELF, &ru0);
+ /*
+ * Even though we're not planning to delete anything, we use the
+ * ambulkdelete call, because (a) the scan happens within the index AM
+ * for more speed, and (b) it may want to pass private statistics to
+ * the amvacuumcleanup call.
+ */
+ stats = index_bulk_delete(indrel, dummy_tid_reaped, NULL);
- /* walk through the entire index */
- iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
- nitups = 0;
+ /* Do post-VACUUM cleanup, even though we deleted nothing */
+ vcinfo.vacuum_full = true;
+ vcinfo.message_level = elevel;
- while ((res = index_getnext(iscan, ForwardScanDirection))
- != (RetrieveIndexResult) NULL)
- {
- nitups++;
- pfree(res);
- }
+ stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
- index_endscan(iscan);
+ if (!stats)
+ return;
/* now update statistics in pg_class */
- nipages = RelationGetNumberOfBlocks(indrel);
- update_relstats(RelationGetRelid(indrel), nipages, nitups, false, NULL);
-
- elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %u. %s",
- RelationGetRelationName(indrel), nipages, nitups,
- show_rusage(&ru0));
+ vac_update_relstats(RelationGetRelid(indrel),
+ stats->num_pages, stats->num_index_tuples,
+ false);
+
+ ereport(elevel,
+ (errmsg("index \"%s\" now contains %.0f tuples in %u pages",
+ RelationGetRelationName(indrel),
+ stats->num_index_tuples,
+ stats->num_pages),
+ errdetail("%u index pages have been deleted, %u are currently reusable.\n"
+ "%s",
+ stats->pages_deleted, stats->pages_free,
+ vac_show_rusage(&ru0))));
- if (nitups != num_tuples)
- elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u).\
-\n\tRecreate the index.",
- RelationGetRelationName(indrel), nitups, num_tuples);
+ /*
+ * Check for tuple count mismatch. If the index is partial, then it's
+ * OK for it to have fewer tuples than the heap; else we got trouble.
+ */
+ if (stats->num_index_tuples != num_tuples)
+ {
+ if (stats->num_index_tuples > num_tuples ||
+ !vac_is_partial_index(indrel))
+ ereport(WARNING,
+ (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
+ RelationGetRelationName(indrel),
+ stats->num_index_tuples, num_tuples),
+ errhint("Rebuild the index with REINDEX.")));
+ }
+ pfree(stats);
}
/*
*
* Vpl is the VacPageList of the heap we're currently vacuuming.
* It's locked. Indrel is an index relation on the vacuumed heap.
- * We don't set locks on the index relation here, since the indexed
- * access methods support locking at different granularities.
- * We let them handle it.
+ *
+ * We don't bother to set locks on the index relation here, since
+ * the parent table is exclusive-locked already.
*
* Finally, we arrange to update the index relation's statistics in
* pg_class.
*/
static void
-vacuum_index(VacPageList vacpagelist, Relation indrel, int num_tuples, int keep_tuples)
+vacuum_index(VacPageList vacpagelist, Relation indrel,
+ double num_tuples, int keep_tuples)
{
- RetrieveIndexResult res;
- IndexScanDesc iscan;
- ItemPointer heapptr;
- int tups_vacuumed;
- int num_index_tuples;
- int num_pages;
- VacPage vp;
- struct rusage ru0;
-
- getrusage(RUSAGE_SELF, &ru0);
-
- /* walk through the entire index */
- iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
- tups_vacuumed = 0;
- num_index_tuples = 0;
-
- while ((res = index_getnext(iscan, ForwardScanDirection))
- != (RetrieveIndexResult) NULL)
- {
- heapptr = &res->heap_iptr;
+ IndexBulkDeleteResult *stats;
+ IndexVacuumCleanupInfo vcinfo;
+ VacRUsage ru0;
- if ((vp = tid_reaped(heapptr, vacpagelist)) != (VacPage) NULL)
- {
-#ifdef NOT_USED
- elog(DEBUG, "<%x,%x> -> <%x,%x>",
- ItemPointerGetBlockNumber(&(res->index_iptr)),
- ItemPointerGetOffsetNumber(&(res->index_iptr)),
- ItemPointerGetBlockNumber(&(res->heap_iptr)),
- ItemPointerGetOffsetNumber(&(res->heap_iptr)));
-#endif
- if (vp->offsets_free == 0)
- {
- elog(NOTICE, "Index %s: pointer to EmptyPage (blk %u off %u) - fixing",
- RelationGetRelationName(indrel),
- vp->blkno, ItemPointerGetOffsetNumber(heapptr));
- }
- ++tups_vacuumed;
- index_delete(indrel, &res->index_iptr);
- }
- else
- num_index_tuples++;
+ vac_init_rusage(&ru0);
- pfree(res);
- }
+ /* Do bulk deletion */
+ stats = index_bulk_delete(indrel, tid_reaped, (void *) vacpagelist);
- index_endscan(iscan);
+ /* Do post-VACUUM cleanup */
+ vcinfo.vacuum_full = true;
+ vcinfo.message_level = elevel;
- /* now update statistics in pg_class */
- num_pages = RelationGetNumberOfBlocks(indrel);
- update_relstats(RelationGetRelid(indrel), num_pages, num_index_tuples, false, NULL);
+ stats = index_vacuum_cleanup(indrel, &vcinfo, stats);
+
+ if (!stats)
+ return;
- elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %u: Deleted %u. %s",
- RelationGetRelationName(indrel), num_pages,
- num_index_tuples - keep_tuples, tups_vacuumed,
- show_rusage(&ru0));
+ /* now update statistics in pg_class */
+ vac_update_relstats(RelationGetRelid(indrel),
+ stats->num_pages, stats->num_index_tuples,
+ false);
+
+ ereport(elevel,
+ (errmsg("index \"%s\" now contains %.0f tuples in %u pages",
+ RelationGetRelationName(indrel),
+ stats->num_index_tuples,
+ stats->num_pages),
+ errdetail("%.0f index tuples were removed.\n"
+ "%u index pages have been deleted, %u are currently reusable.\n"
+ "%s",
+ stats->tuples_removed,
+ stats->pages_deleted, stats->pages_free,
+ vac_show_rusage(&ru0))));
- if (num_index_tuples != num_tuples + keep_tuples)
- elog(NOTICE, "Index %s: NUMBER OF INDEX' TUPLES (%u) IS NOT THE SAME AS HEAP' (%u).\
-\n\tRecreate the index.",
- RelationGetRelationName(indrel), num_index_tuples, num_tuples);
+ /*
+ * Check for tuple count mismatch. If the index is partial, then it's
+ * OK for it to have fewer tuples than the heap; else we got trouble.
+ */
+ if (stats->num_index_tuples != num_tuples + keep_tuples)
+ {
+ if (stats->num_index_tuples > num_tuples + keep_tuples ||
+ !vac_is_partial_index(indrel))
+ ereport(WARNING,
+ (errmsg("index \"%s\" contains %.0f tuples, but table contains %.0f tuples",
+ RelationGetRelationName(indrel),
+ stats->num_index_tuples, num_tuples + keep_tuples),
+ errhint("Rebuild the index with REINDEX.")));
+ }
+ pfree(stats);
}
/*
* tid_reaped() -- is a particular tid reaped?
*
+ * This has the right signature to be an IndexBulkDeleteCallback.
+ *
* vacpagelist->VacPage_array is sorted in right order.
*/
-static VacPage
-tid_reaped(ItemPointer itemptr, VacPageList vacpagelist)
+static bool
+tid_reaped(ItemPointer itemptr, void *state)
{
+ VacPageList vacpagelist = (VacPageList) state;
OffsetNumber ioffno;
OffsetNumber *voff;
VacPage vp,
ioffno = ItemPointerGetOffsetNumber(itemptr);
vp = &vacpage;
- vpp = (VacPage *) vac_find_eq((void *) (vacpagelist->pagedesc),
- vacpagelist->num_pages, sizeof(VacPage), (void *) &vp,
- vac_cmp_blk);
+ vpp = (VacPage *) vac_bsearch((void *) &vp,
+ (void *) (vacpagelist->pagedesc),
+ vacpagelist->num_pages,
+ sizeof(VacPage),
+ vac_cmp_blk);
- if (vpp == (VacPage *) NULL)
- return (VacPage) NULL;
- vp = *vpp;
+ if (vpp == NULL)
+ return false;
- /* ok - we are on true page */
+ /* ok - we are on a partially or fully reaped page */
+ vp = *vpp;
if (vp->offsets_free == 0)
- { /* this is EmptyPage !!! */
- return vp;
+ {
+ /* this is EmptyPage, so claim all tuples on it are reaped!!! */
+ return true;
}
- voff = (OffsetNumber *) vac_find_eq((void *) (vp->offsets),
- vp->offsets_free, sizeof(OffsetNumber), (void *) &ioffno,
- vac_cmp_offno);
+ voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
+ (void *) (vp->offsets),
+ vp->offsets_free,
+ sizeof(OffsetNumber),
+ vac_cmp_offno);
- if (voff == (OffsetNumber *) NULL)
- return (VacPage) NULL;
+ if (voff == NULL)
+ return false;
- return vp;
+ /* tid is reaped */
+ return true;
+}
+/*
+ * Dummy version for scan_index.
+ */
+static bool
+dummy_tid_reaped(ItemPointer itemptr, void *state)
+{
+ return false;
}
/*
- * update_relstats() -- update statistics for one relation
- *
- * Update the whole-relation statistics that are kept in its pg_class
- * row. There are additional stats that will be updated if we are
- * doing VACUUM ANALYZE, but we always update these stats.
- *
- * This routine works for both index and heap relation entries in
- * pg_class. We violate no-overwrite semantics here by storing new
- * values for the statistics columns directly into the pg_class
- * tuple that's already on the page. The reason for this is that if
- * we updated these tuples in the usual way, vacuuming pg_class itself
- * wouldn't work very well --- by the time we got done with a vacuum
- * cycle, most of the tuples in pg_class would've been obsoleted.
- * Updating pg_class's own statistics would be especially tricky.
- * Of course, this only works for fixed-size never-null columns, but
- * these are.
+ * Update the shared Free Space Map with the info we now have about
+ * free space in the relation, discarding any old info the map may have.
*/
static void
-update_relstats(Oid relid, int num_pages, int num_tuples, bool hasindex,
- VRelStats *vacrelstats)
+vac_update_fsm(Relation onerel, VacPageList fraged_pages,
+ BlockNumber rel_pages)
{
- Relation rd;
- HeapTupleData rtup;
- HeapTuple ctup;
- Form_pg_class pgcform;
- Buffer buffer;
+ int nPages = fraged_pages->num_pages;
+ VacPage *pagedesc = fraged_pages->pagedesc;
+ Size threshold;
+ PageFreeSpaceInfo *pageSpaces;
+ int outPages;
+ int i;
/*
- * update number of tuples and number of pages in pg_class
+ * We only report pages with free space at least equal to the average
+ * request size --- this avoids cluttering FSM with uselessly-small
+ * bits of space. Although FSM would discard pages with little free
+ * space anyway, it's important to do this prefiltering because (a) it
+ * reduces the time spent holding the FSM lock in
+ * RecordRelationFreeSpace, and (b) FSM uses the number of pages
+ * reported as a statistic for guiding space management. If we didn't
+ * threshold our reports the same way vacuumlazy.c does, we'd be
+ * skewing that statistic.
*/
- rd = heap_openr(RelationRelationName, RowExclusiveLock);
+ threshold = GetAvgFSMRequestSize(&onerel->rd_node);
- ctup = SearchSysCache(RELOID,
- ObjectIdGetDatum(relid),
- 0, 0, 0);
- if (!HeapTupleIsValid(ctup))
- elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
- relid);
+ /* +1 to avoid palloc(0) */
+ pageSpaces = (PageFreeSpaceInfo *)
+ palloc((nPages + 1) * sizeof(PageFreeSpaceInfo));
+ outPages = 0;
- /* get the buffer cache tuple */
- rtup.t_self = ctup->t_self;
- ReleaseSysCache(ctup);
- heap_fetch(rd, SnapshotNow, &rtup, &buffer);
+ for (i = 0; i < nPages; i++)
+ {
+ /*
+ * fraged_pages may contain entries for pages that we later
+ * decided to truncate from the relation; don't enter them into
+ * the free space map!
+ */
+ if (pagedesc[i]->blkno >= rel_pages)
+ break;
- /* overwrite the existing statistics in the tuple */
- pgcform = (Form_pg_class) GETSTRUCT(&rtup);
- pgcform->reltuples = num_tuples;
- pgcform->relpages = num_pages;
- pgcform->relhasindex = hasindex;
+ if (pagedesc[i]->free >= threshold)
+ {
+ pageSpaces[outPages].blkno = pagedesc[i]->blkno;
+ pageSpaces[outPages].avail = pagedesc[i]->free;
+ outPages++;
+ }
+ }
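+
+ /*
+ * (The "break" above is safe because fraged_pages is kept in blkno
+ * order, so all remaining entries lie past rel_pages too.)
+ */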
- /* invalidate the tuple in the cache and write the buffer */
- RelationInvalidateHeapTuple(rd, &rtup);
- WriteBuffer(buffer);
+ RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);
- heap_close(rd, RowExclusiveLock);
+ pfree(pageSpaces);
}
-/*
- * reap_page() -- save a page on the array of reaped pages.
- *
- * As a side effect of the way that the vacuuming loop for a given
- * relation works, higher pages come after lower pages in the array
- * (and highest tid on a page is last).
- */
-static void
-reap_page(VacPageList vacpagelist, VacPage vacpage)
+/* Copy a VacPage structure */
+static VacPage
+copy_vac_page(VacPage vacpage)
{
- VacPage newvacpage;
+ VacPage newvacpage;
/* allocate a VacPageData entry */
- newvacpage = (VacPage) palloc(sizeof(VacPageData) + vacpage->offsets_free * sizeof(OffsetNumber));
+ newvacpage = (VacPage) palloc(sizeof(VacPageData) +
+ vacpage->offsets_free * sizeof(OffsetNumber));
/* fill it in */
if (vacpage->offsets_free > 0)
- memmove(newvacpage->offsets, vacpage->offsets, vacpage->offsets_free * sizeof(OffsetNumber));
+ memcpy(newvacpage->offsets, vacpage->offsets,
+ vacpage->offsets_free * sizeof(OffsetNumber));
newvacpage->blkno = vacpage->blkno;
newvacpage->free = vacpage->free;
newvacpage->offsets_used = vacpage->offsets_used;
newvacpage->offsets_free = vacpage->offsets_free;
- /* insert this page into vacpagelist list */
- vpage_insert(vacpagelist, newvacpage);
-
+ return newvacpage;
}
+/*
+ * Add a VacPage pointer to a VacPageList.
+ *
+ * As a side effect of the way that scan_heap works,
+ * higher pages come after lower pages in the array
+ * (and highest tid on a page is last).
+ */
static void
vpage_insert(VacPageList vacpagelist, VacPage vpnew)
{
}
vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
(vacpagelist->num_pages)++;
-
}
+/*
+ * vac_bsearch: just like standard C library routine bsearch(),
+ * except that we first test to see whether the target key is outside
+ * the range of the table entries. This case is handled relatively slowly
+ * by the normal binary search algorithm (ie, no faster than any other key)
+ * but it occurs often enough in VACUUM to be worth optimizing.
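+ *
+ * As with bsearch(3), the search key is the first argument; the old
+ * vac_find_eq() took it fourth, so the call sites in this file were
+ * updated to match.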
+ */
static void *
-vac_find_eq(void *bot, int nelem, int size, void *elm,
- int (*compar) (const void *, const void *))
+vac_bsearch(const void *key, const void *base,
+ size_t nelem, size_t size,
+ int (*compar) (const void *, const void *))
{
int res;
- int last = nelem - 1;
- int celm = nelem / 2;
- bool last_move,
- first_move;
-
- last_move = first_move = true;
- for (;;)
+ const void *last;
+
+ if (nelem == 0)
+ return NULL;
+ res = compar(key, base);
+ if (res < 0)
+ return NULL;
+ if (res == 0)
+ return (void *) base;
+ if (nelem > 1)
{
- if (first_move == true)
- {
- res = compar(bot, elm);
- if (res > 0)
- return NULL;
- if (res == 0)
- return bot;
- first_move = false;
- }
- if (last_move == true)
- {
- res = compar(elm, (void *) ((char *) bot + last * size));
- if (res > 0)
- return NULL;
- if (res == 0)
- return (void *) ((char *) bot + last * size);
- last_move = false;
- }
- res = compar(elm, (void *) ((char *) bot + celm * size));
- if (res == 0)
- return (void *) ((char *) bot + celm * size);
- if (res < 0)
- {
- if (celm == 0)
- return NULL;
- last = celm - 1;
- celm = celm / 2;
- last_move = true;
- continue;
- }
-
- if (celm == last)
+ last = (const void *) ((const char *) base + (nelem - 1) * size);
+ res = compar(key, last);
+ if (res > 0)
return NULL;
-
- last = last - celm - 1;
- bot = (void *) ((char *) bot + (celm + 1) * size);
- celm = (last + 1) / 2;
- first_move = true;
+ if (res == 0)
+ return (void *) last;
}
-
+ if (nelem <= 2)
+ return NULL; /* already checked 'em all */
+ return bsearch(key, base, nelem, size, compar);
}
+/*
+ * Comparator routines for use with qsort() and bsearch().
+ */
static int
vac_cmp_blk(const void *left, const void *right)
{
if (lblk == rblk)
return 0;
return 1;
-
}
static int
vac_cmp_offno(const void *left, const void *right)
{
-
if (*(OffsetNumber *) left < *(OffsetNumber *) right)
return -1;
if (*(OffsetNumber *) left == *(OffsetNumber *) right)
return 0;
return 1;
-
}
static int
vac_cmp_vtlinks(const void *left, const void *right)
{
-
if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
return -1;
((VTupleLink) right)->new_tid.ip_posid)
return 1;
return 0;
-
}
-static void
-get_indices(Relation relation, int *nindices, Relation **Irel)
+void
+vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
{
List *indexoidlist,
*indexoidscan;
indexoidlist = RelationGetIndexList(relation);
- *nindices = length(indexoidlist);
+ *nindexes = length(indexoidlist);
- if (*nindices > 0)
- *Irel = (Relation *) palloc(*nindices * sizeof(Relation));
+ if (*nindexes > 0)
+ *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
else
*Irel = NULL;
i = 0;
foreach(indexoidscan, indexoidlist)
{
- Oid indexoid = lfirsti(indexoidscan);
+ Oid indexoid = lfirsto(indexoidscan);
(*Irel)[i] = index_open(indexoid);
i++;
	}

	freeList(indexoidlist);
}
-static void
-close_indices(int nindices, Relation *Irel)
+void
+vac_close_indexes(int nindexes, Relation *Irel)
{
-
if (Irel == (Relation *) NULL)
return;
- while (nindices--)
- index_close(Irel[nindices]);
+ while (nindexes--)
+ index_close(Irel[nindexes]);
pfree(Irel);
-
}
/*
- * Obtain IndexInfo data for each index on the rel
+ * Is an index partial (ie, could it contain fewer tuples than the heap?)
*/
-static IndexInfo **
-get_index_desc(Relation onerel, int nindices, Relation *Irel)
+bool
+vac_is_partial_index(Relation indrel)
{
- IndexInfo **indexInfo;
- int i;
- HeapTuple cachetuple;
-
- indexInfo = (IndexInfo **) palloc(nindices * sizeof(IndexInfo *));
+ /*
+ * If the index's AM doesn't support nulls, it's partial for our
+ * purposes
+ */
+ if (!indrel->rd_am->amindexnulls)
+ return true;
- for (i = 0; i < nindices; i++)
- {
- cachetuple = SearchSysCache(INDEXRELID,
- ObjectIdGetDatum(RelationGetRelid(Irel[i])),
- 0, 0, 0);
- if (!HeapTupleIsValid(cachetuple))
- elog(ERROR, "get_index_desc: index %u not found",
- RelationGetRelid(Irel[i]));
- indexInfo[i] = BuildIndexInfo(cachetuple);
- ReleaseSysCache(cachetuple);
- }
+ /* Otherwise, look to see if there's a partial-index predicate */
+ if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
+ return true;
- return indexInfo;
+ return false;
}
static bool
enough_space(VacPage vacpage, Size len)
{
-
len = MAXALIGN(len);
if (len > vacpage->free)
return false;
- if (vacpage->offsets_used < vacpage->offsets_free) /* there are free
- * itemid(s) */
- return true; /* and len <= free_space */
+ /* if there are free itemid(s) and len <= free_space... */
+ if (vacpage->offsets_used < vacpage->offsets_free)
+ return true;
- /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
- if (len + MAXALIGN(sizeof(ItemIdData)) <= vacpage->free)
+ /* noff_used >= noff_free and so we'll have to allocate new itemid */
+ if (len + sizeof(ItemIdData) <= vacpage->free)
return true;
return false;
-
}
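
/*
 * Illustrative aside, not part of vacuum.c: the same fit test as a
 * stand-alone sketch. MAX_ALIGN and ITEMID_SIZE are stand-in constants
 * for the server's MAXALIGN rounding and sizeof(ItemIdData). Example:
 * with 128 bytes free and no recyclable line pointer, a 121-byte tuple
 * rounds up to 128 and then needs 128 + 4 bytes, so it does not fit.
 */
#include <stdbool.h>
#include <stddef.h>

#define MAX_ALIGN	8			/* stand-in for MAXIMUM_ALIGNOF */
#define ITEMID_SIZE 4			/* stand-in for sizeof(ItemIdData) */

static bool
tuple_fits(size_t len, size_t page_free,
		   unsigned offsets_used, unsigned offsets_free)
{
	/* round the tuple length up to the alignment boundary */
	len = (len + MAX_ALIGN - 1) & ~((size_t) (MAX_ALIGN - 1));
	if (len > page_free)
		return false;
	/* a vacated line pointer can be recycled at no space cost */
	if (offsets_used < offsets_free)
		return true;
	/* otherwise a new line pointer must fit alongside the tuple */
	return len + ITEMID_SIZE <= page_free;
}
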
+/*
+ * Initialize usage snapshot.
+ */
+void
+vac_init_rusage(VacRUsage *ru0)
+{
+ struct timezone tz;
+
+ getrusage(RUSAGE_SELF, &ru0->ru);
+ gettimeofday(&ru0->tv, &tz);
+}
+
/*
* Compute elapsed time since ru0 usage snapshot, and format into
* a displayable string. Result is in a static string, which is
* tacky, but no one ever claimed that the Postgres backend is
* threadable...
*/
-static char *
-show_rusage(struct rusage * ru0)
+const char *
+vac_show_rusage(VacRUsage *ru0)
{
- static char result[64];
- struct rusage ru1;
+ static char result[100];
+ VacRUsage ru1;
- getrusage(RUSAGE_SELF, &ru1);
+ vac_init_rusage(&ru1);
- if (ru1.ru_stime.tv_usec < ru0->ru_stime.tv_usec)
+ if (ru1.tv.tv_usec < ru0->tv.tv_usec)
+ {
+ ru1.tv.tv_sec--;
+ ru1.tv.tv_usec += 1000000;
+ }
+ if (ru1.ru.ru_stime.tv_usec < ru0->ru.ru_stime.tv_usec)
{
- ru1.ru_stime.tv_sec--;
- ru1.ru_stime.tv_usec += 1000000;
+ ru1.ru.ru_stime.tv_sec--;
+ ru1.ru.ru_stime.tv_usec += 1000000;
}
- if (ru1.ru_utime.tv_usec < ru0->ru_utime.tv_usec)
+ if (ru1.ru.ru_utime.tv_usec < ru0->ru.ru_utime.tv_usec)
{
- ru1.ru_utime.tv_sec--;
- ru1.ru_utime.tv_usec += 1000000;
+ ru1.ru.ru_utime.tv_sec--;
+ ru1.ru.ru_utime.tv_usec += 1000000;
}
snprintf(result, sizeof(result),
- "CPU %d.%02ds/%d.%02du sec.",
- (int) (ru1.ru_stime.tv_sec - ru0->ru_stime.tv_sec),
- (int) (ru1.ru_stime.tv_usec - ru0->ru_stime.tv_usec) / 10000,
- (int) (ru1.ru_utime.tv_sec - ru0->ru_utime.tv_sec),
- (int) (ru1.ru_utime.tv_usec - ru0->ru_utime.tv_usec) / 10000);
+ "CPU %d.%02ds/%d.%02du sec elapsed %d.%02d sec.",
+ (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec),
+ (int) (ru1.ru.ru_stime.tv_usec - ru0->ru.ru_stime.tv_usec) / 10000,
+ (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec),
+ (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000,
+ (int) (ru1.tv.tv_sec - ru0->tv.tv_sec),
+ (int) (ru1.tv.tv_usec - ru0->tv.tv_usec) / 10000);
return result;
}
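
/*
 * Illustrative aside, not part of vacuum.c: the snapshot-and-subtract
 * timing technique above as a stand-alone program, including the manual
 * borrow on tv_usec that keeps the microsecond subtraction from going
 * negative.
 */
#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>

int
main(void)
{
	struct rusage ru0,
				ru1;
	volatile long sink = 0;
	long		i;

	getrusage(RUSAGE_SELF, &ru0);	/* "before" snapshot */
	for (i = 0; i < 50000000; i++)
		sink += i;					/* some CPU work to measure */
	getrusage(RUSAGE_SELF, &ru1);	/* "after" snapshot */

	/* borrow one second if the usec field would underflow */
	if (ru1.ru_utime.tv_usec < ru0.ru_utime.tv_usec)
	{
		ru1.ru_utime.tv_sec--;
		ru1.ru_utime.tv_usec += 1000000;
	}
	printf("CPU %d.%02du sec.\n",
		   (int) (ru1.ru_utime.tv_sec - ru0.ru_utime.tv_sec),
		   (int) (ru1.ru_utime.tv_usec - ru0.ru_utime.tv_usec) / 10000);
	return 0;
}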