]> granicus.if.org Git - postgresql/blob - src/backend/commands/matview.c
fcfc678813d5ce2f02aacb31bec3b5635825c4c4
[postgresql] / src / backend / commands / matview.c
1 /*-------------------------------------------------------------------------
2  *
3  * matview.c
4  *        materialized view support
5  *
6  * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/commands/matview.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/htup_details.h"
18 #include "access/multixact.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "catalog/indexing.h"
22 #include "catalog/namespace.h"
23 #include "catalog/pg_operator.h"
24 #include "commands/cluster.h"
25 #include "commands/matview.h"
26 #include "commands/tablecmds.h"
27 #include "commands/tablespace.h"
28 #include "executor/executor.h"
29 #include "executor/spi.h"
30 #include "miscadmin.h"
31 #include "parser/parse_relation.h"
32 #include "rewrite/rewriteHandler.h"
33 #include "storage/smgr.h"
34 #include "tcop/tcopprot.h"
35 #include "utils/builtins.h"
36 #include "utils/lsyscache.h"
37 #include "utils/rel.h"
38 #include "utils/snapmgr.h"
39 #include "utils/syscache.h"
40 #include "utils/typcache.h"
41
42
43 typedef struct
44 {
45         DestReceiver pub;                       /* publicly-known function pointers */
46         Oid                     transientoid;   /* OID of new heap into which to store */
47         /* These fields are filled by transientrel_startup: */
48         Relation        transientrel;   /* relation to write to */
49         CommandId       output_cid;             /* cmin to insert in output tuples */
50         int                     hi_options;             /* heap_insert performance options */
51         BulkInsertState bistate;        /* bulk insert state */
52 } DR_transientrel;
53
54 static int      matview_maintenance_depth = 0;
55
56 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
57 static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
58 static void transientrel_shutdown(DestReceiver *self);
59 static void transientrel_destroy(DestReceiver *self);
60 static void refresh_matview_datafill(DestReceiver *dest, Query *query,
61                                                  const char *queryString, Oid relowner);
62
63 static char *make_temptable_name_n(char *tempname, int n);
64 static void mv_GenerateOper(StringInfo buf, Oid opoid);
65
66 static void refresh_by_match_merge(Oid matviewOid, Oid tempOid);
67 static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap);
68
69 static void OpenMatViewIncrementalMaintenance(void);
70 static void CloseMatViewIncrementalMaintenance(void);
71
72 /*
73  * SetMatViewPopulatedState
74  *              Mark a materialized view as populated, or not.
75  *
76  * NOTE: caller must be holding an appropriate lock on the relation.
77  */
78 void
79 SetMatViewPopulatedState(Relation relation, bool newstate)
80 {
81         Relation        pgrel;
82         HeapTuple       tuple;
83
84         Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
85
86         /*
87          * Update relation's pg_class entry.  Crucial side-effect: other backends
88          * (and this one too!) are sent SI message to make them rebuild relcache
89          * entries.
90          */
91         pgrel = heap_open(RelationRelationId, RowExclusiveLock);
92         tuple = SearchSysCacheCopy1(RELOID,
93                                                                 ObjectIdGetDatum(RelationGetRelid(relation)));
94         if (!HeapTupleIsValid(tuple))
95                 elog(ERROR, "cache lookup failed for relation %u",
96                          RelationGetRelid(relation));
97
98         ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
99
100         simple_heap_update(pgrel, &tuple->t_self, tuple);
101
102         CatalogUpdateIndexes(pgrel, tuple);
103
104         heap_freetuple(tuple);
105         heap_close(pgrel, RowExclusiveLock);
106
107         /*
108          * Advance command counter to make the updated pg_class row locally
109          * visible.
110          */
111         CommandCounterIncrement();
112 }
113
114 /*
115  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
116  *
117  * This refreshes the materialized view by creating a new table and swapping
118  * the relfilenodes of the new table and the old materialized view, so the OID
119  * of the original materialized view is preserved. Thus we do not lose GRANT
120  * nor references to this materialized view.
121  *
122  * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
123  * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
124  * statement associated with the materialized view.  The statement node's
125  * skipData field shows whether the clause was used.
126  *
127  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
128  * the new heap, it's better to create the indexes afterwards than to fill them
129  * incrementally while we load.
130  *
131  * The matview's "populated" state is changed based on whether the contents
132  * reflect the result set of the materialized view's query.
133  */
134 void
135 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
136                                    ParamListInfo params, char *completionTag)
137 {
138         Oid                     matviewOid;
139         Relation        matviewRel;
140         RewriteRule *rule;
141         List       *actions;
142         Query      *dataQuery;
143         Oid                     tableSpace;
144         Oid                     owner;
145         Oid                     OIDNewHeap;
146         DestReceiver *dest;
147         bool            concurrent;
148         LOCKMODE        lockmode;
149
150         /* Determine strength of lock needed. */
151         concurrent = stmt->concurrent;
152         lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
153
154         /*
155          * Get a lock until end of transaction.
156          */
157         matviewOid = RangeVarGetRelidExtended(stmt->relation,
158                                                                                   lockmode, false, false,
159                                                                                   RangeVarCallbackOwnsTable, NULL);
160         matviewRel = heap_open(matviewOid, NoLock);
161
162         /* Make sure it is a materialized view. */
163         if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
164                 ereport(ERROR,
165                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
166                                  errmsg("\"%s\" is not a materialized view",
167                                                 RelationGetRelationName(matviewRel))));
168
169         /* Check that CONCURRENTLY is not specified if not populated. */
170         if (concurrent && !RelationIsPopulated(matviewRel))
171                 ereport(ERROR,
172                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
173                                  errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
174
175         /* Check that conflicting options have not been specified. */
176         if (concurrent && stmt->skipData)
177                 ereport(ERROR,
178                                 (errcode(ERRCODE_SYNTAX_ERROR),
179                                  errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
180
181         /* We don't allow an oid column for a materialized view. */
182         Assert(!matviewRel->rd_rel->relhasoids);
183
184         /*
185          * Check that everything is correct for a refresh. Problems at this point
186          * are internal errors, so elog is sufficient.
187          */
188         if (matviewRel->rd_rel->relhasrules == false ||
189                 matviewRel->rd_rules->numLocks < 1)
190                 elog(ERROR,
191                          "materialized view \"%s\" is missing rewrite information",
192                          RelationGetRelationName(matviewRel));
193
194         if (matviewRel->rd_rules->numLocks > 1)
195                 elog(ERROR,
196                          "materialized view \"%s\" has too many rules",
197                          RelationGetRelationName(matviewRel));
198
199         rule = matviewRel->rd_rules->rules[0];
200         if (rule->event != CMD_SELECT || !(rule->isInstead))
201                 elog(ERROR,
202                          "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
203                          RelationGetRelationName(matviewRel));
204
205         actions = rule->actions;
206         if (list_length(actions) != 1)
207                 elog(ERROR,
208                          "the rule for materialized view \"%s\" is not a single action",
209                          RelationGetRelationName(matviewRel));
210
211         /*
212          * The stored query was rewritten at the time of the MV definition, but
213          * has not been scribbled on by the planner.
214          */
215         dataQuery = (Query *) linitial(actions);
216         Assert(IsA(dataQuery, Query));
217
218         /*
219          * Check for active uses of the relation in the current transaction, such
220          * as open scans.
221          *
222          * NB: We count on this to protect us against problems with refreshing the
223          * data using HEAP_INSERT_FROZEN.
224          */
225         CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
226
227         /*
228          * Tentatively mark the matview as populated or not (this will roll back
229          * if we fail later).
230          */
231         SetMatViewPopulatedState(matviewRel, !stmt->skipData);
232
233         /* Concurrent refresh builds new data in temp tablespace, and does diff. */
234         if (concurrent)
235                 tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP);
236         else
237                 tableSpace = matviewRel->rd_rel->reltablespace;
238
239         owner = matviewRel->rd_rel->relowner;
240
241         heap_close(matviewRel, NoLock);
242
243         /* Create the transient table that will receive the regenerated data. */
244         OIDNewHeap = make_new_heap(matviewOid, tableSpace, concurrent,
245                                                            ExclusiveLock);
246         dest = CreateTransientRelDestReceiver(OIDNewHeap);
247
248         /* Generate the data, if wanted. */
249         if (!stmt->skipData)
250                 refresh_matview_datafill(dest, dataQuery, queryString, owner);
251
252         /* Make the matview match the newly generated data. */
253         if (concurrent)
254         {
255                 int                     old_depth = matview_maintenance_depth;
256
257                 PG_TRY();
258                 {
259                         refresh_by_match_merge(matviewOid, OIDNewHeap);
260                 }
261                 PG_CATCH();
262                 {
263                         matview_maintenance_depth = old_depth;
264                         PG_RE_THROW();
265                 }
266                 PG_END_TRY();
267                 Assert(matview_maintenance_depth == old_depth);
268         }
269         else
270                 refresh_by_heap_swap(matviewOid, OIDNewHeap);
271 }
272
273 /*
274  * refresh_matview_datafill
275  */
276 static void
277 refresh_matview_datafill(DestReceiver *dest, Query *query,
278                                                  const char *queryString, Oid relowner)
279 {
280         List       *rewritten;
281         PlannedStmt *plan;
282         QueryDesc  *queryDesc;
283         Oid                     save_userid;
284         int                     save_sec_context;
285         int                     save_nestlevel;
286
287         /*
288          * Switch to the owner's userid, so that any functions are run as that
289          * user.  Also lock down security-restricted operations and arrange to
290          * make GUC variable changes local to this command.
291          */
292         GetUserIdAndSecContext(&save_userid, &save_sec_context);
293         SetUserIdAndSecContext(relowner,
294                                                    save_sec_context | SECURITY_RESTRICTED_OPERATION);
295         save_nestlevel = NewGUCNestLevel();
296
297         /* Rewrite, copying the given Query to make sure it's not changed */
298         rewritten = QueryRewrite((Query *) copyObject(query));
299
300         /* SELECT should never rewrite to more or less than one SELECT query */
301         if (list_length(rewritten) != 1)
302                 elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
303         query = (Query *) linitial(rewritten);
304
305         /* Check for user-requested abort. */
306         CHECK_FOR_INTERRUPTS();
307
308         /* Plan the query which will generate data for the refresh. */
309         plan = pg_plan_query(query, 0, NULL);
310
311         /*
312          * Use a snapshot with an updated command ID to ensure this query sees
313          * results of any previously executed queries.  (This could only matter if
314          * the planner executed an allegedly-stable function that changed the
315          * database contents, but let's do it anyway to be safe.)
316          */
317         PushCopiedSnapshot(GetActiveSnapshot());
318         UpdateActiveSnapshotCommandId();
319
320         /* Create a QueryDesc, redirecting output to our tuple receiver */
321         queryDesc = CreateQueryDesc(plan, queryString,
322                                                                 GetActiveSnapshot(), InvalidSnapshot,
323                                                                 dest, NULL, 0);
324
325         /* call ExecutorStart to prepare the plan for execution */
326         ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
327
328         /* run the plan */
329         ExecutorRun(queryDesc, ForwardScanDirection, 0L);
330
331         /* and clean up */
332         ExecutorFinish(queryDesc);
333         ExecutorEnd(queryDesc);
334
335         FreeQueryDesc(queryDesc);
336
337         PopActiveSnapshot();
338
339         /* Roll back any GUC changes */
340         AtEOXact_GUC(false, save_nestlevel);
341
342         /* Restore userid and security context */
343         SetUserIdAndSecContext(save_userid, save_sec_context);
344 }
345
346 DestReceiver *
347 CreateTransientRelDestReceiver(Oid transientoid)
348 {
349         DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
350
351         self->pub.receiveSlot = transientrel_receive;
352         self->pub.rStartup = transientrel_startup;
353         self->pub.rShutdown = transientrel_shutdown;
354         self->pub.rDestroy = transientrel_destroy;
355         self->pub.mydest = DestTransientRel;
356         self->transientoid = transientoid;
357
358         return (DestReceiver *) self;
359 }
360
361 /*
362  * transientrel_startup --- executor startup
363  */
364 static void
365 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
366 {
367         DR_transientrel *myState = (DR_transientrel *) self;
368         Relation        transientrel;
369
370         transientrel = heap_open(myState->transientoid, NoLock);
371
372         /*
373          * Fill private fields of myState for use by later routines
374          */
375         myState->transientrel = transientrel;
376         myState->output_cid = GetCurrentCommandId(true);
377
378         /*
379          * We can skip WAL-logging the insertions, unless PITR or streaming
380          * replication is in use. We can skip the FSM in any case.
381          */
382         myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
383         if (!XLogIsNeeded())
384                 myState->hi_options |= HEAP_INSERT_SKIP_WAL;
385         myState->bistate = GetBulkInsertState();
386
387         /* Not using WAL requires smgr_targblock be initially invalid */
388         Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
389 }
390
391 /*
392  * transientrel_receive --- receive one tuple
393  */
394 static void
395 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
396 {
397         DR_transientrel *myState = (DR_transientrel *) self;
398         HeapTuple       tuple;
399
400         /*
401          * get the heap tuple out of the tuple table slot, making sure we have a
402          * writable copy
403          */
404         tuple = ExecMaterializeSlot(slot);
405
406         heap_insert(myState->transientrel,
407                                 tuple,
408                                 myState->output_cid,
409                                 myState->hi_options,
410                                 myState->bistate);
411
412         /* We know this is a newly created relation, so there are no indexes */
413 }
414
415 /*
416  * transientrel_shutdown --- executor end
417  */
418 static void
419 transientrel_shutdown(DestReceiver *self)
420 {
421         DR_transientrel *myState = (DR_transientrel *) self;
422
423         FreeBulkInsertState(myState->bistate);
424
425         /* If we skipped using WAL, must heap_sync before commit */
426         if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
427                 heap_sync(myState->transientrel);
428
429         /* close transientrel, but keep lock until commit */
430         heap_close(myState->transientrel, NoLock);
431         myState->transientrel = NULL;
432 }
433
434 /*
435  * transientrel_destroy --- release DestReceiver object
436  */
437 static void
438 transientrel_destroy(DestReceiver *self)
439 {
440         pfree(self);
441 }
442
443
444 /*
445  * Given a qualified temporary table name, append an underscore followed by
446  * the given integer, to make a new table name based on the old one.
447  *
448  * This leaks memory through palloc(), which won't be cleaned up until the
449  * current memory memory context is freed.
450  */
451 static char *
452 make_temptable_name_n(char *tempname, int n)
453 {
454         StringInfoData namebuf;
455
456         initStringInfo(&namebuf);
457         appendStringInfoString(&namebuf, tempname);
458         appendStringInfo(&namebuf, "_%i", n);
459         return namebuf.data;
460 }
461
462 static void
463 mv_GenerateOper(StringInfo buf, Oid opoid)
464 {
465         HeapTuple       opertup;
466         Form_pg_operator operform;
467
468         opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(opoid));
469         if (!HeapTupleIsValid(opertup))
470                 elog(ERROR, "cache lookup failed for operator %u", opoid);
471         operform = (Form_pg_operator) GETSTRUCT(opertup);
472         Assert(operform->oprkind == 'b');
473
474         appendStringInfo(buf, "OPERATOR(%s.%s)",
475                                 quote_identifier(get_namespace_name(operform->oprnamespace)),
476                                          NameStr(operform->oprname));
477
478         ReleaseSysCache(opertup);
479 }
480
481 /*
482  * refresh_by_match_merge
483  *
484  * Refresh a materialized view with transactional semantics, while allowing
485  * concurrent reads.
486  *
487  * This is called after a new version of the data has been created in a
488  * temporary table.  It performs a full outer join against the old version of
489  * the data, producing "diff" results.  This join cannot work if there are any
490  * duplicated rows in either the old or new versions, in the sense that every
491  * column would compare as equal between the two rows.  It does work correctly
492  * in the face of rows which have at least one NULL value, with all non-NULL
493  * columns equal.  The behavior of NULLs on equality tests and on UNIQUE
494  * indexes turns out to be quite convenient here; the tests we need to make
495  * are consistent with default behavior.  If there is at least one UNIQUE
496  * index on the materialized view, we have exactly the guarantee we need.
497  *
498  * The temporary table used to hold the diff results contains just the TID of
499  * the old record (if matched) and the ROW from the new table as a single
500  * column of complex record type (if matched).
501  *
502  * Once we have the diff table, we perform set-based DELETE and INSERT
503  * operations against the materialized view, and discard both temporary
504  * tables.
505  *
506  * Everything from the generation of the new data to applying the differences
507  * takes place under cover of an ExclusiveLock, since it seems as though we
508  * would want to prohibit not only concurrent REFRESH operations, but also
509  * incremental maintenance.  It also doesn't seem reasonable or safe to allow
510  * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
511  * this command.
512  */
513 static void
514 refresh_by_match_merge(Oid matviewOid, Oid tempOid)
515 {
516         StringInfoData querybuf;
517         Relation        matviewRel;
518         Relation        tempRel;
519         char       *matviewname;
520         char       *tempname;
521         char       *diffname;
522         TupleDesc       tupdesc;
523         bool            foundUniqueIndex;
524         List       *indexoidlist;
525         ListCell   *indexoidscan;
526         int16           relnatts;
527         bool       *usedForQual;
528         Oid                     save_userid;
529         int                     save_sec_context;
530         int                     save_nestlevel;
531
532         initStringInfo(&querybuf);
533         matviewRel = heap_open(matviewOid, NoLock);
534         matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
535                                                                                 RelationGetRelationName(matviewRel));
536         tempRel = heap_open(tempOid, NoLock);
537         tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
538                                                                                   RelationGetRelationName(tempRel));
539         diffname = make_temptable_name_n(tempname, 2);
540
541         relnatts = matviewRel->rd_rel->relnatts;
542         usedForQual = (bool *) palloc0(sizeof(bool) * relnatts);
543
544         /* Open SPI context. */
545         if (SPI_connect() != SPI_OK_CONNECT)
546                 elog(ERROR, "SPI_connect failed");
547
548         /* Analyze the temp table with the new contents. */
549         appendStringInfo(&querybuf, "ANALYZE %s", tempname);
550         if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
551                 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
552
553         /*
554          * We need to ensure that there are not duplicate rows without NULLs in
555          * the new data set before we can count on the "diff" results.  Check for
556          * that in a way that allows showing the first duplicated row found.  Even
557          * after we pass this test, a unique index on the materialized view may
558          * find a duplicate key problem.
559          */
560         resetStringInfo(&querybuf);
561         appendStringInfo(&querybuf,
562                                          "SELECT newdata FROM %s newdata "
563                                          "WHERE newdata IS NOT NULL AND EXISTS "
564                                          "(SELECT * FROM %s newdata2 WHERE newdata2 IS NOT NULL "
565                                          "AND newdata2 OPERATOR(pg_catalog.*=) newdata "
566                                          "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
567                                          "newdata.ctid) LIMIT 1",
568                                          tempname, tempname);
569         if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
570                 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
571         if (SPI_processed > 0)
572         {
573                 ereport(ERROR,
574                                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
575                                  errmsg("new data for \"%s\" contains duplicate rows without any NULL columns",
576                                                 RelationGetRelationName(matviewRel)),
577                                  errdetail("Row: %s",
578                         SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
579         }
580
581         /* Start building the query for creating the diff table. */
582         resetStringInfo(&querybuf);
583         appendStringInfo(&querybuf,
584                                          "CREATE TEMP TABLE %s AS "
585                                          "SELECT mv.ctid AS tid, newdata "
586                                          "FROM %s mv FULL JOIN %s newdata ON (",
587                                          diffname, matviewname, tempname);
588
589         /*
590          * Get the list of index OIDs for the table from the relcache, and look up
591          * each one in the pg_index syscache.  We will test for equality on all
592          * columns present in all unique indexes which only reference columns and
593          * include all rows.
594          */
595         tupdesc = matviewRel->rd_att;
596         foundUniqueIndex = false;
597         indexoidlist = RelationGetIndexList(matviewRel);
598
599         foreach(indexoidscan, indexoidlist)
600         {
601                 Oid                     indexoid = lfirst_oid(indexoidscan);
602                 Relation        indexRel;
603                 HeapTuple       indexTuple;
604                 Form_pg_index indexStruct;
605
606                 indexRel = index_open(indexoid, RowExclusiveLock);
607                 indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
608                 if (!HeapTupleIsValid(indexTuple))              /* should not happen */
609                         elog(ERROR, "cache lookup failed for index %u", indexoid);
610                 indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
611
612                 /* We're only interested if it is unique and valid. */
613                 if (indexStruct->indisunique && IndexIsValid(indexStruct))
614                 {
615                         int                     numatts = indexStruct->indnatts;
616                         int                     i;
617
618                         /* Skip any index on an expression. */
619                         if (RelationGetIndexExpressions(indexRel) != NIL)
620                         {
621                                 index_close(indexRel, NoLock);
622                                 ReleaseSysCache(indexTuple);
623                                 continue;
624                         }
625
626                         /* Skip partial indexes. */
627                         if (RelationGetIndexPredicate(indexRel) != NIL)
628                         {
629                                 index_close(indexRel, NoLock);
630                                 ReleaseSysCache(indexTuple);
631                                 continue;
632                         }
633
634                         /* Hold the locks, since we're about to run DML which needs them. */
635                         index_close(indexRel, NoLock);
636
637                         /* Add quals for all columns from this index. */
638                         for (i = 0; i < numatts; i++)
639                         {
640                                 int                     attnum = indexStruct->indkey.values[i];
641                                 Oid                     type;
642                                 Oid                     op;
643                                 const char *colname;
644
645                                 /*
646                                  * Only include the column once regardless of how many times
647                                  * it shows up in how many indexes.
648                                  */
649                                 if (usedForQual[attnum - 1])
650                                         continue;
651                                 usedForQual[attnum - 1] = true;
652
653                                 /*
654                                  * Actually add the qual, ANDed with any others.
655                                  */
656                                 if (foundUniqueIndex)
657                                         appendStringInfoString(&querybuf, " AND ");
658
659                                 colname = quote_identifier(NameStr((tupdesc->attrs[attnum - 1])->attname));
660                                 appendStringInfo(&querybuf, "newdata.%s ", colname);
661                                 type = attnumTypeId(matviewRel, attnum);
662                                 op = lookup_type_cache(type, TYPECACHE_EQ_OPR)->eq_opr;
663                                 mv_GenerateOper(&querybuf, op);
664                                 appendStringInfo(&querybuf, " mv.%s", colname);
665
666                                 foundUniqueIndex = true;
667                         }
668                 }
669                 ReleaseSysCache(indexTuple);
670         }
671
672         list_free(indexoidlist);
673
674         if (!foundUniqueIndex)
675                 ereport(ERROR,
676                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
677                            errmsg("cannot refresh materialized view \"%s\" concurrently",
678                                           matviewname),
679                                  errhint("Create a UNIQUE index with no WHERE clause on one or more columns of the materialized view.")));
680
681         appendStringInfoString(&querybuf,
682                                                    " AND newdata OPERATOR(pg_catalog.*=) mv) "
683                                                    "WHERE newdata IS NULL OR mv IS NULL "
684                                                    "ORDER BY tid");
685
686         /* Create the temporary "diff" table. */
687         if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
688                 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
689
690         /*
691          * We have no further use for data from the "full-data" temp table, but we
692          * must keep it around because its type is reference from the diff table.
693          */
694
695         /* Analyze the diff table. */
696         resetStringInfo(&querybuf);
697         appendStringInfo(&querybuf, "ANALYZE %s", diffname);
698         if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
699                 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
700
701         OpenMatViewIncrementalMaintenance();
702
703         /*
704          * Switch to the owner's userid, so that any functions are run as that
705          * user.  Also lock down security-restricted operations and arrange to
706          * make GUC variable changes local to this command.
707          */
708         GetUserIdAndSecContext(&save_userid, &save_sec_context);
709         SetUserIdAndSecContext(matviewRel->rd_rel->relowner,
710                                                    save_sec_context | SECURITY_RESTRICTED_OPERATION);
711         save_nestlevel = NewGUCNestLevel();
712
713         /* Deletes must come before inserts; do them first. */
714         resetStringInfo(&querybuf);
715         appendStringInfo(&querybuf,
716                                          "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
717                                          "(SELECT diff.tid FROM %s diff "
718                                          "WHERE diff.tid IS NOT NULL "
719                                          "AND diff.newdata IS NULL)",
720                                          matviewname, diffname);
721         if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
722                 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
723
724         /* Inserts go last. */
725         resetStringInfo(&querybuf);
726         appendStringInfo(&querybuf,
727                                          "INSERT INTO %s SELECT (diff.newdata).* "
728                                          "FROM %s diff WHERE tid IS NULL",
729                                          matviewname, diffname);
730         if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
731                 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
732
733         /* Roll back any GUC changes */
734         AtEOXact_GUC(false, save_nestlevel);
735
736         /* Restore userid and security context */
737         SetUserIdAndSecContext(save_userid, save_sec_context);
738
739         /* We're done maintaining the materialized view. */
740         CloseMatViewIncrementalMaintenance();
741         heap_close(tempRel, NoLock);
742         heap_close(matviewRel, NoLock);
743
744         /* Clean up temp tables. */
745         resetStringInfo(&querybuf);
746         appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
747         if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
748                 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
749
750         /* Close SPI context. */
751         if (SPI_finish() != SPI_OK_FINISH)
752                 elog(ERROR, "SPI_finish failed");
753 }
754
755 /*
756  * Swap the physical files of the target and transient tables, then rebuild
757  * the target's indexes and throw away the transient table.  Security context
758  * swapping is handled by the called function, so it is not needed here.
759  */
760 static void
761 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap)
762 {
763         finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
764                                          RecentXmin, ReadNextMultiXactId());
765 }
766
767
768 /*
769  * This should be used to test whether the backend is in a context where it is
770  * OK to allow DML statements to modify materialized views.  We only want to
771  * allow that for internal code driven by the materialized view definition,
772  * not for arbitrary user-supplied code.
773  *
774  * While the function names reflect the fact that their main intended use is
775  * incremental maintenance of materialized views (in response to changes to
776  * the data in referenced relations), they are initially used to allow REFRESH
777  * without blocking concurrent reads.
778  */
779 bool
780 MatViewIncrementalMaintenanceIsEnabled(void)
781 {
782         return matview_maintenance_depth > 0;
783 }
784
785 static void
786 OpenMatViewIncrementalMaintenance(void)
787 {
788         matview_maintenance_depth++;
789 }
790
791 static void
792 CloseMatViewIncrementalMaintenance(void)
793 {
794         matview_maintenance_depth--;
795         Assert(matview_maintenance_depth >= 0);
796 }