*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.116 2008/03/26 21:10:37 alvherre Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.117 2008/04/03 16:27:25 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "access/heapam.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
+#include "access/xact.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/proc.h"
+#include "storage/procarray.h"
#include "utils/acl.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
* zero-column table.
*/
if (!vacstmt->vacuum)
- pgstat_report_analyze(RelationGetRelid(onerel),
- onerel->rd_rel->relisshared,
- 0, 0);
-
+ pgstat_report_analyze(onerel, 0, 0);
goto cleanup;
}
}
/* report results to the stats collector, too */
- pgstat_report_analyze(RelationGetRelid(onerel),
- onerel->rd_rel->relisshared,
- totalrows, totaldeadrows);
+ pgstat_report_analyze(onerel, totalrows, totaldeadrows);
}
/* We skip to here if there were no analyzable columns */
acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
double *totalrows, double *totaldeadrows)
{
- int numrows = 0; /* # rows collected */
- double liverows = 0; /* # rows seen */
+ int numrows = 0; /* # rows now in reservoir */
+ double samplerows = 0; /* total # rows collected */
+ double liverows = 0; /* # live rows seen */
double deadrows = 0; /* # dead rows seen */
double rowstoskip = -1; /* -1 means not set yet */
BlockNumber totalblocks;
+ TransactionId OldestXmin;
BlockSamplerData bs;
double rstate;
totalblocks = RelationGetNumberOfBlocks(onerel);
+ /* Need a cutoff xmin for HeapTupleSatisfiesVacuum */
+ OldestXmin = GetOldestXmin(onerel->rd_rel->relisshared, true);
+
/* Prepare for sampling block numbers */
BlockSampler_Init(&bs, totalblocks, targrows);
/* Prepare for sampling rows */
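Taken together, these two "prepare" steps set up a two-stage sample:
BlockSampler_Init decides which of the relation's totalblocks pages to
read, each page equally likely, and a Vitter reservoir (further down)
then picks rows from the pages actually read.  A minimal standalone
sketch of the block stage, assuming only the C library: it uses Knuth's
Algorithm S where BlockSampler has its own machinery, and sample_blocks
is a hypothetical name.

#include <stdlib.h>

/*
 * Illustrative only: choose up to n distinct block numbers out of N,
 * in ascending order, each block equally likely (Knuth, TAOCP vol. 2,
 * Algorithm S).
 */
static int
sample_blocks(unsigned int N, int n, unsigned int *out)
{
    int          m = 0;         /* blocks selected so far */
    unsigned int t;             /* blocks examined so far */

    for (t = 0; t < N && m < n; t++)
    {
        /* select block t with probability (n - m) / (N - t) */
        double      u = (double) rand() / ((double) RAND_MAX + 1.0);

        if (u * (double) (N - t) < (double) (n - m))
            out[m++] = t;
    }
    return m;                   /* < n only when the table has < n blocks */
}

Because Algorithm S emits the chosen blocks in ascending order, the
subsequent page reads stay physically sequential.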
* We must maintain a pin on the target page's buffer to ensure that
* the maxoffset value stays good (else concurrent VACUUM might delete
* tuples out from under us). Hence, pin the page until we are done
- * looking at it. We don't maintain a lock on the page, so tuples
- * could get added to it, but we ignore such tuples.
+ * looking at it. We also choose to hold sharelock on the buffer
+ * throughout --- we could release and re-acquire sharelock for
+ * each tuple, but since we aren't doing much work per tuple, the
+ * extra lock traffic is probably better avoided.
*/
targbuffer = ReadBufferWithStrategy(onerel, targblock, vac_strategy);
LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
targpage = BufferGetPage(targbuffer);
maxoffset = PageGetMaxOffsetNumber(targpage);
- LockBuffer(targbuffer, BUFFER_LOCK_UNLOCK);
/* Inner loop over all tuples on the selected page */
for (targoffset = FirstOffsetNumber; targoffset <= maxoffset; targoffset++)
{
+ ItemId itemid;
HeapTupleData targtuple;
+ bool sample_it = false;
+
+ itemid = PageGetItemId(targpage, targoffset);
+
+ /*
+ * We ignore unused and redirect line pointers. DEAD line
+ * pointers should be counted as dead, because we need vacuum
+ * to run to get rid of them. Note that this rule agrees with
+ * the way that heap_page_prune() counts things.
+ */
+ if (!ItemIdIsNormal(itemid))
+ {
+ if (ItemIdIsDead(itemid))
+ deadrows += 1;
+ continue;
+ }
ItemPointerSet(&targtuple.t_self, targblock, targoffset);
- /* We use heap_release_fetch to avoid useless bufmgr traffic */
- if (heap_release_fetch(onerel, SnapshotNow,
- &targtuple, &targbuffer,
- true, NULL))
+
+ targtuple.t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
+ targtuple.t_len = ItemIdGetLength(itemid);
+
+ switch (HeapTupleSatisfiesVacuum(targtuple.t_data,
+ OldestXmin,
+ targbuffer))
+ {
+ case HEAPTUPLE_LIVE:
+ sample_it = true;
+ liverows += 1;
+ break;
+
+ case HEAPTUPLE_DEAD:
+ case HEAPTUPLE_RECENTLY_DEAD:
+ /* Count dead and recently-dead rows */
+ deadrows += 1;
+ break;
+
+ case HEAPTUPLE_INSERT_IN_PROGRESS:
+ /*
+ * Insert-in-progress rows are not counted. We assume
+ * that when the inserting transaction commits or aborts,
+ * it will send a stats message to increment the proper
+ * count. This works right only if that transaction ends
+ * after we finish analyzing the table; if things happen
+ * in the other order, its stats update will be
+ * overwritten by ours. However, the error will be
+ * large only if the other transaction runs long enough
+ * to insert many tuples, so assuming it will finish
+ * after us is the safer option.
+ *
+ * A special case is that the inserting transaction might
+ * be our own. In this case we should count and sample
+ * the row, to accommodate users who load a table and
+ * analyze it in one transaction. (pgstat_report_analyze
+ * has to adjust the numbers we send to the stats collector
+ * to make this come out right.)
+ */
+ if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(targtuple.t_data)))
+ {
+ sample_it = true;
+ liverows += 1;
+ }
+ break;
+
+ case HEAPTUPLE_DELETE_IN_PROGRESS:
+ /*
+ * We count delete-in-progress rows as still live, using
+ * the same reasoning given above; but we don't bother to
+ * include them in the sample.
+ *
+ * If the delete was done by our own transaction, however,
+ * we must count the row as dead to make
+ * pgstat_report_analyze's stats adjustments come out
+ * right. (Note: this works out properly when the row
+ * was both inserted and deleted in our xact.)
+ */
+ if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmax(targtuple.t_data)))
+ deadrows += 1;
+ else
+ liverows += 1;
+ break;
+
+ default:
+ elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+ break;
+ }
+
+ if (sample_it)
{
/*
- * The first targrows live rows are simply copied into the
+ * The first targrows sample rows are simply copied into the
* reservoir. Then we start replacing tuples in the sample
* until we reach the end of the relation. This algorithm is
* from Jeff Vitter's paper (see full citation below). It
* works by repeatedly computing the number of tuples to skip
* before selecting a tuple, which replaces a randomly chosen
* element of the reservoir.
*/
/*
* t in Vitter's paper is the number of records already
* processed. If we need to compute a new S value, we
- * must use the not-yet-incremented value of liverows as
- * t.
+ * must use the not-yet-incremented value of samplerows
+ * as t.
*/
if (rowstoskip < 0)
- rowstoskip = get_next_S(liverows, targrows, &rstate);
+ rowstoskip = get_next_S(samplerows, targrows, &rstate);
if (rowstoskip <= 0)
{
    /* ...replace one randomly chosen reservoir row with this one... */
}

rowstoskip -= 1;
- liverows += 1;
- }
- else
- {
- /* Count dead rows, but not empty slots */
- if (targtuple.t_data != NULL)
- deadrows += 1;
+ samplerows += 1;
}
}
- /* Now release the pin on the page */
- ReleaseBuffer(targbuffer);
+ /* Now release the lock and pin on the page */
+ UnlockReleaseBuffer(targbuffer);
}
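The visibility switch above encodes a counting policy with two
own-transaction special cases, which is easier to audit in distilled
form.  A hypothetical restatement (the enum mirrors the
HeapTupleSatisfiesVacuum result codes; classify and all its names are
illustrative, not part of the patch):

#include <stdbool.h>

typedef enum                    /* stands in for the HEAPTUPLE_* codes */
{
    TUPLE_LIVE,
    TUPLE_DEAD,
    TUPLE_RECENTLY_DEAD,
    TUPLE_INSERT_IN_PROGRESS,
    TUPLE_DELETE_IN_PROGRESS
} TupleState;

/*
 * Given a tuple's state and whether our own transaction did the
 * insert/delete, report whether it counts as live, dead, and/or
 * belongs in the sample.
 */
static void
classify(TupleState state, bool by_us, bool *live, bool *dead, bool *sample)
{
    *live = *dead = *sample = false;
    switch (state)
    {
        case TUPLE_LIVE:
            *live = *sample = true;
            break;
        case TUPLE_DEAD:
        case TUPLE_RECENTLY_DEAD:
            *dead = true;           /* vacuum still has work to do */
            break;
        case TUPLE_INSERT_IN_PROGRESS:
            if (by_us)              /* our own insert: count and sample */
                *live = *sample = true;
            break;                  /* someone else's: not counted */
        case TUPLE_DELETE_IN_PROGRESS:
            if (by_us)              /* our own delete: count as dead */
                *dead = true;
            else                    /* else still live, but not sampled */
                *live = true;
            break;
    }
}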
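The reservoir bookkeeping (numrows, samplerows, rowstoskip) is likewise
clearer in isolation.  A simplified sketch of the same invariant using
Waterman's Algorithm R, one random draw per candidate row, in place of
the skip-based Vitter Algorithm Z that get_next_S implements; both keep
the reservoir a uniform random sample of every row passed so far.  The
function, and the int standing in for a heap_copytuple'd row, are
illustrative only.

#include <stdlib.h>

static void
reservoir_add(int targrows, int *numrows, double *samplerows,
              int *reservoir, int row)
{
    *samplerows += 1;           /* this row is candidate number t */

    if (*numrows < targrows)
    {
        /* the first targrows candidates fill the reservoir directly */
        reservoir[(*numrows)++] = row;
    }
    else
    {
        /* afterwards, keep candidate t with probability targrows / t */
        double      u = (double) rand() / ((double) RAND_MAX + 1.0);

        if (u * *samplerows < (double) targrows)
            reservoir[rand() % targrows] = row;     /* evict a random entry */
    }
}

The real code prefers Algorithm Z because its random-number cost scales
with the number of rows selected rather than the number scanned, which
matters when targrows is small relative to the table.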
/*
*
* Copyright (c) 2001-2008, PostgreSQL Global Development Group
*
- * $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.172 2008/03/26 21:10:38 alvherre Exp $
+ * $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.173 2008/04/03 16:27:25 tgl Exp $
* ----------
*/
#include "postgres.h"
* --------
*/
void
-pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples,
+pgstat_report_analyze(Relation rel, PgStat_Counter livetuples,
PgStat_Counter deadtuples)
{
PgStat_MsgAnalyze msg;
if (pgStatSock < 0 || !pgstat_track_counts)
return;
+ /*
+ * Unlike VACUUM, ANALYZE might be running inside a transaction that
+ * has already inserted and/or deleted rows in the target table.
+ * ANALYZE will have counted such rows as live or dead respectively.
+ * Because we will report our counts of such rows at transaction end,
+ * we should subtract off these counts from what we send to the collector
+ * now, else they'll be double-counted after commit. (This approach also
+ * ensures that the collector ends up with the right numbers if we abort
+ * instead of committing.)
+ */
+ if (rel->pgstat_info != NULL)
+ {
+ PgStat_TableXactStatus *trans;
+
+ for (trans = rel->pgstat_info->trans; trans; trans = trans->upper)
+ {
+ livetuples -= trans->tuples_inserted - trans->tuples_deleted;
+ deadtuples -= trans->tuples_deleted;
+ }
+ /* count stuff inserted by already-aborted subxacts, too */
+ deadtuples -= rel->pgstat_info->t_counts.t_new_dead_tuples;
+ /* Since ANALYZE's counts are estimates, we could have underflowed */
+ livetuples = Max(livetuples, 0);
+ deadtuples = Max(deadtuples, 0);
+ }
+
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
- msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
- msg.m_tableoid = tableoid;
- msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */
+ msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
+ msg.m_tableoid = RelationGetRelid(rel);
+ msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */
msg.m_analyzetime = GetCurrentTimestamp();
msg.m_live_tuples = livetuples;
msg.m_dead_tuples = deadtuples;
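A concrete check of the adjustment, with made-up numbers and a single
transaction level: suppose our transaction inserted 1000 rows into the
table and deleted 200 of them before running ANALYZE.  Per the rules in
acquire_sample_rows, ANALYZE counts the surviving 800 as live and the
200 as dead; at commit, the per-transaction counters will add those
same rows again at the collector, so they must be subtracted here.

#include <stdio.h>

int
main(void)
{
    /* hypothetical figures; the table held 10000 live and 50 dead rows */
    long        livetuples = 10800;     /* ANALYZE: 10000 old + our 800 */
    long        deadtuples = 250;       /* 50 old + our 200 deletions */
    long        tuples_inserted = 1000; /* what pgstat_info->trans tracks */
    long        tuples_deleted = 200;

    /* the same arithmetic as the loop above */
    livetuples -= tuples_inserted - tuples_deleted;     /* 10800 - 800 */
    deadtuples -= tuples_deleted;                       /* 250 - 200 */

    printf("report live=%ld dead=%ld\n", livetuples, deadtuples);
    return 0;       /* prints 10000 live, 50 dead: the pre-existing state */
}

If the transaction aborts instead, those rows were never reported live
by ANALYZE, so the collector's totals stay consistent in that case too,
as the comment notes.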