Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * matview.c
4 : * materialized view support
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/matview.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/htup_details.h"
18 : #include "access/multixact.h"
19 : #include "access/xact.h"
20 : #include "access/xlog.h"
21 : #include "catalog/catalog.h"
22 : #include "catalog/indexing.h"
23 : #include "catalog/namespace.h"
24 : #include "catalog/pg_operator.h"
25 : #include "commands/cluster.h"
26 : #include "commands/matview.h"
27 : #include "commands/tablecmds.h"
28 : #include "commands/tablespace.h"
29 : #include "executor/executor.h"
30 : #include "executor/spi.h"
31 : #include "miscadmin.h"
32 : #include "parser/parse_relation.h"
33 : #include "pgstat.h"
34 : #include "rewrite/rewriteHandler.h"
35 : #include "storage/lmgr.h"
36 : #include "storage/smgr.h"
37 : #include "tcop/tcopprot.h"
38 : #include "utils/builtins.h"
39 : #include "utils/lsyscache.h"
40 : #include "utils/rel.h"
41 : #include "utils/snapmgr.h"
42 : #include "utils/syscache.h"
43 : #include "utils/typcache.h"
44 :
45 :
46 : typedef struct
47 : {
48 : DestReceiver pub; /* publicly-known function pointers */
49 : Oid transientoid; /* OID of new heap into which to store */
50 : /* These fields are filled by transientrel_startup: */
51 : Relation transientrel; /* relation to write to */
52 : CommandId output_cid; /* cmin to insert in output tuples */
53 : int hi_options; /* heap_insert performance options */
54 : BulkInsertState bistate; /* bulk insert state */
55 : } DR_transientrel;
56 :
57 : static int matview_maintenance_depth = 0;
58 :
59 : static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
60 : static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
61 : static void transientrel_shutdown(DestReceiver *self);
62 : static void transientrel_destroy(DestReceiver *self);
63 : static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
64 : const char *queryString);
65 :
66 : static char *make_temptable_name_n(char *tempname, int n);
67 : static void mv_GenerateOper(StringInfo buf, Oid opoid);
68 :
69 : static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
70 : int save_sec_context);
71 : static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
72 :
73 : static void OpenMatViewIncrementalMaintenance(void);
74 : static void CloseMatViewIncrementalMaintenance(void);
75 :
76 : /*
77 : * SetMatViewPopulatedState
78 : * Mark a materialized view as populated, or not.
79 : *
80 : * NOTE: caller must be holding an appropriate lock on the relation.
81 : */
82 : void
83 38 : SetMatViewPopulatedState(Relation relation, bool newstate)
84 : {
85 : Relation pgrel;
86 : HeapTuple tuple;
87 :
88 38 : Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
89 :
90 : /*
91 : * Update relation's pg_class entry. Crucial side-effect: other backends
92 : * (and this one too!) are sent SI message to make them rebuild relcache
93 : * entries.
94 : */
95 38 : pgrel = heap_open(RelationRelationId, RowExclusiveLock);
96 38 : tuple = SearchSysCacheCopy1(RELOID,
97 : ObjectIdGetDatum(RelationGetRelid(relation)));
98 38 : if (!HeapTupleIsValid(tuple))
99 0 : elog(ERROR, "cache lookup failed for relation %u",
100 : RelationGetRelid(relation));
101 :
102 38 : ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
103 :
104 38 : CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
105 :
106 38 : heap_freetuple(tuple);
107 38 : heap_close(pgrel, RowExclusiveLock);
108 :
109 : /*
110 : * Advance command counter to make the updated pg_class row locally
111 : * visible.
112 : */
113 38 : CommandCounterIncrement();
114 38 : }
115 :
116 : /*
117 : * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
118 : *
119 : * This refreshes the materialized view by creating a new table and swapping
120 : * the relfilenodes of the new table and the old materialized view, so the OID
121 : * of the original materialized view is preserved. Thus we do not lose GRANT
122 : * nor references to this materialized view.
123 : *
124 : * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
125 : * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
126 : * statement associated with the materialized view. The statement node's
127 : * skipData field shows whether the clause was used.
128 : *
129 : * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
130 : * the new heap, it's better to create the indexes afterwards than to fill them
131 : * incrementally while we load.
132 : *
133 : * The matview's "populated" state is changed based on whether the contents
134 : * reflect the result set of the materialized view's query.
135 : */
136 : ObjectAddress
137 21 : ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
138 : ParamListInfo params, char *completionTag)
139 : {
140 : Oid matviewOid;
141 : Relation matviewRel;
142 : RewriteRule *rule;
143 : List *actions;
144 : Query *dataQuery;
145 : Oid tableSpace;
146 : Oid relowner;
147 : Oid OIDNewHeap;
148 : DestReceiver *dest;
149 21 : uint64 processed = 0;
150 : bool concurrent;
151 : LOCKMODE lockmode;
152 : char relpersistence;
153 : Oid save_userid;
154 : int save_sec_context;
155 : int save_nestlevel;
156 : ObjectAddress address;
157 :
158 : /* Determine strength of lock needed. */
159 21 : concurrent = stmt->concurrent;
160 21 : lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
161 :
162 : /*
163 : * Get a lock until end of transaction.
164 : */
165 21 : matviewOid = RangeVarGetRelidExtended(stmt->relation,
166 : lockmode, false, false,
167 : RangeVarCallbackOwnsTable, NULL);
168 21 : matviewRel = heap_open(matviewOid, NoLock);
169 :
170 : /* Make sure it is a materialized view. */
171 21 : if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
172 0 : ereport(ERROR,
173 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
174 : errmsg("\"%s\" is not a materialized view",
175 : RelationGetRelationName(matviewRel))));
176 :
177 : /* Check that CONCURRENTLY is not specified if not populated. */
178 21 : if (concurrent && !RelationIsPopulated(matviewRel))
179 0 : ereport(ERROR,
180 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
181 : errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
182 :
183 : /* Check that conflicting options have not been specified. */
184 21 : if (concurrent && stmt->skipData)
185 1 : ereport(ERROR,
186 : (errcode(ERRCODE_SYNTAX_ERROR),
187 : errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
188 :
189 : /* We don't allow an oid column for a materialized view. */
190 20 : Assert(!matviewRel->rd_rel->relhasoids);
191 :
192 : /*
193 : * Check that everything is correct for a refresh. Problems at this point
194 : * are internal errors, so elog is sufficient.
195 : */
196 40 : if (matviewRel->rd_rel->relhasrules == false ||
197 20 : matviewRel->rd_rules->numLocks < 1)
198 0 : elog(ERROR,
199 : "materialized view \"%s\" is missing rewrite information",
200 : RelationGetRelationName(matviewRel));
201 :
202 20 : if (matviewRel->rd_rules->numLocks > 1)
203 0 : elog(ERROR,
204 : "materialized view \"%s\" has too many rules",
205 : RelationGetRelationName(matviewRel));
206 :
207 20 : rule = matviewRel->rd_rules->rules[0];
208 20 : if (rule->event != CMD_SELECT || !(rule->isInstead))
209 0 : elog(ERROR,
210 : "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
211 : RelationGetRelationName(matviewRel));
212 :
213 20 : actions = rule->actions;
214 20 : if (list_length(actions) != 1)
215 0 : elog(ERROR,
216 : "the rule for materialized view \"%s\" is not a single action",
217 : RelationGetRelationName(matviewRel));
218 :
219 : /*
220 : * Check that there is a unique index with no WHERE clause on one or more
221 : * columns of the materialized view if CONCURRENTLY is specified.
222 : */
223 20 : if (concurrent)
224 : {
225 7 : List *indexoidlist = RelationGetIndexList(matviewRel);
226 : ListCell *indexoidscan;
227 7 : bool hasUniqueIndex = false;
228 :
229 9 : foreach(indexoidscan, indexoidlist)
230 : {
231 8 : Oid indexoid = lfirst_oid(indexoidscan);
232 : Relation indexRel;
233 : Form_pg_index indexStruct;
234 :
235 8 : indexRel = index_open(indexoid, AccessShareLock);
236 8 : indexStruct = indexRel->rd_index;
237 :
238 16 : if (indexStruct->indisunique &&
239 16 : IndexIsValid(indexStruct) &&
240 15 : RelationGetIndexExpressions(indexRel) == NIL &&
241 13 : RelationGetIndexPredicate(indexRel) == NIL &&
242 6 : indexStruct->indnatts > 0)
243 : {
244 6 : hasUniqueIndex = true;
245 6 : index_close(indexRel, AccessShareLock);
246 6 : break;
247 : }
248 :
249 2 : index_close(indexRel, AccessShareLock);
250 : }
251 :
252 7 : list_free(indexoidlist);
253 :
254 7 : if (!hasUniqueIndex)
255 1 : ereport(ERROR,
256 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
257 : errmsg("cannot refresh materialized view \"%s\" concurrently",
258 : quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
259 : RelationGetRelationName(matviewRel))),
260 : errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
261 : }
262 :
263 : /*
264 : * The stored query was rewritten at the time of the MV definition, but
265 : * has not been scribbled on by the planner.
266 : */
267 19 : dataQuery = linitial_node(Query, actions);
268 :
269 : /*
270 : * Check for active uses of the relation in the current transaction, such
271 : * as open scans.
272 : *
273 : * NB: We count on this to protect us against problems with refreshing the
274 : * data using HEAP_INSERT_FROZEN.
275 : */
276 19 : CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
277 :
278 : /*
279 : * Tentatively mark the matview as populated or not (this will roll back
280 : * if we fail later).
281 : */
282 19 : SetMatViewPopulatedState(matviewRel, !stmt->skipData);
283 :
284 19 : relowner = matviewRel->rd_rel->relowner;
285 :
286 : /*
287 : * Switch to the owner's userid, so that any functions are run as that
288 : * user. Also arrange to make GUC variable changes local to this command.
289 : * Don't lock it down too tight to create a temporary table just yet. We
290 : * will switch modes when we are about to execute user code.
291 : */
292 19 : GetUserIdAndSecContext(&save_userid, &save_sec_context);
293 19 : SetUserIdAndSecContext(relowner,
294 : save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
295 19 : save_nestlevel = NewGUCNestLevel();
296 :
297 : /* Concurrent refresh builds new data in temp tablespace, and does diff. */
298 19 : if (concurrent)
299 : {
300 6 : tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP);
301 6 : relpersistence = RELPERSISTENCE_TEMP;
302 : }
303 : else
304 : {
305 13 : tableSpace = matviewRel->rd_rel->reltablespace;
306 13 : relpersistence = matviewRel->rd_rel->relpersistence;
307 : }
308 :
309 : /*
310 : * Create the transient table that will receive the regenerated data. Lock
311 : * it against access by any other process until commit (by which time it
312 : * will be gone).
313 : */
314 19 : OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
315 : ExclusiveLock);
316 19 : LockRelationOid(OIDNewHeap, AccessExclusiveLock);
317 19 : dest = CreateTransientRelDestReceiver(OIDNewHeap);
318 :
319 : /*
320 : * Now lock down security-restricted operations.
321 : */
322 19 : SetUserIdAndSecContext(relowner,
323 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
324 :
325 : /* Generate the data, if wanted. */
326 19 : if (!stmt->skipData)
327 19 : processed = refresh_matview_datafill(dest, dataQuery, queryString);
328 :
329 : /* Make the matview match the newly generated data. */
330 18 : if (concurrent)
331 : {
332 6 : int old_depth = matview_maintenance_depth;
333 :
334 6 : PG_TRY();
335 : {
336 6 : refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
337 : save_sec_context);
338 : }
339 1 : PG_CATCH();
340 : {
341 1 : matview_maintenance_depth = old_depth;
342 1 : PG_RE_THROW();
343 : }
344 5 : PG_END_TRY();
345 5 : Assert(matview_maintenance_depth == old_depth);
346 : }
347 : else
348 : {
349 12 : refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
350 :
351 : /*
352 : * Inform stats collector about our activity: basically, we truncated
353 : * the matview and inserted some new data. (The concurrent code path
354 : * above doesn't need to worry about this because the inserts and
355 : * deletes it issues get counted by lower-level code.)
356 : */
357 11 : pgstat_count_truncate(matviewRel);
358 11 : if (!stmt->skipData)
359 11 : pgstat_count_heap_insert(matviewRel, processed);
360 : }
361 :
362 16 : heap_close(matviewRel, NoLock);
363 :
364 : /* Roll back any GUC changes */
365 16 : AtEOXact_GUC(false, save_nestlevel);
366 :
367 : /* Restore userid and security context */
368 16 : SetUserIdAndSecContext(save_userid, save_sec_context);
369 :
370 16 : ObjectAddressSet(address, RelationRelationId, matviewOid);
371 :
372 16 : return address;
373 : }
374 :
375 : /*
376 : * refresh_matview_datafill
377 : *
378 : * Execute the given query, sending result rows to "dest" (which will
379 : * insert them into the target matview).
380 : *
381 : * Returns number of rows inserted.
382 : */
383 : static uint64
384 19 : refresh_matview_datafill(DestReceiver *dest, Query *query,
385 : const char *queryString)
386 : {
387 : List *rewritten;
388 : PlannedStmt *plan;
389 : QueryDesc *queryDesc;
390 : Query *copied_query;
391 : uint64 processed;
392 :
393 : /* Lock and rewrite, using a copy to preserve the original query. */
394 19 : copied_query = copyObject(query);
395 19 : AcquireRewriteLocks(copied_query, true, false);
396 19 : rewritten = QueryRewrite(copied_query);
397 :
398 : /* SELECT should never rewrite to more or less than one SELECT query */
399 19 : if (list_length(rewritten) != 1)
400 0 : elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
401 19 : query = (Query *) linitial(rewritten);
402 :
403 : /* Check for user-requested abort. */
404 19 : CHECK_FOR_INTERRUPTS();
405 :
406 : /* Plan the query which will generate data for the refresh. */
407 19 : plan = pg_plan_query(query, 0, NULL);
408 :
409 : /*
410 : * Use a snapshot with an updated command ID to ensure this query sees
411 : * results of any previously executed queries. (This could only matter if
412 : * the planner executed an allegedly-stable function that changed the
413 : * database contents, but let's do it anyway to be safe.)
414 : */
415 18 : PushCopiedSnapshot(GetActiveSnapshot());
416 18 : UpdateActiveSnapshotCommandId();
417 :
418 : /* Create a QueryDesc, redirecting output to our tuple receiver */
419 18 : queryDesc = CreateQueryDesc(plan, queryString,
420 : GetActiveSnapshot(), InvalidSnapshot,
421 : dest, NULL, NULL, 0);
422 :
423 : /* call ExecutorStart to prepare the plan for execution */
424 18 : ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
425 :
426 : /* run the plan */
427 18 : ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
428 :
429 18 : processed = queryDesc->estate->es_processed;
430 :
431 : /* and clean up */
432 18 : ExecutorFinish(queryDesc);
433 18 : ExecutorEnd(queryDesc);
434 :
435 18 : FreeQueryDesc(queryDesc);
436 :
437 18 : PopActiveSnapshot();
438 :
439 18 : return processed;
440 : }
441 :
442 : DestReceiver *
443 19 : CreateTransientRelDestReceiver(Oid transientoid)
444 : {
445 19 : DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
446 :
447 19 : self->pub.receiveSlot = transientrel_receive;
448 19 : self->pub.rStartup = transientrel_startup;
449 19 : self->pub.rShutdown = transientrel_shutdown;
450 19 : self->pub.rDestroy = transientrel_destroy;
451 19 : self->pub.mydest = DestTransientRel;
452 19 : self->transientoid = transientoid;
453 :
454 19 : return (DestReceiver *) self;
455 : }
456 :
457 : /*
458 : * transientrel_startup --- executor startup
459 : */
460 : static void
461 18 : transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
462 : {
463 18 : DR_transientrel *myState = (DR_transientrel *) self;
464 : Relation transientrel;
465 :
466 18 : transientrel = heap_open(myState->transientoid, NoLock);
467 :
468 : /*
469 : * Fill private fields of myState for use by later routines
470 : */
471 18 : myState->transientrel = transientrel;
472 18 : myState->output_cid = GetCurrentCommandId(true);
473 :
474 : /*
475 : * We can skip WAL-logging the insertions, unless PITR or streaming
476 : * replication is in use. We can skip the FSM in any case.
477 : */
478 18 : myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
479 18 : if (!XLogIsNeeded())
480 0 : myState->hi_options |= HEAP_INSERT_SKIP_WAL;
481 18 : myState->bistate = GetBulkInsertState();
482 :
483 : /* Not using WAL requires smgr_targblock be initially invalid */
484 18 : Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
485 18 : }
486 :
487 : /*
488 : * transientrel_receive --- receive one tuple
489 : */
490 : static bool
491 50 : transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
492 : {
493 50 : DR_transientrel *myState = (DR_transientrel *) self;
494 : HeapTuple tuple;
495 :
496 : /*
497 : * get the heap tuple out of the tuple table slot, making sure we have a
498 : * writable copy
499 : */
500 50 : tuple = ExecMaterializeSlot(slot);
501 :
502 50 : heap_insert(myState->transientrel,
503 : tuple,
504 : myState->output_cid,
505 : myState->hi_options,
506 : myState->bistate);
507 :
508 : /* We know this is a newly created relation, so there are no indexes */
509 :
510 50 : return true;
511 : }
512 :
513 : /*
514 : * transientrel_shutdown --- executor end
515 : */
516 : static void
517 18 : transientrel_shutdown(DestReceiver *self)
518 : {
519 18 : DR_transientrel *myState = (DR_transientrel *) self;
520 :
521 18 : FreeBulkInsertState(myState->bistate);
522 :
523 : /* If we skipped using WAL, must heap_sync before commit */
524 18 : if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
525 0 : heap_sync(myState->transientrel);
526 :
527 : /* close transientrel, but keep lock until commit */
528 18 : heap_close(myState->transientrel, NoLock);
529 18 : myState->transientrel = NULL;
530 18 : }
531 :
532 : /*
533 : * transientrel_destroy --- release DestReceiver object
534 : */
535 : static void
536 0 : transientrel_destroy(DestReceiver *self)
537 : {
538 0 : pfree(self);
539 0 : }
540 :
541 :
542 : /*
543 : * Given a qualified temporary table name, append an underscore followed by
544 : * the given integer, to make a new table name based on the old one.
545 : *
546 : * This leaks memory through palloc(), which won't be cleaned up until the
547 : * current memory context is freed.
548 : */
549 : static char *
550 6 : make_temptable_name_n(char *tempname, int n)
551 : {
552 : StringInfoData namebuf;
553 :
554 6 : initStringInfo(&namebuf);
555 6 : appendStringInfoString(&namebuf, tempname);
556 6 : appendStringInfo(&namebuf, "_%d", n);
557 6 : return namebuf.data;
558 : }
559 :
560 : static void
561 7 : mv_GenerateOper(StringInfo buf, Oid opoid)
562 : {
563 : HeapTuple opertup;
564 : Form_pg_operator operform;
565 :
566 7 : opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(opoid));
567 7 : if (!HeapTupleIsValid(opertup))
568 0 : elog(ERROR, "cache lookup failed for operator %u", opoid);
569 7 : operform = (Form_pg_operator) GETSTRUCT(opertup);
570 7 : Assert(operform->oprkind == 'b');
571 :
572 7 : appendStringInfo(buf, "OPERATOR(%s.%s)",
573 7 : quote_identifier(get_namespace_name(operform->oprnamespace)),
574 7 : NameStr(operform->oprname));
575 :
576 7 : ReleaseSysCache(opertup);
577 7 : }
578 :
579 : /*
580 : * refresh_by_match_merge
581 : *
582 : * Refresh a materialized view with transactional semantics, while allowing
583 : * concurrent reads.
584 : *
585 : * This is called after a new version of the data has been created in a
586 : * temporary table. It performs a full outer join against the old version of
587 : * the data, producing "diff" results. This join cannot work if there are any
588 : * duplicated rows in either the old or new versions, in the sense that every
589 : * column would compare as equal between the two rows. It does work correctly
590 : * in the face of rows which have at least one NULL value, with all non-NULL
591 : * columns equal. The behavior of NULLs on equality tests and on UNIQUE
592 : * indexes turns out to be quite convenient here; the tests we need to make
593 : * are consistent with default behavior. If there is at least one UNIQUE
594 : * index on the materialized view, we have exactly the guarantee we need.
595 : *
596 : * The temporary table used to hold the diff results contains just the TID of
597 : * the old record (if matched) and the ROW from the new table as a single
598 : * column of complex record type (if matched).
599 : *
600 : * Once we have the diff table, we perform set-based DELETE and INSERT
601 : * operations against the materialized view, and discard both temporary
602 : * tables.
603 : *
604 : * Everything from the generation of the new data to applying the differences
605 : * takes place under cover of an ExclusiveLock, since it seems as though we
606 : * would want to prohibit not only concurrent REFRESH operations, but also
607 : * incremental maintenance. It also doesn't seem reasonable or safe to allow
608 : * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
609 : * this command.
610 : */
611 : static void
612 6 : refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
613 : int save_sec_context)
614 : {
615 : StringInfoData querybuf;
616 : Relation matviewRel;
617 : Relation tempRel;
618 : char *matviewname;
619 : char *tempname;
620 : char *diffname;
621 : TupleDesc tupdesc;
622 : bool foundUniqueIndex;
623 : List *indexoidlist;
624 : ListCell *indexoidscan;
625 : int16 relnatts;
626 : bool *usedForQual;
627 :
628 6 : initStringInfo(&querybuf);
629 6 : matviewRel = heap_open(matviewOid, NoLock);
630 6 : matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
631 6 : RelationGetRelationName(matviewRel));
632 6 : tempRel = heap_open(tempOid, NoLock);
633 6 : tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
634 6 : RelationGetRelationName(tempRel));
635 6 : diffname = make_temptable_name_n(tempname, 2);
636 :
637 6 : relnatts = matviewRel->rd_rel->relnatts;
638 6 : usedForQual = (bool *) palloc0(sizeof(bool) * relnatts);
639 :
640 : /* Open SPI context. */
641 6 : if (SPI_connect() != SPI_OK_CONNECT)
642 0 : elog(ERROR, "SPI_connect failed");
643 :
644 : /* Analyze the temp table with the new contents. */
645 6 : appendStringInfo(&querybuf, "ANALYZE %s", tempname);
646 6 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
647 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
648 :
649 : /*
650 : * We need to ensure that there are not duplicate rows without NULLs in
651 : * the new data set before we can count on the "diff" results. Check for
652 : * that in a way that allows showing the first duplicated row found. Even
653 : * after we pass this test, a unique index on the materialized view may
654 : * find a duplicate key problem.
655 : */
656 6 : resetStringInfo(&querybuf);
657 6 : appendStringInfo(&querybuf,
658 : "SELECT newdata FROM %s newdata "
659 : "WHERE newdata IS NOT NULL AND EXISTS "
660 : "(SELECT * FROM %s newdata2 WHERE newdata2 IS NOT NULL "
661 : "AND newdata2 OPERATOR(pg_catalog.*=) newdata "
662 : "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
663 : "newdata.ctid) LIMIT 1",
664 : tempname, tempname);
665 6 : if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
666 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
667 6 : if (SPI_processed > 0)
668 : {
669 : /*
670 : * Note that this ereport() is returning data to the user. Generally,
671 : * we would want to make sure that the user has been granted access to
672 : * this data. However, REFRESH MAT VIEW is only able to be run by the
673 : * owner of the mat view (or a superuser) and therefore there is no
674 : * need to check for access to data in the mat view.
675 : */
676 1 : ereport(ERROR,
677 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
678 : errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
679 : RelationGetRelationName(matviewRel)),
680 : errdetail("Row: %s",
681 : SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
682 : }
683 :
684 5 : SetUserIdAndSecContext(relowner,
685 : save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
686 :
687 : /* Start building the query for creating the diff table. */
688 5 : resetStringInfo(&querybuf);
689 5 : appendStringInfo(&querybuf,
690 : "CREATE TEMP TABLE %s AS "
691 : "SELECT mv.ctid AS tid, newdata "
692 : "FROM %s mv FULL JOIN %s newdata ON (",
693 : diffname, matviewname, tempname);
694 :
695 : /*
696 : * Get the list of index OIDs for the table from the relcache, and look up
697 : * each one in the pg_index syscache. We will test for equality on all
698 : * columns present in all unique indexes which only reference columns and
699 : * include all rows.
700 : */
701 5 : tupdesc = matviewRel->rd_att;
702 5 : foundUniqueIndex = false;
703 5 : indexoidlist = RelationGetIndexList(matviewRel);
704 :
705 12 : foreach(indexoidscan, indexoidlist)
706 : {
707 7 : Oid indexoid = lfirst_oid(indexoidscan);
708 : Relation indexRel;
709 : Form_pg_index indexStruct;
710 :
711 7 : indexRel = index_open(indexoid, RowExclusiveLock);
712 7 : indexStruct = indexRel->rd_index;
713 :
714 : /*
715 : * We're only interested if it is unique, valid, contains no
716 : * expressions, and is not partial.
717 : */
718 14 : if (indexStruct->indisunique &&
719 14 : IndexIsValid(indexStruct) &&
720 14 : RelationGetIndexExpressions(indexRel) == NIL &&
721 7 : RelationGetIndexPredicate(indexRel) == NIL)
722 : {
723 7 : int numatts = indexStruct->indnatts;
724 : int i;
725 :
726 : /* Add quals for all columns from this index. */
727 14 : for (i = 0; i < numatts; i++)
728 : {
729 7 : int attnum = indexStruct->indkey.values[i];
730 7 : Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
731 : Oid type;
732 : Oid op;
733 : const char *colname;
734 :
735 : /*
736 : * Only include the column once regardless of how many times
737 : * it shows up in how many indexes.
738 : */
739 7 : if (usedForQual[attnum - 1])
740 0 : continue;
741 7 : usedForQual[attnum - 1] = true;
742 :
743 : /*
744 : * Actually add the qual, ANDed with any others.
745 : */
746 7 : if (foundUniqueIndex)
747 2 : appendStringInfoString(&querybuf, " AND ");
748 :
749 7 : colname = quote_identifier(NameStr(attr->attname));
750 7 : appendStringInfo(&querybuf, "newdata.%s ", colname);
751 7 : type = attnumTypeId(matviewRel, attnum);
752 7 : op = lookup_type_cache(type, TYPECACHE_EQ_OPR)->eq_opr;
753 7 : mv_GenerateOper(&querybuf, op);
754 7 : appendStringInfo(&querybuf, " mv.%s", colname);
755 :
756 7 : foundUniqueIndex = true;
757 : }
758 : }
759 :
760 : /* Keep the locks, since we're about to run DML which needs them. */
761 7 : index_close(indexRel, NoLock);
762 : }
763 :
764 5 : list_free(indexoidlist);
765 :
766 : /*
767 : * There must be at least one unique index on the matview.
768 : *
769 : * ExecRefreshMatView() checks that after taking the exclusive lock on the
770 : * matview. So at least one unique index is guaranteed to exist here
771 : * because the lock is still being held.
772 : */
773 5 : Assert(foundUniqueIndex);
774 :
775 5 : appendStringInfoString(&querybuf,
776 : " AND newdata OPERATOR(pg_catalog.*=) mv) "
777 : "WHERE newdata IS NULL OR mv IS NULL "
778 : "ORDER BY tid");
779 :
780 : /* Create the temporary "diff" table. */
781 5 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
782 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
783 :
784 5 : SetUserIdAndSecContext(relowner,
785 : save_sec_context | SECURITY_RESTRICTED_OPERATION);
786 :
787 : /*
788 : * We have no further use for data from the "full-data" temp table, but we
789 : * must keep it around because its type is referenced from the diff table.
790 : */
791 :
792 : /* Analyze the diff table. */
793 5 : resetStringInfo(&querybuf);
794 5 : appendStringInfo(&querybuf, "ANALYZE %s", diffname);
795 5 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
796 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
797 :
798 5 : OpenMatViewIncrementalMaintenance();
799 :
800 : /* Deletes must come before inserts; do them first. */
801 5 : resetStringInfo(&querybuf);
802 5 : appendStringInfo(&querybuf,
803 : "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
804 : "(SELECT diff.tid FROM %s diff "
805 : "WHERE diff.tid IS NOT NULL "
806 : "AND diff.newdata IS NULL)",
807 : matviewname, diffname);
808 5 : if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
809 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
810 :
811 : /* Inserts go last. */
812 5 : resetStringInfo(&querybuf);
813 5 : appendStringInfo(&querybuf,
814 : "INSERT INTO %s SELECT (diff.newdata).* "
815 : "FROM %s diff WHERE tid IS NULL",
816 : matviewname, diffname);
817 5 : if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
818 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
819 :
820 : /* We're done maintaining the materialized view. */
821 5 : CloseMatViewIncrementalMaintenance();
822 5 : heap_close(tempRel, NoLock);
823 5 : heap_close(matviewRel, NoLock);
824 :
825 : /* Clean up temp tables. */
826 5 : resetStringInfo(&querybuf);
827 5 : appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
828 5 : if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
829 0 : elog(ERROR, "SPI_exec failed: %s", querybuf.data);
830 :
831 : /* Close SPI context. */
832 5 : if (SPI_finish() != SPI_OK_FINISH)
833 0 : elog(ERROR, "SPI_finish failed");
834 5 : }
835 :
836 : /*
837 : * Swap the physical files of the target and transient tables, then rebuild
838 : * the target's indexes and throw away the transient table. Security context
839 : * swapping is handled by the called function, so it is not needed here.
840 : */
841 : static void
842 12 : refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
843 : {
844 12 : finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
845 : RecentXmin, ReadNextMultiXactId(), relpersistence);
846 11 : }
847 :
848 :
849 : /*
850 : * This should be used to test whether the backend is in a context where it is
851 : * OK to allow DML statements to modify materialized views. We only want to
852 : * allow that for internal code driven by the materialized view definition,
853 : * not for arbitrary user-supplied code.
854 : *
855 : * While the function names reflect the fact that their main intended use is
856 : * incremental maintenance of materialized views (in response to changes to
857 : * the data in referenced relations), they are initially used to allow REFRESH
858 : * without blocking concurrent reads.
859 : */
860 : bool
861 10 : MatViewIncrementalMaintenanceIsEnabled(void)
862 : {
863 10 : return matview_maintenance_depth > 0;
864 : }
865 :
866 : static void
867 5 : OpenMatViewIncrementalMaintenance(void)
868 : {
869 5 : matview_maintenance_depth++;
870 5 : }
871 :
872 : static void
873 5 : CloseMatViewIncrementalMaintenance(void)
874 : {
875 5 : matview_maintenance_depth--;
876 5 : Assert(matview_maintenance_depth >= 0);
877 5 : }
|