From cc1965a99bf87005f431804bbda0f723887a04d6 Mon Sep 17 00:00:00 2001 From: Kevin Grittner Date: Tue, 16 Jul 2013 12:55:44 -0500 Subject: [PATCH] Add support for REFRESH MATERIALIZED VIEW CONCURRENTLY. This allows reads to continue without any blocking while a REFRESH runs. The new data appears atomically as part of transaction commit. Review questioned the Assert that a matview was not a system relation. This will be addressed separately. Reviewed by Hitoshi Harada, Robert Haas, Andres Freund. Merged after review with security patch f3ab5d4. --- doc/src/sgml/mvcc.sgml | 3 +- .../sgml/ref/refresh_materialized_view.sgml | 34 +- src/backend/commands/cluster.c | 27 +- src/backend/commands/matview.c | 524 ++++++++++++++++-- src/backend/commands/tablecmds.c | 3 +- src/backend/executor/execMain.c | 10 +- src/backend/executor/nodeModifyTable.c | 5 +- src/backend/nodes/copyfuncs.c | 1 + src/backend/nodes/equalfuncs.c | 1 + src/backend/parser/gram.y | 7 +- src/bin/psql/tab-complete.c | 17 + src/include/commands/cluster.h | 3 +- src/include/commands/matview.h | 2 + src/include/nodes/parsenodes.h | 1 + src/test/regress/expected/matview.out | 38 +- src/test/regress/sql/matview.sql | 29 +- 16 files changed, 646 insertions(+), 59 deletions(-) diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml index 316add70b7..cefd3235a6 100644 --- a/doc/src/sgml/mvcc.sgml +++ b/doc/src/sgml/mvcc.sgml @@ -928,8 +928,7 @@ ERROR: could not serialize access due to read/write dependencies among transact - This lock mode is not automatically acquired on tables by any - PostgreSQL command. + Acquired by REFRESH MATERIALIZED VIEW CONCURRENTLY. diff --git a/doc/src/sgml/ref/refresh_materialized_view.sgml b/doc/src/sgml/ref/refresh_materialized_view.sgml index 8f59bbf123..d2f8104aa7 100644 --- a/doc/src/sgml/ref/refresh_materialized_view.sgml +++ b/doc/src/sgml/ref/refresh_materialized_view.sgml @@ -21,7 +21,7 @@ PostgreSQL documentation -REFRESH MATERIALIZED VIEW name +REFRESH MATERIALIZED VIEW [ CONCURRENTLY ] name [ WITH [ NO ] DATA ] @@ -38,12 +38,44 @@ REFRESH MATERIALIZED VIEW name data is generated and the materialized view is left in an unscannable state. + + CONCURRENTLY and WITH NO DATA may not + be specified together. + Parameters + + CONCURRENTLY + + + Refresh the materialized view without locking out concurrent selects on + the materialized view. Without this option a refresh which affects a + lot of rows will tend to use fewer resources and complete more quickly, + but could block other connections which are trying to read from the + materialized view. This option may be faster in cases where a small + number of rows are affected. + + + This option is only allowed if there is at least one + UNIQUE index on the materialized view which uses only + column names and includes all rows; that is, it must not index on any + expressions nor include a WHERE clause. + + + This option may not be used when the materialized view is not already + populated. + + + Even with this option only one REFRESH at a time may + run against any one materialized view. + + + + name diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 686770f881..c20a6fb073 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -589,7 +589,8 @@ rebuild_relation(Relation OldHeap, Oid indexOid, heap_close(OldHeap, NoLock); /* Create the transient table that will receive the re-ordered data */ - OIDNewHeap = make_new_heap(tableOid, tableSpace); + OIDNewHeap = make_new_heap(tableOid, tableSpace, false, + AccessExclusiveLock); /* Copy the heap data into the new table in the desired order */ copy_heap_data(OIDNewHeap, tableOid, indexOid, @@ -616,7 +617,8 @@ rebuild_relation(Relation OldHeap, Oid indexOid, * data, then call finish_heap_swap to complete the operation. */ Oid -make_new_heap(Oid OIDOldHeap, Oid NewTableSpace) +make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, bool forcetemp, + LOCKMODE lockmode) { TupleDesc OldHeapDesc; char NewHeapName[NAMEDATALEN]; @@ -626,8 +628,10 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace) HeapTuple tuple; Datum reloptions; bool isNull; + Oid namespaceid; + char relpersistence; - OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock); + OldHeap = heap_open(OIDOldHeap, lockmode); OldHeapDesc = RelationGetDescr(OldHeap); /* @@ -648,6 +652,17 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace) if (isNull) reloptions = (Datum) 0; + if (forcetemp) + { + namespaceid = LookupCreationNamespace("pg_temp"); + relpersistence = RELPERSISTENCE_TEMP; + } + else + { + namespaceid = RelationGetNamespace(OldHeap); + relpersistence = OldHeap->rd_rel->relpersistence; + } + /* * Create the new heap, using a temporary name in the same namespace as * the existing table. NOTE: there is some risk of collision with user @@ -663,7 +678,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace) snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap); OIDNewHeap = heap_create_with_catalog(NewHeapName, - RelationGetNamespace(OldHeap), + namespaceid, NewTableSpace, InvalidOid, InvalidOid, @@ -671,8 +686,8 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace) OldHeap->rd_rel->relowner, OldHeapDesc, NIL, - OldHeap->rd_rel->relkind, - OldHeap->rd_rel->relpersistence, + RELKIND_RELATION, + relpersistence, false, RelationIsMapped(OldHeap), true, diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 1c383baf68..edd34ff171 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -20,17 +20,24 @@ #include "catalog/catalog.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/pg_operator.h" #include "commands/cluster.h" #include "commands/matview.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "executor/executor.h" +#include "executor/spi.h" #include "miscadmin.h" +#include "parser/parse_relation.h" #include "rewrite/rewriteHandler.h" #include "storage/smgr.h" #include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/snapmgr.h" #include "utils/syscache.h" +#include "utils/typcache.h" typedef struct @@ -44,12 +51,23 @@ typedef struct BulkInsertState bistate; /* bulk insert state */ } DR_transientrel; +static int matview_maintenance_depth = 0; + static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self); static void transientrel_shutdown(DestReceiver *self); static void transientrel_destroy(DestReceiver *self); static void refresh_matview_datafill(DestReceiver *dest, Query *query, - const char *queryString); + const char *queryString, Oid relowner); + +static char *make_temptable_name_n(char *tempname, int n); +static void mv_GenerateOper(StringInfo buf, Oid opoid); + +static void refresh_by_match_merge(Oid matviewOid, Oid tempOid); +static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap); + +static void OpenMatViewIncrementalMaintenance(void); +static void CloseMatViewIncrementalMaintenance(void); /* * SetMatViewPopulatedState @@ -122,18 +140,21 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, RewriteRule *rule; List *actions; Query *dataQuery; - Oid save_userid; - int save_sec_context; - int save_nestlevel; Oid tableSpace; Oid OIDNewHeap; DestReceiver *dest; + bool concurrent; + LOCKMODE lockmode; + + /* Determine strength of lock needed. */ + concurrent = stmt->concurrent; + lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock; /* * Get a lock until end of transaction. */ matviewOid = RangeVarGetRelidExtended(stmt->relation, - AccessExclusiveLock, false, false, + lockmode, false, false, RangeVarCallbackOwnsTable, NULL); matviewRel = heap_open(matviewOid, NoLock); @@ -144,11 +165,22 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, errmsg("\"%s\" is not a materialized view", RelationGetRelationName(matviewRel)))); - /* - * We're not using materialized views in the system catalogs. - */ + /* Check that CONCURRENTLY is not specified if not populated. */ + if (concurrent && !RelationIsPopulated(matviewRel)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("CONCURRENTLY cannot be used when the materialized view is not populated"))); + + /* Check that conflicting options have not been specified. */ + if (concurrent && stmt->skipData) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together"))); + + /* We're not using materialized views in the system catalogs. */ Assert(!IsSystemRelation(matviewRel)); + /* We don't allow an oid column for a materialized view. */ Assert(!matviewRel->rd_rel->relhasoids); /* @@ -194,48 +226,49 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, */ CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW"); - /* - * Switch to the owner's userid, so that any functions are run as that - * user. Also lock down security-restricted operations and arrange to - * make GUC variable changes local to this command. - */ - GetUserIdAndSecContext(&save_userid, &save_sec_context); - SetUserIdAndSecContext(matviewRel->rd_rel->relowner, - save_sec_context | SECURITY_RESTRICTED_OPERATION); - save_nestlevel = NewGUCNestLevel(); - /* * Tentatively mark the matview as populated or not (this will roll back * if we fail later). */ SetMatViewPopulatedState(matviewRel, !stmt->skipData); - tableSpace = matviewRel->rd_rel->reltablespace; + /* Concurrent refresh builds new data in temp tablespace, and does diff. */ + if (concurrent) + tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP); + else + tableSpace = matviewRel->rd_rel->reltablespace; heap_close(matviewRel, NoLock); /* Create the transient table that will receive the regenerated data. */ - OIDNewHeap = make_new_heap(matviewOid, tableSpace); + OIDNewHeap = make_new_heap(matviewOid, tableSpace, concurrent, + ExclusiveLock); dest = CreateTransientRelDestReceiver(OIDNewHeap); /* Generate the data, if wanted. */ if (!stmt->skipData) - refresh_matview_datafill(dest, dataQuery, queryString); - - /* - * Swap the physical files of the target and transient tables, then - * rebuild the target's indexes and throw away the transient table. - */ - finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, - RecentXmin, ReadNextMultiXactId()); - - RelationCacheInvalidateEntry(matviewOid); - - /* Roll back any GUC changes */ - AtEOXact_GUC(false, save_nestlevel); - - /* Restore userid and security context */ - SetUserIdAndSecContext(save_userid, save_sec_context); + refresh_matview_datafill(dest, dataQuery, queryString, + matviewRel->rd_rel->relowner); + + /* Make the matview match the newly generated data. */ + if (concurrent) + { + int old_depth = matview_maintenance_depth; + + PG_TRY(); + { + refresh_by_match_merge(matviewOid, OIDNewHeap); + } + PG_CATCH(); + { + matview_maintenance_depth = old_depth; + PG_RE_THROW(); + } + PG_END_TRY(); + Assert(matview_maintenance_depth == old_depth); + } + else + refresh_by_heap_swap(matviewOid, OIDNewHeap); } /* @@ -243,11 +276,24 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, */ static void refresh_matview_datafill(DestReceiver *dest, Query *query, - const char *queryString) + const char *queryString, Oid relowner) { List *rewritten; PlannedStmt *plan; QueryDesc *queryDesc; + Oid save_userid; + int save_sec_context; + int save_nestlevel; + + /* + * Switch to the owner's userid, so that any functions are run as that + * user. Also lock down security-restricted operations and arrange to + * make GUC variable changes local to this command. + */ + GetUserIdAndSecContext(&save_userid, &save_sec_context); + SetUserIdAndSecContext(relowner, + save_sec_context | SECURITY_RESTRICTED_OPERATION); + save_nestlevel = NewGUCNestLevel(); /* Rewrite, copying the given Query to make sure it's not changed */ rewritten = QueryRewrite((Query *) copyObject(query)); @@ -290,6 +336,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, FreeQueryDesc(queryDesc); PopActiveSnapshot(); + + /* Roll back any GUC changes */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); } DestReceiver * @@ -388,3 +440,401 @@ transientrel_destroy(DestReceiver *self) { pfree(self); } + + +/* + * Given a qualified temporary table name, append an underscore followed by + * the given integer, to make a new table name based on the old one. + * + * This leaks memory through palloc(), which won't be cleaned up until the + * current memory memory context is freed. + */ +static char * +make_temptable_name_n(char *tempname, int n) +{ + StringInfoData namebuf; + + initStringInfo(&namebuf); + appendStringInfoString(&namebuf, tempname); + appendStringInfo(&namebuf, "_%i", n); + return namebuf.data; +} + +static void +mv_GenerateOper(StringInfo buf, Oid opoid) +{ + HeapTuple opertup; + Form_pg_operator operform; + + opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(opoid)); + if (!HeapTupleIsValid(opertup)) + elog(ERROR, "cache lookup failed for operator %u", opoid); + operform = (Form_pg_operator) GETSTRUCT(opertup); + Assert(operform->oprkind == 'b'); + + appendStringInfo(buf, "OPERATOR(%s.%s)", + quote_identifier(get_namespace_name(operform->oprnamespace)), + NameStr(operform->oprname)); + + ReleaseSysCache(opertup); +} + +/* + * refresh_by_match_merge + * + * Refresh a materialized view with transactional semantics, while allowing + * concurrent reads. + * + * This is called after a new version of the data has been created in a + * temporary table. It performs a full outer join against the old version of + * the data, producing "diff" results. This join cannot work if there are any + * duplicated rows in either the old or new versions, in the sense that every + * column would compare as equal between the two rows. It does work correctly + * in the face of rows which have at least one NULL value, with all non-NULL + * columns equal. The behavior of NULLs on equality tests and on UNIQUE + * indexes turns out to be quite convenient here; the tests we need to make + * are consistent with default behavior. If there is at least one UNIQUE + * index on the materialized view, we have exactly the guarantee we need. By + * joining based on equality on all columns which are part of any unique + * index, we identify the rows on which we can use UPDATE without any problem. + * If any column is NULL in either the old or new version of a row (or both), + * we must use DELETE and INSERT, since there could be multiple rows which are + * NOT DISTINCT FROM each other, and we could otherwise end up with the wrong + * number of occurrences in the updated relation. The temporary table used to + * hold the diff results contains just the TID of the old record (if matched) + * and the ROW from the new table as a single column of complex record type + * (if matched). + * + * Once we have the diff table, we perform set-based DELETE, UPDATE, and + * INSERT operations against the materialized view, and discard both temporary + * tables. + * + * Everything from the generation of the new data to applying the differences + * takes place under cover of an ExclusiveLock, since it seems as though we + * would want to prohibit not only concurrent REFRESH operations, but also + * incremental maintenance. It also doesn't seem reasonable or safe to allow + * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by + * this command. + */ +static void +refresh_by_match_merge(Oid matviewOid, Oid tempOid) +{ + StringInfoData querybuf; + Relation matviewRel; + Relation tempRel; + char *matviewname; + char *tempname; + char *diffname; + TupleDesc tupdesc; + bool foundUniqueIndex; + List *indexoidlist; + ListCell *indexoidscan; + int16 relnatts; + bool *usedForQual; + Oid save_userid; + int save_sec_context; + int save_nestlevel; + + initStringInfo(&querybuf); + matviewRel = heap_open(matviewOid, NoLock); + matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)), + RelationGetRelationName(matviewRel)); + tempRel = heap_open(tempOid, NoLock); + tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)), + RelationGetRelationName(tempRel)); + diffname = make_temptable_name_n(tempname, 2); + + relnatts = matviewRel->rd_rel->relnatts; + usedForQual = (bool *) palloc0(sizeof(bool) * relnatts); + + /* Open SPI context. */ + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* Analyze the temp table with the new contents. */ + appendStringInfo(&querybuf, "ANALYZE %s", tempname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* + * We need to ensure that there are not duplicate rows without NULLs in + * the new data set before we can count on the "diff" results. Check for + * that in a way that allows showing the first duplicated row found. Even + * after we pass this test, a unique index on the materialized view may + * find a duplicate key problem. + */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "SELECT x FROM %s x WHERE x IS NOT NULL AND EXISTS " + "(SELECT * FROM %s y WHERE y IS NOT NULL " + "AND (y.*) = (x.*) AND y.ctid <> x.ctid) LIMIT 1", + tempname, tempname); + if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + if (SPI_processed > 0) + { + ereport(ERROR, + (errcode(ERRCODE_CARDINALITY_VIOLATION), + errmsg("new data for \"%s\" contains duplicate rows without any NULL columns", + RelationGetRelationName(matviewRel)), + errdetail("Row: %s", + SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1)))); + } + + /* Start building the query for creating the diff table. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "CREATE TEMP TABLE %s AS " + "SELECT x.ctid AS tid, y FROM %s x FULL JOIN %s y ON (", + diffname, matviewname, tempname); + + /* + * Get the list of index OIDs for the table from the relcache, and look up + * each one in the pg_index syscache. We will test for equality on all + * columns present in all unique indexes which only reference columns and + * include all rows. + */ + tupdesc = matviewRel->rd_att; + foundUniqueIndex = false; + indexoidlist = RelationGetIndexList(matviewRel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + HeapTuple indexTuple; + Form_pg_index index; + + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for index %u", indexoid); + index = (Form_pg_index) GETSTRUCT(indexTuple); + + /* We're only interested if it is unique and valid. */ + if (index->indisunique && IndexIsValid(index)) + { + int numatts = index->indnatts; + int i; + bool expr = false; + Relation indexRel; + + /* Skip any index on an expression. */ + for (i = 0; i < numatts; i++) + { + if (index->indkey.values[i] == 0) + { + expr = true; + break; + } + } + if (expr) + { + ReleaseSysCache(indexTuple); + continue; + } + + /* Skip partial indexes. */ + indexRel = index_open(index->indexrelid, RowExclusiveLock); + if (indexRel->rd_indpred != NIL) + { + index_close(indexRel, NoLock); + ReleaseSysCache(indexTuple); + continue; + } + /* Hold the locks, since we're about to run DML which needs them. */ + index_close(indexRel, NoLock); + + /* Add quals for all columns from this index. */ + for (i = 0; i < numatts; i++) + { + int attnum = index->indkey.values[i]; + Oid type; + Oid op; + const char *colname; + + /* + * Only include the column once regardless of how many times + * it shows up in how many indexes. + * + * This is also useful later to omit columns which can not + * have changed from the SET clause of the UPDATE statement. + */ + if (usedForQual[attnum - 1]) + continue; + usedForQual[attnum - 1] = true; + + /* + * Actually add the qual, ANDed with any others. + */ + if (foundUniqueIndex) + appendStringInfoString(&querybuf, " AND "); + + colname = quote_identifier(NameStr((tupdesc->attrs[attnum - 1])->attname)); + appendStringInfo(&querybuf, "y.%s ", colname); + type = attnumTypeId(matviewRel, attnum); + op = lookup_type_cache(type, TYPECACHE_EQ_OPR)->eq_opr; + mv_GenerateOper(&querybuf, op); + appendStringInfo(&querybuf, " x.%s", colname); + + foundUniqueIndex = true; + } + } + ReleaseSysCache(indexTuple); + } + + list_free(indexoidlist); + + if (!foundUniqueIndex) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot refresh materialized view \"%s\" concurrently", + matviewname), + errhint("Create a UNIQUE index with no WHERE clause on one or more columns of the materialized view."))); + + appendStringInfoString(&querybuf, + " AND y = x) WHERE (y.*) IS DISTINCT FROM (x.*)" + " ORDER BY tid"); + + /* Create the temporary "diff" table. */ + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* + * We have no further use for data from the "full-data" temp table, but we + * must keep it around because its type is reference from the diff table. + */ + + /* Analyze the diff table. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, "ANALYZE %s", diffname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + OpenMatViewIncrementalMaintenance(); + + /* + * Switch to the owner's userid, so that any functions are run as that + * user. Also lock down security-restricted operations and arrange to + * make GUC variable changes local to this command. + */ + GetUserIdAndSecContext(&save_userid, &save_sec_context); + SetUserIdAndSecContext(matviewRel->rd_rel->relowner, + save_sec_context | SECURITY_RESTRICTED_OPERATION); + save_nestlevel = NewGUCNestLevel(); + + /* Deletes must come before inserts; do them first. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "DELETE FROM %s WHERE ctid IN " + "(SELECT d.tid FROM %s d " + "WHERE d.tid IS NOT NULL " + "AND (d.y) IS NOT DISTINCT FROM NULL)", + matviewname, diffname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* Updates before inserts gives a better chance at HOT updates. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, "UPDATE %s x SET ", matviewname); + + { + int i; + bool targetColFound = false; + + for (i = 0; i < tupdesc->natts; i++) + { + const char *colname; + + if (tupdesc->attrs[i]->attisdropped) + continue; + + if (usedForQual[i]) + continue; + + if (targetColFound) + appendStringInfoString(&querybuf, ", "); + targetColFound = true; + + colname = quote_identifier(NameStr((tupdesc->attrs[i])->attname)); + appendStringInfo(&querybuf, "%s = (d.y).%s", colname, colname); + } + + if (targetColFound) + { + appendStringInfo(&querybuf, + " FROM %s d " + "WHERE d.tid IS NOT NULL AND x.ctid = d.tid", + diffname); + + if (SPI_exec(querybuf.data, 0) != SPI_OK_UPDATE) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + } + } + + /* Inserts go last. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "INSERT INTO %s SELECT (y).* FROM %s WHERE tid IS NULL", + matviewname, diffname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* Roll back any GUC changes */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + + /* We're done maintaining the materialized view. */ + CloseMatViewIncrementalMaintenance(); + heap_close(tempRel, NoLock); + heap_close(matviewRel, NoLock); + + /* Clean up temp tables. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* Close SPI context. */ + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); +} + +/* + * Swap the physical files of the target and transient tables, then rebuild + * the target's indexes and throw away the transient table. Security context + * swapping is handled by the called function, so it is not needed here. + */ +static void +refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap) +{ + finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, + RecentXmin, ReadNextMultiXactId()); + + RelationCacheInvalidateEntry(matviewOid); +} + +static void +OpenMatViewIncrementalMaintenance(void) +{ + matview_maintenance_depth++; +} + +static void +CloseMatViewIncrementalMaintenance(void) +{ + matview_maintenance_depth--; + Assert(matview_maintenance_depth >= 0); +} + +/* + * This should be used to test whether the backend is in a context where it is + * OK to allow DML statements to modify materialized views. We only want to + * allow that for internal code driven by the materialized view definition, + * not for arbitrary user-supplied code. + */ +bool +MatViewIncrementalMaintenanceIsEnabled(void) +{ + return matview_maintenance_depth > 0; +} diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index f56ef28e22..bd0a21987c 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -3541,7 +3541,8 @@ ATRewriteTables(List **wqueue, LOCKMODE lockmode) heap_close(OldHeap, NoLock); /* Create transient table that will receive the modified data */ - OIDNewHeap = make_new_heap(tab->relid, NewTableSpace); + OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, false, + AccessExclusiveLock); /* * Copy the heap data into the new table with the desired diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 3b664d0926..4d7345da57 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -42,6 +42,7 @@ #include "access/transam.h" #include "access/xact.h" #include "catalog/namespace.h" +#include "commands/matview.h" #include "commands/trigger.h" #include "executor/execdebug.h" #include "foreign/fdwapi.h" @@ -999,10 +1000,11 @@ CheckValidResultRel(Relation resultRel, CmdType operation) } break; case RELKIND_MATVIEW: - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot change materialized view \"%s\"", - RelationGetRelationName(resultRel)))); + if (!MatViewIncrementalMaintenanceIsEnabled()) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot change materialized view \"%s\"", + RelationGetRelationName(resultRel)))); break; case RELKIND_FOREIGN_TABLE: /* Okay only if the FDW supports it */ diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index e934c7b9ab..8fe5f1d427 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -950,7 +950,7 @@ ExecModifyTable(ModifyTableState *node) bool isNull; relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind; - if (relkind == RELKIND_RELATION) + if (relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW) { datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, @@ -1280,7 +1280,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) char relkind; relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind; - if (relkind == RELKIND_RELATION) + if (relkind == RELKIND_RELATION || + relkind == RELKIND_MATVIEW) { j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid"); if (!AttributeNumberIsValid(j->jf_junkAttNo)) diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index b5b8d63cff..ad7378dd93 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -3241,6 +3241,7 @@ _copyRefreshMatViewStmt(const RefreshMatViewStmt *from) { RefreshMatViewStmt *newnode = makeNode(RefreshMatViewStmt); + COPY_SCALAR_FIELD(concurrent); COPY_SCALAR_FIELD(skipData); COPY_NODE_FIELD(relation); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 3f96595e8e..e0d4bca809 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -1521,6 +1521,7 @@ _equalCreateTableAsStmt(const CreateTableAsStmt *a, const CreateTableAsStmt *b) static bool _equalRefreshMatViewStmt(const RefreshMatViewStmt *a, const RefreshMatViewStmt *b) { + COMPARE_SCALAR_FIELD(concurrent); COMPARE_SCALAR_FIELD(skipData); COMPARE_NODE_FIELD(relation); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index f67ef0c9ca..5e9b3eda92 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -3301,11 +3301,12 @@ OptNoLog: UNLOGGED { $$ = RELPERSISTENCE_UNLOGGED; } *****************************************************************************/ RefreshMatViewStmt: - REFRESH MATERIALIZED VIEW qualified_name opt_with_data + REFRESH MATERIALIZED VIEW opt_concurrently qualified_name opt_with_data { RefreshMatViewStmt *n = makeNode(RefreshMatViewStmt); - n->relation = $4; - n->skipData = !($5); + n->concurrent = $4; + n->relation = $5; + n->skipData = !($6); $$ = (Node *) n; } ; diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index a67d2cdd90..5b7fc93eb2 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -2871,11 +2871,22 @@ psql_completion(char *text, int start, int end) else if (pg_strcasecmp(prev3_wd, "REFRESH") == 0 && pg_strcasecmp(prev2_wd, "MATERIALIZED") == 0 && pg_strcasecmp(prev_wd, "VIEW") == 0) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_matviews, + " UNION SELECT 'CONCURRENTLY'"); + else if (pg_strcasecmp(prev4_wd, "REFRESH") == 0 && + pg_strcasecmp(prev3_wd, "MATERIALIZED") == 0 && + pg_strcasecmp(prev2_wd, "VIEW") == 0 && + pg_strcasecmp(prev_wd, "CONCURRENTLY") == 0) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_matviews, NULL); else if (pg_strcasecmp(prev4_wd, "REFRESH") == 0 && pg_strcasecmp(prev3_wd, "MATERIALIZED") == 0 && pg_strcasecmp(prev2_wd, "VIEW") == 0) COMPLETE_WITH_CONST("WITH"); + else if (pg_strcasecmp(prev5_wd, "REFRESH") == 0 && + pg_strcasecmp(prev4_wd, "MATERIALIZED") == 0 && + pg_strcasecmp(prev3_wd, "VIEW") == 0 && + pg_strcasecmp(prev2_wd, "CONCURRENTLY") == 0) + COMPLETE_WITH_CONST("WITH DATA"); else if (pg_strcasecmp(prev5_wd, "REFRESH") == 0 && pg_strcasecmp(prev4_wd, "MATERIALIZED") == 0 && pg_strcasecmp(prev3_wd, "VIEW") == 0 && @@ -2886,6 +2897,12 @@ psql_completion(char *text, int start, int end) COMPLETE_WITH_LIST(list_WITH_DATA); } + else if (pg_strcasecmp(prev6_wd, "REFRESH") == 0 && + pg_strcasecmp(prev5_wd, "MATERIALIZED") == 0 && + pg_strcasecmp(prev4_wd, "VIEW") == 0 && + pg_strcasecmp(prev3_wd, "CONCURRENTLY") == 0 && + pg_strcasecmp(prev_wd, "WITH") == 0) + COMPLETE_WITH_CONST("DATA"); else if (pg_strcasecmp(prev6_wd, "REFRESH") == 0 && pg_strcasecmp(prev5_wd, "MATERIALIZED") == 0 && pg_strcasecmp(prev4_wd, "VIEW") == 0 && diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h index aa6d332e9d..03d9f176c5 100644 --- a/src/include/commands/cluster.h +++ b/src/include/commands/cluster.h @@ -25,7 +25,8 @@ extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck, LOCKMODE lockmode); extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal); -extern Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace); +extern Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, bool forcetemp, + LOCKMODE lockmode); extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bool swap_toast_by_content, diff --git a/src/include/commands/matview.h b/src/include/commands/matview.h index dce724469e..29229076a3 100644 --- a/src/include/commands/matview.h +++ b/src/include/commands/matview.h @@ -27,4 +27,6 @@ extern void ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString extern DestReceiver *CreateTransientRelDestReceiver(Oid oid); +extern bool MatViewIncrementalMaintenanceIsEnabled(void); + #endif /* MATVIEW_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index de22dff893..bf81b5bdda 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2478,6 +2478,7 @@ typedef struct CreateTableAsStmt typedef struct RefreshMatViewStmt { NodeTag type; + bool concurrent; /* allow concurrent access? */ bool skipData; /* true for WITH NO DATA */ RangeVar *relation; /* relation to insert into */ } RefreshMatViewStmt; diff --git a/src/test/regress/expected/matview.out b/src/test/regress/expected/matview.out index a98de4f58d..5a31fda69f 100644 --- a/src/test/regress/expected/matview.out +++ b/src/test/regress/expected/matview.out @@ -73,6 +73,8 @@ SELECT * FROM tvm; CREATE MATERIALIZED VIEW tmm AS SELECT sum(totamt) AS grandtot FROM tm; CREATE MATERIALIZED VIEW tvmm AS SELECT sum(totamt) AS grandtot FROM tvm; +CREATE UNIQUE INDEX tvmm_expr ON tvmm ((grandtot > 0)); +CREATE UNIQUE INDEX tvmm_pred ON tvmm (grandtot) WHERE grandtot < 0; CREATE VIEW tvv AS SELECT sum(totamt) AS grandtot FROM tv; EXPLAIN (costs off) CREATE MATERIALIZED VIEW tvvm AS SELECT * FROM tvv; @@ -141,6 +143,9 @@ ALTER MATERIALIZED VIEW tvm SET SCHEMA mvschema; Column | Type | Modifiers | Storage | Stats target | Description ----------+---------+-----------+---------+--------------+------------- grandtot | numeric | | main | | +Indexes: + "tvmm_expr" UNIQUE, btree ((grandtot > 0::numeric)) + "tvmm_pred" UNIQUE, btree (grandtot) WHERE grandtot < 0::numeric View definition: SELECT sum(tvm.totamt) AS grandtot FROM mvschema.tvm; @@ -177,7 +182,7 @@ SELECT * FROM tvm ORDER BY type; z | 11 (3 rows) -REFRESH MATERIALIZED VIEW tm; +REFRESH MATERIALIZED VIEW CONCURRENTLY tm; REFRESH MATERIALIZED VIEW tvm; SELECT * FROM tm ORDER BY type; type | totamt @@ -237,6 +242,9 @@ SELECT * FROM tvvm; (1 row) REFRESH MATERIALIZED VIEW tmm; +REFRESH MATERIALIZED VIEW CONCURRENTLY tvmm; +ERROR: cannot refresh materialized view "public.tvmm" concurrently +HINT: Create a UNIQUE index with no WHERE clause on one or more columns of the materialized view. REFRESH MATERIALIZED VIEW tvmm; REFRESH MATERIALIZED VIEW tvvm; EXPLAIN (costs off) @@ -281,6 +289,9 @@ SELECT * FROM tvvm; -- test diemv when the mv does not exist DROP MATERIALIZED VIEW IF EXISTS no_such_mv; NOTICE: materialized view "no_such_mv" does not exist, skipping +-- make sure invalid comination of options is prohibited +REFRESH MATERIALIZED VIEW CONCURRENTLY tvmm WITH NO DATA; +ERROR: CONCURRENTLY and WITH NO DATA options cannot be used together -- test join of mv and view SELECT type, m.totamt AS mtot, v.totamt AS vtot FROM tm m LEFT JOIN tv v USING (type) ORDER BY type; type | mtot | vtot @@ -385,3 +396,28 @@ SELECT * FROM hogeview WHERE i < 10; DROP TABLE hoge CASCADE; NOTICE: drop cascades to materialized view hogeview +-- test that duplicate values on unique index prevent refresh +CREATE TABLE foo(a, b) AS VALUES(1, 10); +CREATE MATERIALIZED VIEW mv AS SELECT * FROM foo; +CREATE UNIQUE INDEX ON mv(a); +INSERT INTO foo SELECT * FROM foo; +REFRESH MATERIALIZED VIEW mv; +ERROR: could not create unique index "mv_a_idx" +DETAIL: Key (a)=(1) is duplicated. +REFRESH MATERIALIZED VIEW CONCURRENTLY mv; +ERROR: new data for "mv" contains duplicate rows without any NULL columns +DETAIL: Row: (1,10) +DROP TABLE foo CASCADE; +NOTICE: drop cascades to materialized view mv +-- make sure that all indexes covered by unique indexes works +CREATE TABLE foo(a, b, c) AS VALUES(1, 2, 3); +CREATE MATERIALIZED VIEW mv AS SELECT * FROM foo; +CREATE UNIQUE INDEX ON mv (a); +CREATE UNIQUE INDEX ON mv (b); +CREATE UNIQUE INDEX on mv (c); +INSERT INTO foo VALUES(2, 3, 4); +INSERT INTO foo VALUES(3, 4, 5); +REFRESH MATERIALIZED VIEW mv; +REFRESH MATERIALIZED VIEW CONCURRENTLY mv; +DROP TABLE foo CASCADE; +NOTICE: drop cascades to materialized view mv diff --git a/src/test/regress/sql/matview.sql b/src/test/regress/sql/matview.sql index 975f8dd575..9d60bbbbe4 100644 --- a/src/test/regress/sql/matview.sql +++ b/src/test/regress/sql/matview.sql @@ -29,6 +29,8 @@ CREATE MATERIALIZED VIEW tvm AS SELECT * FROM tv ORDER BY type; SELECT * FROM tvm; CREATE MATERIALIZED VIEW tmm AS SELECT sum(totamt) AS grandtot FROM tm; CREATE MATERIALIZED VIEW tvmm AS SELECT sum(totamt) AS grandtot FROM tvm; +CREATE UNIQUE INDEX tvmm_expr ON tvmm ((grandtot > 0)); +CREATE UNIQUE INDEX tvmm_pred ON tvmm (grandtot) WHERE grandtot < 0; CREATE VIEW tvv AS SELECT sum(totamt) AS grandtot FROM tv; EXPLAIN (costs off) CREATE MATERIALIZED VIEW tvvm AS SELECT * FROM tvv; @@ -57,7 +59,7 @@ INSERT INTO t VALUES (6, 'z', 13); -- confirm pre- and post-refresh contents of fairly simple materialized views SELECT * FROM tm ORDER BY type; SELECT * FROM tvm ORDER BY type; -REFRESH MATERIALIZED VIEW tm; +REFRESH MATERIALIZED VIEW CONCURRENTLY tm; REFRESH MATERIALIZED VIEW tvm; SELECT * FROM tm ORDER BY type; SELECT * FROM tvm ORDER BY type; @@ -74,6 +76,7 @@ SELECT * FROM tmm; SELECT * FROM tvmm; SELECT * FROM tvvm; REFRESH MATERIALIZED VIEW tmm; +REFRESH MATERIALIZED VIEW CONCURRENTLY tvmm; REFRESH MATERIALIZED VIEW tvmm; REFRESH MATERIALIZED VIEW tvvm; EXPLAIN (costs off) @@ -89,6 +92,9 @@ SELECT * FROM tvvm; -- test diemv when the mv does not exist DROP MATERIALIZED VIEW IF EXISTS no_such_mv; +-- make sure invalid comination of options is prohibited +REFRESH MATERIALIZED VIEW CONCURRENTLY tvmm WITH NO DATA; + -- test join of mv and view SELECT type, m.totamt AS mtot, v.totamt AS vtot FROM tm m LEFT JOIN tv v USING (type) ORDER BY type; @@ -124,3 +130,24 @@ SELECT * FROM hogeview WHERE i < 10; VACUUM ANALYZE; SELECT * FROM hogeview WHERE i < 10; DROP TABLE hoge CASCADE; + +-- test that duplicate values on unique index prevent refresh +CREATE TABLE foo(a, b) AS VALUES(1, 10); +CREATE MATERIALIZED VIEW mv AS SELECT * FROM foo; +CREATE UNIQUE INDEX ON mv(a); +INSERT INTO foo SELECT * FROM foo; +REFRESH MATERIALIZED VIEW mv; +REFRESH MATERIALIZED VIEW CONCURRENTLY mv; +DROP TABLE foo CASCADE; + +-- make sure that all indexes covered by unique indexes works +CREATE TABLE foo(a, b, c) AS VALUES(1, 2, 3); +CREATE MATERIALIZED VIEW mv AS SELECT * FROM foo; +CREATE UNIQUE INDEX ON mv (a); +CREATE UNIQUE INDEX ON mv (b); +CREATE UNIQUE INDEX on mv (c); +INSERT INTO foo VALUES(2, 3, 4); +INSERT INTO foo VALUES(3, 4, 5); +REFRESH MATERIALIZED VIEW mv; +REFRESH MATERIALIZED VIEW CONCURRENTLY mv; +DROP TABLE foo CASCADE; -- 2.40.0