#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
+#include "catalog/pg_operator.h"
#include "commands/cluster.h"
#include "commands/matview.h"
#include "commands/tablecmds.h"
+#include "commands/tablespace.h"
#include "executor/executor.h"
+#include "executor/spi.h"
#include "miscadmin.h"
+#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
+#include "utils/typcache.h"
typedef struct
BulkInsertState bistate; /* bulk insert state */
} DR_transientrel;
+static int matview_maintenance_depth = 0;
+
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);
static void refresh_matview_datafill(DestReceiver *dest, Query *query,
- const char *queryString);
+ const char *queryString, Oid relowner);
+
+static char *make_temptable_name_n(char *tempname, int n);
+static void mv_GenerateOper(StringInfo buf, Oid opoid);
+
+static void refresh_by_match_merge(Oid matviewOid, Oid tempOid);
+static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap);
+
+static void OpenMatViewIncrementalMaintenance(void);
+static void CloseMatViewIncrementalMaintenance(void);
/*
* SetMatViewPopulatedState
RewriteRule *rule;
List *actions;
Query *dataQuery;
- Oid save_userid;
- int save_sec_context;
- int save_nestlevel;
Oid tableSpace;
Oid OIDNewHeap;
DestReceiver *dest;
+ bool concurrent;
+ LOCKMODE lockmode;
+
+ /* Determine strength of lock needed. */
+ concurrent = stmt->concurrent;
+ lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
/*
* Get a lock until end of transaction.
*/
matviewOid = RangeVarGetRelidExtended(stmt->relation,
- AccessExclusiveLock, false, false,
+ lockmode, false, false,
RangeVarCallbackOwnsTable, NULL);
matviewRel = heap_open(matviewOid, NoLock);
errmsg("\"%s\" is not a materialized view",
RelationGetRelationName(matviewRel))));
- /*
- * We're not using materialized views in the system catalogs.
- */
+ /* Check that CONCURRENTLY is not specified if not populated. */
+ if (concurrent && !RelationIsPopulated(matviewRel))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
+
+ /* Check that conflicting options have not been specified. */
+ if (concurrent && stmt->skipData)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
+
+ /* We're not using materialized views in the system catalogs. */
Assert(!IsSystemRelation(matviewRel));
+ /* We don't allow an oid column for a materialized view. */
Assert(!matviewRel->rd_rel->relhasoids);
/*
*/
CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
- /*
- * Switch to the owner's userid, so that any functions are run as that
- * user. Also lock down security-restricted operations and arrange to
- * make GUC variable changes local to this command.
- */
- GetUserIdAndSecContext(&save_userid, &save_sec_context);
- SetUserIdAndSecContext(matviewRel->rd_rel->relowner,
- save_sec_context | SECURITY_RESTRICTED_OPERATION);
- save_nestlevel = NewGUCNestLevel();
-
/*
* Tentatively mark the matview as populated or not (this will roll back
* if we fail later).
*/
SetMatViewPopulatedState(matviewRel, !stmt->skipData);
- tableSpace = matviewRel->rd_rel->reltablespace;
+ /* Concurrent refresh builds new data in temp tablespace, and does diff. */
+ if (concurrent)
+ tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP);
+ else
+ tableSpace = matviewRel->rd_rel->reltablespace;
heap_close(matviewRel, NoLock);
/* Create the transient table that will receive the regenerated data. */
- OIDNewHeap = make_new_heap(matviewOid, tableSpace);
+ OIDNewHeap = make_new_heap(matviewOid, tableSpace, concurrent,
+ ExclusiveLock);
dest = CreateTransientRelDestReceiver(OIDNewHeap);
/* Generate the data, if wanted. */
if (!stmt->skipData)
- refresh_matview_datafill(dest, dataQuery, queryString);
-
- /*
- * Swap the physical files of the target and transient tables, then
- * rebuild the target's indexes and throw away the transient table.
- */
- finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
- RecentXmin, ReadNextMultiXactId());
-
- RelationCacheInvalidateEntry(matviewOid);
-
- /* Roll back any GUC changes */
- AtEOXact_GUC(false, save_nestlevel);
-
- /* Restore userid and security context */
- SetUserIdAndSecContext(save_userid, save_sec_context);
+ refresh_matview_datafill(dest, dataQuery, queryString,
+ matviewRel->rd_rel->relowner);
+
+ /* Make the matview match the newly generated data. */
+ if (concurrent)
+ {
+ int old_depth = matview_maintenance_depth;
+
+ PG_TRY();
+ {
+ refresh_by_match_merge(matviewOid, OIDNewHeap);
+ }
+ PG_CATCH();
+ {
+ matview_maintenance_depth = old_depth;
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ Assert(matview_maintenance_depth == old_depth);
+ }
+ else
+ refresh_by_heap_swap(matviewOid, OIDNewHeap);
}
/*
*/
static void
refresh_matview_datafill(DestReceiver *dest, Query *query,
- const char *queryString)
+ const char *queryString, Oid relowner)
{
List *rewritten;
PlannedStmt *plan;
QueryDesc *queryDesc;
+ Oid save_userid;
+ int save_sec_context;
+ int save_nestlevel;
+
+ /*
+ * Switch to the owner's userid, so that any functions are run as that
+ * user. Also lock down security-restricted operations and arrange to
+ * make GUC variable changes local to this command.
+ */
+ GetUserIdAndSecContext(&save_userid, &save_sec_context);
+ SetUserIdAndSecContext(relowner,
+ save_sec_context | SECURITY_RESTRICTED_OPERATION);
+ save_nestlevel = NewGUCNestLevel();
/* Rewrite, copying the given Query to make sure it's not changed */
rewritten = QueryRewrite((Query *) copyObject(query));
FreeQueryDesc(queryDesc);
PopActiveSnapshot();
+
+ /* Roll back any GUC changes */
+ AtEOXact_GUC(false, save_nestlevel);
+
+ /* Restore userid and security context */
+ SetUserIdAndSecContext(save_userid, save_sec_context);
}
DestReceiver *
{
pfree(self);
}
+
+
+/*
+ * Given a qualified temporary table name, append an underscore followed by
+ * the given integer, to make a new table name based on the old one.
+ *
+ * This leaks memory through palloc(), which won't be cleaned up until the
+ * current memory memory context is freed.
+ */
+static char *
+make_temptable_name_n(char *tempname, int n)
+{
+ StringInfoData namebuf;
+
+ initStringInfo(&namebuf);
+ appendStringInfoString(&namebuf, tempname);
+ appendStringInfo(&namebuf, "_%i", n);
+ return namebuf.data;
+}
+
+static void
+mv_GenerateOper(StringInfo buf, Oid opoid)
+{
+ HeapTuple opertup;
+ Form_pg_operator operform;
+
+ opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(opoid));
+ if (!HeapTupleIsValid(opertup))
+ elog(ERROR, "cache lookup failed for operator %u", opoid);
+ operform = (Form_pg_operator) GETSTRUCT(opertup);
+ Assert(operform->oprkind == 'b');
+
+ appendStringInfo(buf, "OPERATOR(%s.%s)",
+ quote_identifier(get_namespace_name(operform->oprnamespace)),
+ NameStr(operform->oprname));
+
+ ReleaseSysCache(opertup);
+}
+
+/*
+ * refresh_by_match_merge
+ *
+ * Refresh a materialized view with transactional semantics, while allowing
+ * concurrent reads.
+ *
+ * This is called after a new version of the data has been created in a
+ * temporary table. It performs a full outer join against the old version of
+ * the data, producing "diff" results. This join cannot work if there are any
+ * duplicated rows in either the old or new versions, in the sense that every
+ * column would compare as equal between the two rows. It does work correctly
+ * in the face of rows which have at least one NULL value, with all non-NULL
+ * columns equal. The behavior of NULLs on equality tests and on UNIQUE
+ * indexes turns out to be quite convenient here; the tests we need to make
+ * are consistent with default behavior. If there is at least one UNIQUE
+ * index on the materialized view, we have exactly the guarantee we need. By
+ * joining based on equality on all columns which are part of any unique
+ * index, we identify the rows on which we can use UPDATE without any problem.
+ * If any column is NULL in either the old or new version of a row (or both),
+ * we must use DELETE and INSERT, since there could be multiple rows which are
+ * NOT DISTINCT FROM each other, and we could otherwise end up with the wrong
+ * number of occurrences in the updated relation. The temporary table used to
+ * hold the diff results contains just the TID of the old record (if matched)
+ * and the ROW from the new table as a single column of complex record type
+ * (if matched).
+ *
+ * Once we have the diff table, we perform set-based DELETE, UPDATE, and
+ * INSERT operations against the materialized view, and discard both temporary
+ * tables.
+ *
+ * Everything from the generation of the new data to applying the differences
+ * takes place under cover of an ExclusiveLock, since it seems as though we
+ * would want to prohibit not only concurrent REFRESH operations, but also
+ * incremental maintenance. It also doesn't seem reasonable or safe to allow
+ * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
+ * this command.
+ */
+static void
+refresh_by_match_merge(Oid matviewOid, Oid tempOid)
+{
+ StringInfoData querybuf;
+ Relation matviewRel;
+ Relation tempRel;
+ char *matviewname;
+ char *tempname;
+ char *diffname;
+ TupleDesc tupdesc;
+ bool foundUniqueIndex;
+ List *indexoidlist;
+ ListCell *indexoidscan;
+ int16 relnatts;
+ bool *usedForQual;
+ Oid save_userid;
+ int save_sec_context;
+ int save_nestlevel;
+
+ initStringInfo(&querybuf);
+ matviewRel = heap_open(matviewOid, NoLock);
+ matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
+ RelationGetRelationName(matviewRel));
+ tempRel = heap_open(tempOid, NoLock);
+ tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
+ RelationGetRelationName(tempRel));
+ diffname = make_temptable_name_n(tempname, 2);
+
+ relnatts = matviewRel->rd_rel->relnatts;
+ usedForQual = (bool *) palloc0(sizeof(bool) * relnatts);
+
+ /* Open SPI context. */
+ if (SPI_connect() != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect failed");
+
+ /* Analyze the temp table with the new contents. */
+ appendStringInfo(&querybuf, "ANALYZE %s", tempname);
+ if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+ /*
+ * We need to ensure that there are not duplicate rows without NULLs in
+ * the new data set before we can count on the "diff" results. Check for
+ * that in a way that allows showing the first duplicated row found. Even
+ * after we pass this test, a unique index on the materialized view may
+ * find a duplicate key problem.
+ */
+ resetStringInfo(&querybuf);
+ appendStringInfo(&querybuf,
+ "SELECT x FROM %s x WHERE x IS NOT NULL AND EXISTS "
+ "(SELECT * FROM %s y WHERE y IS NOT NULL "
+ "AND (y.*) = (x.*) AND y.ctid <> x.ctid) LIMIT 1",
+ tempname, tempname);
+ if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+ if (SPI_processed > 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_CARDINALITY_VIOLATION),
+ errmsg("new data for \"%s\" contains duplicate rows without any NULL columns",
+ RelationGetRelationName(matviewRel)),
+ errdetail("Row: %s",
+ SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
+ }
+
+ /* Start building the query for creating the diff table. */
+ resetStringInfo(&querybuf);
+ appendStringInfo(&querybuf,
+ "CREATE TEMP TABLE %s AS "
+ "SELECT x.ctid AS tid, y FROM %s x FULL JOIN %s y ON (",
+ diffname, matviewname, tempname);
+
+ /*
+ * Get the list of index OIDs for the table from the relcache, and look up
+ * each one in the pg_index syscache. We will test for equality on all
+ * columns present in all unique indexes which only reference columns and
+ * include all rows.
+ */
+ tupdesc = matviewRel->rd_att;
+ foundUniqueIndex = false;
+ indexoidlist = RelationGetIndexList(matviewRel);
+
+ foreach(indexoidscan, indexoidlist)
+ {
+ Oid indexoid = lfirst_oid(indexoidscan);
+ HeapTuple indexTuple;
+ Form_pg_index index;
+
+ indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+ if (!HeapTupleIsValid(indexTuple)) /* should not happen */
+ elog(ERROR, "cache lookup failed for index %u", indexoid);
+ index = (Form_pg_index) GETSTRUCT(indexTuple);
+
+ /* We're only interested if it is unique and valid. */
+ if (index->indisunique && IndexIsValid(index))
+ {
+ int numatts = index->indnatts;
+ int i;
+ bool expr = false;
+ Relation indexRel;
+
+ /* Skip any index on an expression. */
+ for (i = 0; i < numatts; i++)
+ {
+ if (index->indkey.values[i] == 0)
+ {
+ expr = true;
+ break;
+ }
+ }
+ if (expr)
+ {
+ ReleaseSysCache(indexTuple);
+ continue;
+ }
+
+ /* Skip partial indexes. */
+ indexRel = index_open(index->indexrelid, RowExclusiveLock);
+ if (indexRel->rd_indpred != NIL)
+ {
+ index_close(indexRel, NoLock);
+ ReleaseSysCache(indexTuple);
+ continue;
+ }
+ /* Hold the locks, since we're about to run DML which needs them. */
+ index_close(indexRel, NoLock);
+
+ /* Add quals for all columns from this index. */
+ for (i = 0; i < numatts; i++)
+ {
+ int attnum = index->indkey.values[i];
+ Oid type;
+ Oid op;
+ const char *colname;
+
+ /*
+ * Only include the column once regardless of how many times
+ * it shows up in how many indexes.
+ *
+ * This is also useful later to omit columns which can not
+ * have changed from the SET clause of the UPDATE statement.
+ */
+ if (usedForQual[attnum - 1])
+ continue;
+ usedForQual[attnum - 1] = true;
+
+ /*
+ * Actually add the qual, ANDed with any others.
+ */
+ if (foundUniqueIndex)
+ appendStringInfoString(&querybuf, " AND ");
+
+ colname = quote_identifier(NameStr((tupdesc->attrs[attnum - 1])->attname));
+ appendStringInfo(&querybuf, "y.%s ", colname);
+ type = attnumTypeId(matviewRel, attnum);
+ op = lookup_type_cache(type, TYPECACHE_EQ_OPR)->eq_opr;
+ mv_GenerateOper(&querybuf, op);
+ appendStringInfo(&querybuf, " x.%s", colname);
+
+ foundUniqueIndex = true;
+ }
+ }
+ ReleaseSysCache(indexTuple);
+ }
+
+ list_free(indexoidlist);
+
+ if (!foundUniqueIndex)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot refresh materialized view \"%s\" concurrently",
+ matviewname),
+ errhint("Create a UNIQUE index with no WHERE clause on one or more columns of the materialized view.")));
+
+ appendStringInfoString(&querybuf,
+ " AND y = x) WHERE (y.*) IS DISTINCT FROM (x.*)"
+ " ORDER BY tid");
+
+ /* Create the temporary "diff" table. */
+ if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+ /*
+ * We have no further use for data from the "full-data" temp table, but we
+ * must keep it around because its type is reference from the diff table.
+ */
+
+ /* Analyze the diff table. */
+ resetStringInfo(&querybuf);
+ appendStringInfo(&querybuf, "ANALYZE %s", diffname);
+ if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+ OpenMatViewIncrementalMaintenance();
+
+ /*
+ * Switch to the owner's userid, so that any functions are run as that
+ * user. Also lock down security-restricted operations and arrange to
+ * make GUC variable changes local to this command.
+ */
+ GetUserIdAndSecContext(&save_userid, &save_sec_context);
+ SetUserIdAndSecContext(matviewRel->rd_rel->relowner,
+ save_sec_context | SECURITY_RESTRICTED_OPERATION);
+ save_nestlevel = NewGUCNestLevel();
+
+ /* Deletes must come before inserts; do them first. */
+ resetStringInfo(&querybuf);
+ appendStringInfo(&querybuf,
+ "DELETE FROM %s WHERE ctid IN "
+ "(SELECT d.tid FROM %s d "
+ "WHERE d.tid IS NOT NULL "
+ "AND (d.y) IS NOT DISTINCT FROM NULL)",
+ matviewname, diffname);
+ if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+ /* Updates before inserts gives a better chance at HOT updates. */
+ resetStringInfo(&querybuf);
+ appendStringInfo(&querybuf, "UPDATE %s x SET ", matviewname);
+
+ {
+ int i;
+ bool targetColFound = false;
+
+ for (i = 0; i < tupdesc->natts; i++)
+ {
+ const char *colname;
+
+ if (tupdesc->attrs[i]->attisdropped)
+ continue;
+
+ if (usedForQual[i])
+ continue;
+
+ if (targetColFound)
+ appendStringInfoString(&querybuf, ", ");
+ targetColFound = true;
+
+ colname = quote_identifier(NameStr((tupdesc->attrs[i])->attname));
+ appendStringInfo(&querybuf, "%s = (d.y).%s", colname, colname);
+ }
+
+ if (targetColFound)
+ {
+ appendStringInfo(&querybuf,
+ " FROM %s d "
+ "WHERE d.tid IS NOT NULL AND x.ctid = d.tid",
+ diffname);
+
+ if (SPI_exec(querybuf.data, 0) != SPI_OK_UPDATE)
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+ }
+ }
+
+ /* Inserts go last. */
+ resetStringInfo(&querybuf);
+ appendStringInfo(&querybuf,
+ "INSERT INTO %s SELECT (y).* FROM %s WHERE tid IS NULL",
+ matviewname, diffname);
+ if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+ /* Roll back any GUC changes */
+ AtEOXact_GUC(false, save_nestlevel);
+
+ /* Restore userid and security context */
+ SetUserIdAndSecContext(save_userid, save_sec_context);
+
+ /* We're done maintaining the materialized view. */
+ CloseMatViewIncrementalMaintenance();
+ heap_close(tempRel, NoLock);
+ heap_close(matviewRel, NoLock);
+
+ /* Clean up temp tables. */
+ resetStringInfo(&querybuf);
+ appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
+ if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
+ elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+ /* Close SPI context. */
+ if (SPI_finish() != SPI_OK_FINISH)
+ elog(ERROR, "SPI_finish failed");
+}
+
+/*
+ * Swap the physical files of the target and transient tables, then rebuild
+ * the target's indexes and throw away the transient table. Security context
+ * swapping is handled by the called function, so it is not needed here.
+ */
+static void
+refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap)
+{
+ finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
+ RecentXmin, ReadNextMultiXactId());
+
+ RelationCacheInvalidateEntry(matviewOid);
+}
+
+static void
+OpenMatViewIncrementalMaintenance(void)
+{
+ matview_maintenance_depth++;
+}
+
+static void
+CloseMatViewIncrementalMaintenance(void)
+{
+ matview_maintenance_depth--;
+ Assert(matview_maintenance_depth >= 0);
+}
+
+/*
+ * This should be used to test whether the backend is in a context where it is
+ * OK to allow DML statements to modify materialized views. We only want to
+ * allow that for internal code driven by the materialized view definition,
+ * not for arbitrary user-supplied code.
+ */
+bool
+MatViewIncrementalMaintenanceIsEnabled(void)
+{
+ return matview_maintenance_depth > 0;
+}