LCOV - code coverage report
Current view: top level - src/backend/commands - matview.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 233 257 90.7 %
Date: 2017-09-29 15:12:54 Functions: 14 15 93.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * matview.c
       4             :  *    materialized view support
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/commands/matview.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/htup_details.h"
      18             : #include "access/multixact.h"
      19             : #include "access/xact.h"
      20             : #include "access/xlog.h"
      21             : #include "catalog/catalog.h"
      22             : #include "catalog/indexing.h"
      23             : #include "catalog/namespace.h"
      24             : #include "catalog/pg_operator.h"
      25             : #include "commands/cluster.h"
      26             : #include "commands/matview.h"
      27             : #include "commands/tablecmds.h"
      28             : #include "commands/tablespace.h"
      29             : #include "executor/executor.h"
      30             : #include "executor/spi.h"
      31             : #include "miscadmin.h"
      32             : #include "parser/parse_relation.h"
      33             : #include "pgstat.h"
      34             : #include "rewrite/rewriteHandler.h"
      35             : #include "storage/lmgr.h"
      36             : #include "storage/smgr.h"
      37             : #include "tcop/tcopprot.h"
      38             : #include "utils/builtins.h"
      39             : #include "utils/lsyscache.h"
      40             : #include "utils/rel.h"
      41             : #include "utils/snapmgr.h"
      42             : #include "utils/syscache.h"
      43             : #include "utils/typcache.h"
      44             : 
      45             : /*
                     :  * DR_transientrel
                     :  *      DestReceiver state for storing query output rows into a transient
                     :  *      heap.  CreateTransientRelDestReceiver allocates it and sets pub and
                     :  *      transientoid; the remaining fields are set at executor startup.
                     :  */
      46             : typedef struct
      47             : {
      48             :     DestReceiver pub;           /* publicly-known function pointers */
      49             :     Oid         transientoid;   /* OID of new heap into which to store */
      50             :     /* These fields are filled by transientrel_startup: */
      51             :     Relation    transientrel;   /* relation to write to */
      52             :     CommandId   output_cid;     /* cmin to insert in output tuples */
      53             :     int         hi_options;     /* heap_insert performance options */
      54             :     BulkInsertState bistate;    /* bulk insert state */
      55             : } DR_transientrel;
      56             : /*
                     :  * Saved by ExecRefreshMatView before it calls refresh_by_match_merge and
                     :  * restored there if that call raises an error.
                     :  * NOTE(review): presumably incremented and decremented by
                     :  * OpenMatViewIncrementalMaintenance and
                     :  * CloseMatViewIncrementalMaintenance, whose bodies are outside this
                     :  * excerpt -- confirm.
                     :  */
      57             : static int  matview_maintenance_depth = 0;
      58             : /* DestReceiver callbacks and the query runner that feeds them */
      59             : static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
      60             : static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
      61             : static void transientrel_shutdown(DestReceiver *self);
      62             : static void transientrel_destroy(DestReceiver *self);
      63             : static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
      64             :                          const char *queryString);
      65             : /* Text-building helpers */
      66             : static char *make_temptable_name_n(char *tempname, int n);
      67             : static void mv_GenerateOper(StringInfo buf, Oid opoid);
      68             : /* The two refresh strategies selected by ExecRefreshMatView */
      69             : static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
      70             :                        int save_sec_context);
      71             : static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
      72             : /* Definitions of the next two are not visible in this excerpt */
      73             : static void OpenMatViewIncrementalMaintenance(void);
      74             : static void CloseMatViewIncrementalMaintenance(void);
      75             : 
      76             : /*
      77             :  * SetMatViewPopulatedState
      78             :  *      Mark a materialized view as populated, or not.
      79             :  *
                     :  * relation: the materialized view; its relkind is asserted to be
                     :  *      RELKIND_MATVIEW.
                     :  * newstate: value stored into relispopulated of the pg_class row.
                     :  *
                     :  * Raises elog(ERROR) if the pg_class row for the relation is not found.
                     :  *
      80             :  * NOTE: caller must be holding an appropriate lock on the relation.
      81             :  */
      82             : void
      83          38 : SetMatViewPopulatedState(Relation relation, bool newstate)
      84             : {
      85             :     Relation    pgrel;
      86             :     HeapTuple   tuple;
      87             : 
      88          38 :     Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
      89             : 
      90             :     /*
      91             :      * Update relation's pg_class entry.  Crucial side-effect: other backends
      92             :      * (and this one too!) are sent an SI message to make them rebuild relcache
      93             :      * entries.
                     :      *
                     :      * The row is fetched as a copy, modified in place, written back with
                     :      * CatalogTupleUpdate, and the copy is then freed.
      94             :      */
      95          38 :     pgrel = heap_open(RelationRelationId, RowExclusiveLock);
      96          38 :     tuple = SearchSysCacheCopy1(RELOID,
      97             :                                 ObjectIdGetDatum(RelationGetRelid(relation)));
      98          38 :     if (!HeapTupleIsValid(tuple))
      99           0 :         elog(ERROR, "cache lookup failed for relation %u",
     100             :              RelationGetRelid(relation));
     101             : 
     102          38 :     ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
     103             : 
     104          38 :     CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
     105             : 
     106          38 :     heap_freetuple(tuple);
     107          38 :     heap_close(pgrel, RowExclusiveLock);
     108             : 
     109             :     /*
     110             :      * Advance command counter to make the updated pg_class row locally
     111             :      * visible.
     112             :      */
     113          38 :     CommandCounterIncrement();
     114          38 : }
     115             : 
     116             : /*
     117             :  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
     118             :  *
     119             :  * This refreshes the materialized view by creating a new table and swapping
     120             :  * the relfilenodes of the new table and the old materialized view, so the OID
     121             :  * of the original materialized view is preserved. Thus we lose neither
     122             :  * GRANTs nor references to this materialized view.
     123             :  *
     124             :  * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
     125             :  * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
     126             :  * statement associated with the materialized view.  The statement node's
     127             :  * skipData field shows whether the clause was used.
     128             :  *
     129             :  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
     130             :  * the new heap, it's better to create the indexes afterwards than to fill them
     131             :  * incrementally while we load.
     132             :  *
     133             :  * The matview's "populated" state is changed based on whether the contents
     134             :  * reflect the result set of the materialized view's query.
                     :  *
                     :  * stmt: parsed command; this function reads its relation, concurrent and
                     :  *      skipData fields.
                     :  * queryString: passed through to refresh_matview_datafill.
                     :  * params, completionTag: not referenced in this function body.
                     :  *
                     :  * Returns the ObjectAddress of the refreshed materialized view.
     135             :  */
     136             : ObjectAddress
     137          21 : ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
     138             :                    ParamListInfo params, char *completionTag)
     139             : {
     140             :     Oid         matviewOid;
     141             :     Relation    matviewRel;
     142             :     RewriteRule *rule;
     143             :     List       *actions;
     144             :     Query      *dataQuery;
     145             :     Oid         tableSpace;
     146             :     Oid         relowner;
     147             :     Oid         OIDNewHeap;
     148             :     DestReceiver *dest;
     149          21 :     uint64      processed = 0;
     150             :     bool        concurrent;
     151             :     LOCKMODE    lockmode;
     152             :     char        relpersistence;
     153             :     Oid         save_userid;
     154             :     int         save_sec_context;
     155             :     int         save_nestlevel;
     156             :     ObjectAddress address;
     157             : 
     158             :     /* Determine strength of lock needed. */
     159          21 :     concurrent = stmt->concurrent;
     160          21 :     lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
     161             : 
     162             :     /*
     163             :      * Get a lock until end of transaction.
                     :      *
                     :      * The lock mode is handed to RangeVarGetRelidExtended, so the
                     :      * following heap_open passes NoLock.
     164             :      */
     165          21 :     matviewOid = RangeVarGetRelidExtended(stmt->relation,
     166             :                                           lockmode, false, false,
     167             :                                           RangeVarCallbackOwnsTable, NULL);
     168          21 :     matviewRel = heap_open(matviewOid, NoLock);
     169             : 
     170             :     /* Make sure it is a materialized view. */
     171          21 :     if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
     172           0 :         ereport(ERROR,
     173             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     174             :                  errmsg("\"%s\" is not a materialized view",
     175             :                         RelationGetRelationName(matviewRel))));
     176             : 
     177             :     /* Check that CONCURRENTLY is not specified if not populated. */
     178          21 :     if (concurrent && !RelationIsPopulated(matviewRel))
     179           0 :         ereport(ERROR,
     180             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     181             :                  errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
     182             : 
     183             :     /* Check that conflicting options have not been specified. */
     184          21 :     if (concurrent && stmt->skipData)
     185           1 :         ereport(ERROR,
     186             :                 (errcode(ERRCODE_SYNTAX_ERROR),
     187             :                  errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
     188             : 
     189             :     /* We don't allow an oid column for a materialized view. */
     190          20 :     Assert(!matviewRel->rd_rel->relhasoids);
     191             : 
     192             :     /*
     193             :      * Check that everything is correct for a refresh. Problems at this point
     194             :      * are internal errors, so elog is sufficient.
     195             :      */
     196          40 :     if (matviewRel->rd_rel->relhasrules == false ||
     197          20 :         matviewRel->rd_rules->numLocks < 1)
     198           0 :         elog(ERROR,
     199             :              "materialized view \"%s\" is missing rewrite information",
     200             :              RelationGetRelationName(matviewRel));
     201             : 
     202          20 :     if (matviewRel->rd_rules->numLocks > 1)
     203           0 :         elog(ERROR,
     204             :              "materialized view \"%s\" has too many rules",
     205             :              RelationGetRelationName(matviewRel));
     206             : 
     207          20 :     rule = matviewRel->rd_rules->rules[0];
     208          20 :     if (rule->event != CMD_SELECT || !(rule->isInstead))
     209           0 :         elog(ERROR,
     210             :              "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
     211             :              RelationGetRelationName(matviewRel));
     212             : 
     213          20 :     actions = rule->actions;
     214          20 :     if (list_length(actions) != 1)
     215           0 :         elog(ERROR,
     216             :              "the rule for materialized view \"%s\" is not a single action",
     217             :              RelationGetRelationName(matviewRel));
     218             : 
     219             :     /*
     220             :      * Check that there is a unique index with no WHERE clause on one or more
     221             :      * columns of the materialized view if CONCURRENTLY is specified.
                     :      *
                     :      * The loop stops at the first index that is unique, valid, has no
                     :      * expressions and no predicate, and has at least one attribute.
     222             :      */
     223          20 :     if (concurrent)
     224             :     {
     225           7 :         List       *indexoidlist = RelationGetIndexList(matviewRel);
     226             :         ListCell   *indexoidscan;
     227           7 :         bool        hasUniqueIndex = false;
     228             : 
     229           9 :         foreach(indexoidscan, indexoidlist)
     230             :         {
     231           8 :             Oid         indexoid = lfirst_oid(indexoidscan);
     232             :             Relation    indexRel;
     233             :             Form_pg_index indexStruct;
     234             : 
     235           8 :             indexRel = index_open(indexoid, AccessShareLock);
     236           8 :             indexStruct = indexRel->rd_index;
     237             : 
     238          16 :             if (indexStruct->indisunique &&
     239          16 :                 IndexIsValid(indexStruct) &&
     240          15 :                 RelationGetIndexExpressions(indexRel) == NIL &&
     241          13 :                 RelationGetIndexPredicate(indexRel) == NIL &&
     242           6 :                 indexStruct->indnatts > 0)
     243             :             {
     244           6 :                 hasUniqueIndex = true;
     245           6 :                 index_close(indexRel, AccessShareLock);
     246           6 :                 break;
     247             :             }
     248             : 
     249           2 :             index_close(indexRel, AccessShareLock);
     250             :         }
     251             : 
     252           7 :         list_free(indexoidlist);
     253             : 
     254           7 :         if (!hasUniqueIndex)
     255           1 :             ereport(ERROR,
     256             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     257             :                      errmsg("cannot refresh materialized view \"%s\" concurrently",
     258             :                             quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
     259             :                                                        RelationGetRelationName(matviewRel))),
     260             :                      errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
     261             :     }
     262             : 
     263             :     /*
     264             :      * The stored query was rewritten at the time of the MV definition, but
     265             :      * has not been scribbled on by the planner.
     266             :      */
     267          19 :     dataQuery = linitial_node(Query, actions);
     268             : 
     269             :     /*
     270             :      * Check for active uses of the relation in the current transaction, such
     271             :      * as open scans.
     272             :      *
     273             :      * NB: We count on this to protect us against problems with refreshing the
     274             :      * data using HEAP_INSERT_FROZEN.
     275             :      */
     276          19 :     CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
     277             : 
     278             :     /*
     279             :      * Tentatively mark the matview as populated or not (this will roll back
     280             :      * if we fail later).
     281             :      */
     282          19 :     SetMatViewPopulatedState(matviewRel, !stmt->skipData);
     283             : 
     284          19 :     relowner = matviewRel->rd_rel->relowner;
     285             : 
     286             :     /*
     287             :      * Switch to the owner's userid, so that any functions are run as that
     288             :      * user.  Also arrange to make GUC variable changes local to this command.
     289             :      * Don't lock it down too tight to create a temporary table just yet.  We
     290             :      * will switch modes when we are about to execute user code.
     291             :      */
     292          19 :     GetUserIdAndSecContext(&save_userid, &save_sec_context);
     293          19 :     SetUserIdAndSecContext(relowner,
     294             :                            save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
     295          19 :     save_nestlevel = NewGUCNestLevel();
     296             : 
     297             :     /* Concurrent refresh builds new data in temp tablespace, and does diff. */
     298          19 :     if (concurrent)
     299             :     {
     300           6 :         tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP);
     301           6 :         relpersistence = RELPERSISTENCE_TEMP;
     302             :     }
     303             :     else
     304             :     {
     305          13 :         tableSpace = matviewRel->rd_rel->reltablespace;
     306          13 :         relpersistence = matviewRel->rd_rel->relpersistence;
     307             :     }
     308             : 
     309             :     /*
     310             :      * Create the transient table that will receive the regenerated data. Lock
     311             :      * it against access by any other process until commit (by which time it
     312             :      * will be gone).
     313             :      */
     314          19 :     OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
     315             :                                ExclusiveLock);
     316          19 :     LockRelationOid(OIDNewHeap, AccessExclusiveLock);
     317          19 :     dest = CreateTransientRelDestReceiver(OIDNewHeap);
     318             : 
     319             :     /*
     320             :      * Now lock down security-restricted operations.
     321             :      */
     322          19 :     SetUserIdAndSecContext(relowner,
     323             :                            save_sec_context | SECURITY_RESTRICTED_OPERATION);
     324             : 
     325             :     /* Generate the data, if wanted. */
     326          19 :     if (!stmt->skipData)
     327          19 :         processed = refresh_matview_datafill(dest, dataQuery, queryString);
     328             : 
     329             :     /* Make the matview match the newly generated data. */
     330          18 :     if (concurrent)
     331             :     {
     332           6 :         int         old_depth = matview_maintenance_depth;
     333             :         /* On error, put the saved depth back before re-throwing. */
     334           6 :         PG_TRY();
     335             :         {
     336           6 :             refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
     337             :                                    save_sec_context);
     338             :         }
     339           1 :         PG_CATCH();
     340             :         {
     341           1 :             matview_maintenance_depth = old_depth;
     342           1 :             PG_RE_THROW();
     343             :         }
     344           5 :         PG_END_TRY();
     345           5 :         Assert(matview_maintenance_depth == old_depth);
     346             :     }
     347             :     else
     348             :     {
     349          12 :         refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
     350             : 
     351             :         /*
     352             :          * Inform stats collector about our activity: basically, we truncated
     353             :          * the matview and inserted some new data.  (The concurrent code path
     354             :          * above doesn't need to worry about this because the inserts and
     355             :          * deletes it issues get counted by lower-level code.)
     356             :          */
     357          11 :         pgstat_count_truncate(matviewRel);
     358          11 :         if (!stmt->skipData)
     359          11 :             pgstat_count_heap_insert(matviewRel, processed);
     360             :     }
     361             :     /* Close the relation; NoLock is passed, so no lock is released here. */
     362          16 :     heap_close(matviewRel, NoLock);
     363             : 
     364             :     /* Roll back any GUC changes */
     365          16 :     AtEOXact_GUC(false, save_nestlevel);
     366             : 
     367             :     /* Restore userid and security context */
     368          16 :     SetUserIdAndSecContext(save_userid, save_sec_context);
     369             :     /* Report the materialized view itself as the affected object. */
     370          16 :     ObjectAddressSet(address, RelationRelationId, matviewOid);
     371             : 
     372          16 :     return address;
     373             : }
     374             : 
     375             : /*
     376             :  * refresh_matview_datafill
     377             :  *
     378             :  * Execute the given query, sending result rows to "dest" (which will
     379             :  * insert them into the target matview).
     380             :  *
                     :  * dest: receiver handed to CreateQueryDesc as the output destination.
                     :  * query: the stored query; it is copied before locking and rewriting, so
                     :  *      the tree passed in is not the one that gets rewritten.
                     :  * queryString: source text handed to CreateQueryDesc.
                     :  *
                     :  * Raises elog(ERROR) if rewriting does not yield exactly one query.
                     :  *
     381             :  * Returns number of rows inserted.
     382             :  */
     383             : static uint64
     384          19 : refresh_matview_datafill(DestReceiver *dest, Query *query,
     385             :                          const char *queryString)
     386             : {
     387             :     List       *rewritten;
     388             :     PlannedStmt *plan;
     389             :     QueryDesc  *queryDesc;
     390             :     Query      *copied_query;
     391             :     uint64      processed;
     392             : 
     393             :     /* Lock and rewrite, using a copy to preserve the original query. */
     394          19 :     copied_query = copyObject(query);
     395          19 :     AcquireRewriteLocks(copied_query, true, false);
     396          19 :     rewritten = QueryRewrite(copied_query);
     397             : 
     398             :     /* SELECT should never rewrite to more or less than one SELECT query */
     399          19 :     if (list_length(rewritten) != 1)
     400           0 :         elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
     401          19 :     query = (Query *) linitial(rewritten);
     402             : 
     403             :     /* Check for user-requested abort. */
     404          19 :     CHECK_FOR_INTERRUPTS();
     405             : 
     406             :     /* Plan the query which will generate data for the refresh. */
     407          19 :     plan = pg_plan_query(query, 0, NULL);
     408             : 
     409             :     /*
     410             :      * Use a snapshot with an updated command ID to ensure this query sees
     411             :      * results of any previously executed queries.  (This could only matter if
     412             :      * the planner executed an allegedly-stable function that changed the
     413             :      * database contents, but let's do it anyway to be safe.)
                     :      *
                     :      * The snapshot pushed here is popped again just before returning.
     414             :      */
     415          18 :     PushCopiedSnapshot(GetActiveSnapshot());
     416          18 :     UpdateActiveSnapshotCommandId();
     417             : 
     418             :     /* Create a QueryDesc, redirecting output to our tuple receiver */
     419          18 :     queryDesc = CreateQueryDesc(plan, queryString,
     420             :                                 GetActiveSnapshot(), InvalidSnapshot,
     421             :                                 dest, NULL, NULL, 0);
     422             : 
     423             :     /* call ExecutorStart to prepare the plan for execution */
     424          18 :     ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
     425             : 
     426             :     /* run the plan */
     427          18 :     ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
     428             :     /* Read the row count before the executor state is torn down below. */
     429          18 :     processed = queryDesc->estate->es_processed;
     430             : 
     431             :     /* and clean up */
     432          18 :     ExecutorFinish(queryDesc);
     433          18 :     ExecutorEnd(queryDesc);
     434             : 
     435          18 :     FreeQueryDesc(queryDesc);
     436             : 
     437          18 :     PopActiveSnapshot();
     438             : 
     439          18 :     return processed;
     440             : }
     441             : /*
                     :  * CreateTransientRelDestReceiver
                     :  *      Allocate a zero-filled DR_transientrel, install the transientrel_*
                     :  *      callbacks, set mydest to DestTransientRel and record transientoid.
                     :  *
                     :  * transientoid: OID of the heap that received rows will be stored into.
                     :  *
                     :  * Returns the new object cast to DestReceiver *.  The relation itself is
                     :  * not opened here; that happens in transientrel_startup.
                     :  */
     442             : DestReceiver *
     443          19 : CreateTransientRelDestReceiver(Oid transientoid)
     444             : {
     445          19 :     DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
     446             : 
     447          19 :     self->pub.receiveSlot = transientrel_receive;
     448          19 :     self->pub.rStartup = transientrel_startup;
     449          19 :     self->pub.rShutdown = transientrel_shutdown;
     450          19 :     self->pub.rDestroy = transientrel_destroy;
     451          19 :     self->pub.mydest = DestTransientRel;
     452          19 :     self->transientoid = transientoid;
     453             : 
     454          19 :     return (DestReceiver *) self;
     455             : }
     456             : 
     457             : /*
     458             :  * transientrel_startup --- executor startup
                     :  *
                     :  * Opens the heap named by transientoid and fills the private fields of the
                     :  * receiver (relation, output command id, heap_insert options, bulk insert
                     :  * state).  The operation and typeinfo arguments are not used here.
                     :  *
                     :  * The heap is opened with NoLock.  In this file the only visible creator
                     :  * of the receiver, ExecRefreshMatView, locks the new heap with
                     :  * AccessExclusiveLock before creating it.
     459             :  */
     460             : static void
     461          18 : transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
     462             : {
     463          18 :     DR_transientrel *myState = (DR_transientrel *) self;
     464             :     Relation    transientrel;
     465             : 
     466          18 :     transientrel = heap_open(myState->transientoid, NoLock);
     467             : 
     468             :     /*
     469             :      * Fill private fields of myState for use by later routines
     470             :      */
     471          18 :     myState->transientrel = transientrel;
     472          18 :     myState->output_cid = GetCurrentCommandId(true);
     473             : 
     474             :     /*
     475             :      * We can skip WAL-logging the insertions, unless PITR or streaming
     476             :      * replication is in use. We can skip the FSM in any case.
                     :      *
                     :      * HEAP_INSERT_FROZEN is always set as well; transientrel_shutdown
                     :      * checks HEAP_INSERT_SKIP_WAL to decide whether to call heap_sync.
     477             :      */
     478          18 :     myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
     479          18 :     if (!XLogIsNeeded())
     480           0 :         myState->hi_options |= HEAP_INSERT_SKIP_WAL;
     481          18 :     myState->bistate = GetBulkInsertState();
     482             : 
     483             :     /* Not using WAL requires smgr_targblock be initially invalid */
     484          18 :     Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
     485          18 : }
     486             : 
     487             : /*
     488             :  * transientrel_receive --- receive one tuple
                     :  *
                     :  * Materializes the slot into a heap tuple and inserts it into the
                     :  * transient relation using the command id, options and bulk insert state
                     :  * prepared by transientrel_startup.
                     :  *
                     :  * Always returns true.
     489             :  */
     490             : static bool
     491          50 : transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
     492             : {
     493          50 :     DR_transientrel *myState = (DR_transientrel *) self;
     494             :     HeapTuple   tuple;
     495             : 
     496             :     /*
     497             :      * get the heap tuple out of the tuple table slot, making sure we have a
     498             :      * writable copy
     499             :      */
     500          50 :     tuple = ExecMaterializeSlot(slot);
     501             : 
     502          50 :     heap_insert(myState->transientrel,
     503             :                 tuple,
     504             :                 myState->output_cid,
     505             :                 myState->hi_options,
     506             :                 myState->bistate);
     507             : 
     508             :     /* We know this is a newly created relation, so there are no indexes */
     509             : 
     510          50 :     return true;
     511             : }
     512             : 
     513             : /*
     514             :  * transientrel_shutdown --- executor end
                     :  *
                     :  * Releases the bulk insert state, calls heap_sync when the inserts were
                     :  * done with HEAP_INSERT_SKIP_WAL, closes the transient relation and
                     :  * clears the pointer to it in the receiver.
     515             :  */
     516             : static void
     517          18 : transientrel_shutdown(DestReceiver *self)
     518             : {
     519          18 :     DR_transientrel *myState = (DR_transientrel *) self;
     520             : 
     521          18 :     FreeBulkInsertState(myState->bistate);
     522             : 
     523             :     /* If we skipped using WAL, must heap_sync before commit */
     524          18 :     if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
     525           0 :         heap_sync(myState->transientrel);
     526             : 
     527             :     /* close transientrel, but keep lock until commit */
     528          18 :     heap_close(myState->transientrel, NoLock);
     529          18 :     myState->transientrel = NULL;
     530          18 : }
     531             : 
     532             : /*
     533             :  * transientrel_destroy --- release DestReceiver object
                     :  *
                     :  * Frees the receiver itself with pfree; it is allocated with palloc0 in
                     :  * CreateTransientRelDestReceiver.  Nothing else is released here.
     534             :  */
     535             : static void
     536           0 : transientrel_destroy(DestReceiver *self)
     537             : {
     538           0 :     pfree(self);
     539           0 : }
     540             : 
     541             : 
     542             : /*
     543             :  * Given a qualified temporary table name, append an underscore followed by
     544             :  * the given integer, to make a new table name based on the old one.
     545             :  *
                     :  * tempname: base name; it is copied, not modified.
                     :  * n: integer appended in decimal after an underscore.
                     :  *
                     :  * Returns the data buffer of a StringInfo initialized in this function.
                     :  *
     546             :  * This leaks memory through palloc(), which won't be cleaned up until the
     547             :  * current memory context is freed.
     548             :  */
     549             : static char *
     550           6 : make_temptable_name_n(char *tempname, int n)
     551             : {
     552             :     StringInfoData namebuf;
     553             : 
     554           6 :     initStringInfo(&namebuf);
     555           6 :     appendStringInfoString(&namebuf, tempname);
     556           6 :     appendStringInfo(&namebuf, "_%d", n);
     557           6 :     return namebuf.data;
     558             : }
     559             : /*
                     :  * mv_GenerateOper
                     :  *      Append a schema-qualified operator reference of the form
                     :  *      OPERATOR(schema.opname) to buf for the operator with OID opoid.
                     :  *
                     :  * The schema name goes through quote_identifier; the operator name is
                     :  * appended as stored in pg_operator.
                     :  *
                     :  * Raises elog(ERROR) if the operator is not found in the syscache.
                     :  * NOTE(review): the Assert on oprkind presumably restricts this to binary
                     :  * operators -- confirm against pg_operator documentation.
                     :  */
     560             : static void
     561           7 : mv_GenerateOper(StringInfo buf, Oid opoid)
     562             : {
     563             :     HeapTuple   opertup;
     564             :     Form_pg_operator operform;
     565             : 
     566           7 :     opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(opoid));
     567           7 :     if (!HeapTupleIsValid(opertup))
     568           0 :         elog(ERROR, "cache lookup failed for operator %u", opoid);
     569           7 :     operform = (Form_pg_operator) GETSTRUCT(opertup);
     570           7 :     Assert(operform->oprkind == 'b');
     571             : 
     572           7 :     appendStringInfo(buf, "OPERATOR(%s.%s)",
     573           7 :                      quote_identifier(get_namespace_name(operform->oprnamespace)),
     574           7 :                      NameStr(operform->oprname));
     575             :     /* operform points into the cached tuple, so release only after use. */
     576           7 :     ReleaseSysCache(opertup);
     577           7 : }
     578             : 
     579             : /*
     580             :  * refresh_by_match_merge
     581             :  *
     582             :  * Refresh a materialized view with transactional semantics, while allowing
     583             :  * concurrent reads.
     584             :  *
     585             :  * This is called after a new version of the data has been created in a
     586             :  * temporary table.  It performs a full outer join against the old version of
     587             :  * the data, producing "diff" results.  This join cannot work if there are any
     588             :  * duplicated rows in either the old or new versions, in the sense that every
     589             :  * column would compare as equal between the two rows.  It does work correctly
     590             :  * in the face of rows which have at least one NULL value, with all non-NULL
     591             :  * columns equal.  The behavior of NULLs on equality tests and on UNIQUE
     592             :  * indexes turns out to be quite convenient here; the tests we need to make
     593             :  * are consistent with default behavior.  If there is at least one UNIQUE
     594             :  * index on the materialized view, we have exactly the guarantee we need.
     595             :  *
     596             :  * The temporary table used to hold the diff results contains just the TID of
     597             :  * the old record (if matched) and the ROW from the new table as a single
     598             :  * column of complex record type (if matched).
     599             :  *
     600             :  * Once we have the diff table, we perform set-based DELETE and INSERT
     601             :  * operations against the materialized view, and discard both temporary
     602             :  * tables.
     603             :  *
     604             :  * Everything from the generation of the new data to applying the differences
     605             :  * takes place under cover of an ExclusiveLock, since it seems as though we
     606             :  * would want to prohibit not only concurrent REFRESH operations, but also
     607             :  * incremental maintenance.  It also doesn't seem reasonable or safe to allow
     608             :  * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
     609             :  * this command.
                        *
                        * matviewOid and tempOid identify the materialized view and the temporary
                        * table holding the new data; both are opened with NoLock here.  relowner
                        * and save_sec_context are passed to SetUserIdAndSecContext() around
                        * creation of the diff table.
     610             :  */
     611             : static void
     612           6 : refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
     613             :                        int save_sec_context)
     614             : {
     615             :     StringInfoData querybuf;
     616             :     Relation    matviewRel;
     617             :     Relation    tempRel;
     618             :     char       *matviewname;
     619             :     char       *tempname;
     620             :     char       *diffname;
     621             :     TupleDesc   tupdesc;
     622             :     bool        foundUniqueIndex;
     623             :     List       *indexoidlist;
     624             :     ListCell   *indexoidscan;
     625             :     int16       relnatts;
     626             :     bool       *usedForQual;
     627             : 
     628           6 :     initStringInfo(&querybuf);
     629           6 :     matviewRel = heap_open(matviewOid, NoLock);
     630           6 :     matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
     631           6 :                                              RelationGetRelationName(matviewRel));
     632           6 :     tempRel = heap_open(tempOid, NoLock);
     633           6 :     tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
     634           6 :                                           RelationGetRelationName(tempRel));
     635           6 :     diffname = make_temptable_name_n(tempname, 2);
     636             : 
                           /* One flag per matview column, all initially false. */
     637           6 :     relnatts = matviewRel->rd_rel->relnatts;
     638           6 :     usedForQual = (bool *) palloc0(sizeof(bool) * relnatts);
     639             : 
     640             :     /* Open SPI context. */
     641           6 :     if (SPI_connect() != SPI_OK_CONNECT)
     642           0 :         elog(ERROR, "SPI_connect failed");
     643             : 
     644             :     /* Analyze the temp table with the new contents. */
     645           6 :     appendStringInfo(&querybuf, "ANALYZE %s", tempname);
     646           6 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
     647           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     648             : 
     649             :     /*
     650             :      * We need to ensure that there are not duplicate rows without NULLs in
     651             :      * the new data set before we can count on the "diff" results.  Check for
     652             :      * that in a way that allows showing the first duplicated row found.  Even
     653             :      * after we pass this test, a unique index on the materialized view may
     654             :      * find a duplicate key problem.
     655             :      */
     656           6 :     resetStringInfo(&querybuf);
     657           6 :     appendStringInfo(&querybuf,
     658             :                      "SELECT newdata FROM %s newdata "
     659             :                      "WHERE newdata IS NOT NULL AND EXISTS "
     660             :                      "(SELECT * FROM %s newdata2 WHERE newdata2 IS NOT NULL "
     661             :                      "AND newdata2 OPERATOR(pg_catalog.*=) newdata "
     662             :                      "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
     663             :                      "newdata.ctid) LIMIT 1",
     664             :                      tempname, tempname);
                           /*
                            * read_only is false and the row limit is 1: a single row is all the
                            * error report below uses.
                            */
     665           6 :     if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
     666           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     667           6 :     if (SPI_processed > 0)
     668             :     {
     669             :         /*
     670             :          * Note that this ereport() is returning data to the user.  Generally,
     671             :          * we would want to make sure that the user has been granted access to
     672             :          * this data.  However, REFRESH MAT VIEW is only able to be run by the
     673             :          * owner of the mat view (or a superuser) and therefore there is no
     674             :          * need to check for access to data in the mat view.
     675             :          */
     676           1 :         ereport(ERROR,
     677             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
     678             :                  errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
     679             :                         RelationGetRelationName(matviewRel)),
     680             :                  errdetail("Row: %s",
     681             :                            SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
     682             :     }
     683             : 
                           /*
                            * NOTE(review): SECURITY_LOCAL_USERID_CHANGE is in effect only while the
                            * diff table is created; SECURITY_RESTRICTED_OPERATION is set right
                            * afterwards.  Presumably CREATE TEMP TABLE cannot run in a
                            * restricted-operation context --- confirm against the caller.
                            */
     684           5 :     SetUserIdAndSecContext(relowner,
     685             :                            save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
     686             : 
     687             :     /* Start building the query for creating the diff table. */
     688           5 :     resetStringInfo(&querybuf);
     689           5 :     appendStringInfo(&querybuf,
     690             :                      "CREATE TEMP TABLE %s AS "
     691             :                      "SELECT mv.ctid AS tid, newdata "
     692             :                      "FROM %s mv FULL JOIN %s newdata ON (",
     693             :                      diffname, matviewname, tempname);
     694             : 
     695             :     /*
     696             :      * Get the list of index OIDs for the table from the relcache, and look up
     697             :      * each one in the pg_index syscache.  We will test for equality on all
     698             :      * columns present in all unique indexes which only reference columns and
     699             :      * include all rows.
     700             :      */
     701           5 :     tupdesc = matviewRel->rd_att;
     702           5 :     foundUniqueIndex = false;
     703           5 :     indexoidlist = RelationGetIndexList(matviewRel);
     704             : 
     705          12 :     foreach(indexoidscan, indexoidlist)
     706             :     {
     707           7 :         Oid         indexoid = lfirst_oid(indexoidscan);
     708             :         Relation    indexRel;
     709             :         Form_pg_index indexStruct;
     710             : 
     711           7 :         indexRel = index_open(indexoid, RowExclusiveLock);
     712           7 :         indexStruct = indexRel->rd_index;
     713             : 
     714             :         /*
     715             :          * We're only interested if it is unique, valid, contains no
     716             :          * expressions, and is not partial.
     717             :          */
     718          14 :         if (indexStruct->indisunique &&
     719          14 :             IndexIsValid(indexStruct) &&
     720          14 :             RelationGetIndexExpressions(indexRel) == NIL &&
     721           7 :             RelationGetIndexPredicate(indexRel) == NIL)
     722             :         {
     723           7 :             int         numatts = indexStruct->indnatts;
     724             :             int         i;
     725             : 
     726             :             /* Add quals for all columns from this index. */
     727          14 :             for (i = 0; i < numatts; i++)
     728             :             {
     729           7 :                 int         attnum = indexStruct->indkey.values[i];
     730           7 :                 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
     731             :                 Oid         type;
     732             :                 Oid         op;
     733             :                 const char *colname;
     734             : 
     735             :                 /*
     736             :                  * Only include the column once regardless of how many times
     737             :                  * it shows up in how many indexes.
     738             :                  */
     739           7 :                 if (usedForQual[attnum - 1])
     740           0 :                     continue;
     741           7 :                 usedForQual[attnum - 1] = true;
     742             : 
     743             :                 /*
     744             :                  * Actually add the qual, ANDed with any others.
     745             :                  */
     746           7 :                 if (foundUniqueIndex)
     747           2 :                     appendStringInfoString(&querybuf, " AND ");
     748             : 
     749           7 :                 colname = quote_identifier(NameStr(attr->attname));
     750           7 :                 appendStringInfo(&querybuf, "newdata.%s ", colname);
     751           7 :                 type = attnumTypeId(matviewRel, attnum);
     752           7 :                 op = lookup_type_cache(type, TYPECACHE_EQ_OPR)->eq_opr;
     753           7 :                 mv_GenerateOper(&querybuf, op);
     754           7 :                 appendStringInfo(&querybuf, " mv.%s", colname);
     755             : 
     756           7 :                 foundUniqueIndex = true;
     757             :             }
     758             :         }
     759             : 
     760             :         /* Keep the locks, since we're about to run DML which needs them. */
     761           7 :         index_close(indexRel, NoLock);
     762             :     }
     763             : 
     764           5 :     list_free(indexoidlist);
     765             : 
     766             :     /*
     767             :      * There must be at least one unique index on the matview.
     768             :      *
     769             :      * ExecRefreshMatView() checks that after taking the exclusive lock on the
     770             :      * matview. So at least one unique index is guaranteed to exist here
     771             :      * because the lock is still being held.
     772             :      */
     773           5 :     Assert(foundUniqueIndex);
     774             : 
     775           5 :     appendStringInfoString(&querybuf,
     776             :                            " AND newdata OPERATOR(pg_catalog.*=) mv) "
     777             :                            "WHERE newdata IS NULL OR mv IS NULL "
     778             :                            "ORDER BY tid");
     779             : 
     780             :     /* Create the temporary "diff" table. */
     781           5 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
     782           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     783             : 
     784           5 :     SetUserIdAndSecContext(relowner,
     785             :                            save_sec_context | SECURITY_RESTRICTED_OPERATION);
     786             : 
     787             :     /*
     788             :      * We have no further use for data from the "full-data" temp table, but we
     789             :      * must keep it around because its type is referenced from the diff table.
     790             :      */
     791             : 
     792             :     /* Analyze the diff table. */
     793           5 :     resetStringInfo(&querybuf);
     794           5 :     appendStringInfo(&querybuf, "ANALYZE %s", diffname);
     795           5 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
     796           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     797             : 
     798           5 :     OpenMatViewIncrementalMaintenance();
     799             : 
     800             :     /* Deletes must come before inserts; do them first. */
     801           5 :     resetStringInfo(&querybuf);
     802           5 :     appendStringInfo(&querybuf,
     803             :                      "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
     804             :                      "(SELECT diff.tid FROM %s diff "
     805             :                      "WHERE diff.tid IS NOT NULL "
     806             :                      "AND diff.newdata IS NULL)",
     807             :                      matviewname, diffname);
     808           5 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
     809           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     810             : 
     811             :     /* Inserts go last. */
     812           5 :     resetStringInfo(&querybuf);
     813           5 :     appendStringInfo(&querybuf,
     814             :                      "INSERT INTO %s SELECT (diff.newdata).* "
     815             :                      "FROM %s diff WHERE tid IS NULL",
     816             :                      matviewname, diffname);
     817           5 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
     818           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     819             : 
     820             :     /* We're done maintaining the materialized view. */
     821           5 :     CloseMatViewIncrementalMaintenance();
     822           5 :     heap_close(tempRel, NoLock);
     823           5 :     heap_close(matviewRel, NoLock);
     824             : 
     825             :     /* Clean up temp tables. */
     826           5 :     resetStringInfo(&querybuf);
     827           5 :     appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
     828           5 :     if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
     829           0 :         elog(ERROR, "SPI_exec failed: %s", querybuf.data);
     830             : 
     831             :     /* Close SPI context. */
     832           5 :     if (SPI_finish() != SPI_OK_FINISH)
     833           0 :         elog(ERROR, "SPI_finish failed");
     834           5 : }
     835             : 
     836             : /*
     837             :  * Swap the physical files of the target and transient tables, then rebuild
     838             :  * the target's indexes and throw away the transient table.  Security context
     839             :  * swapping is handled by the called function, so it is not needed here.
                        *
                        * All of the work is delegated to finish_heap_swap(), which is given
                        * matviewOid, OIDNewHeap, RecentXmin, ReadNextMultiXactId() and
                        * relpersistence.
                        *
                        * NOTE(review): the four boolean literals are positional arguments of
                        * finish_heap_swap(); check their meaning against its declaration before
                        * changing them.
     840             :  */
     841             : static void
     842          12 : refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
     843             : {
     844          12 :     finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
     845             :                      RecentXmin, ReadNextMultiXactId(), relpersistence);
     846          11 : }
     847             : 
     848             : 
     849             : /*
     850             :  * This should be used to test whether the backend is in a context where it is
     851             :  * OK to allow DML statements to modify materialized views.  We only want to
     852             :  * allow that for internal code driven by the materialized view definition,
     853             :  * not for arbitrary user-supplied code.
     854             :  *
     855             :  * While the function names reflect the fact that their main intended use is
     856             :  * incremental maintenance of materialized views (in response to changes to
     857             :  * the data in referenced relations), they are initially used to allow REFRESH
     858             :  * without blocking concurrent reads.
                        *
                        * Returns true while matview_maintenance_depth is greater than zero.
     859             :  */
     860             : bool
     861          10 : MatViewIncrementalMaintenanceIsEnabled(void)
     862             : {
     863          10 :     return matview_maintenance_depth > 0;
     864             : }
     865             : 
                       /*
                        * Increment matview_maintenance_depth, so that
                        * MatViewIncrementalMaintenanceIsEnabled() reports true until the matching
                        * CloseMatViewIncrementalMaintenance() call.
                        */
     866             : static void
     867           5 : OpenMatViewIncrementalMaintenance(void)
     868             : {
     869           5 :     matview_maintenance_depth++;
     870           5 : }
     871             : 
                       /*
                        * Decrement matview_maintenance_depth, undoing one
                        * OpenMatViewIncrementalMaintenance().  The Assert catches a Close without
                        * a matching Open.
                        */
     872             : static void
     873           5 : CloseMatViewIncrementalMaintenance(void)
     874             : {
     875           5 :     matview_maintenance_depth--;
     876           5 :     Assert(matview_maintenance_depth >= 0);
     877           5 : }

Generated by: LCOV version 1.11