LCOV - code coverage report
Current view: top level - src/backend/replication/logical - relation.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 0 167 0.0 %
Date: 2017-09-29 13:40:31 Functions: 0 11 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * relation.c
       3             :  *     PostgreSQL logical replication
       4             :  *
       5             :  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/relation.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains helper functions for logical replication relation
      12             :  *    mapping cache.
      13             :  *
      14             :  *-------------------------------------------------------------------------
      15             :  */
      16             : 
      17             : #include "postgres.h"
      18             : 
      19             : #include "access/heapam.h"
      20             : #include "access/sysattr.h"
      21             : #include "catalog/namespace.h"
      22             : #include "catalog/pg_subscription_rel.h"
      23             : #include "executor/executor.h"
      24             : #include "nodes/makefuncs.h"
      25             : #include "replication/logicalrelation.h"
      26             : #include "replication/worker_internal.h"
      27             : #include "utils/builtins.h"
      28             : #include "utils/inval.h"
      29             : #include "utils/lsyscache.h"
      30             : #include "utils/memutils.h"
      31             : #include "utils/syscache.h"
      32             : 
      33             : static MemoryContext LogicalRepRelMapContext = NULL;
      34             : 
      35             : static HTAB *LogicalRepRelMap = NULL;
      36             : static HTAB *LogicalRepTypMap = NULL;
      37             : 
      38             : static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
      39             :                                 uint32 hashvalue);
      40             : 
      41             : /*
      42             :  * Relcache invalidation callback for our relation map cache.
      43             :  */
      44             : static void
      45           0 : logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
      46             : {
      47             :     LogicalRepRelMapEntry *entry;
      48             : 
      49             :     /* Just to be sure. */
      50           0 :     if (LogicalRepRelMap == NULL)
      51           0 :         return;
      52             : 
      53           0 :     if (reloid != InvalidOid)
      54             :     {
      55             :         HASH_SEQ_STATUS status;
      56             : 
      57           0 :         hash_seq_init(&status, LogicalRepRelMap);
      58             : 
      59             :         /* TODO, use inverse lookup hashtable? */
      60           0 :         while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
      61             :         {
      62           0 :             if (entry->localreloid == reloid)
      63             :             {
      64           0 :                 entry->localreloid = InvalidOid;
      65           0 :                 hash_seq_term(&status);
      66           0 :                 break;
      67             :             }
      68             :         }
      69             :     }
      70             :     else
      71             :     {
      72             :         /* invalidate all cache entries */
      73             :         HASH_SEQ_STATUS status;
      74             : 
      75           0 :         hash_seq_init(&status, LogicalRepRelMap);
      76             : 
      77           0 :         while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
      78           0 :             entry->localreloid = InvalidOid;
      79             :     }
      80             : }
      81             : 
      82             : /*
      83             :  * Initialize the relation map cache.
      84             :  */
      85             : static void
      86           0 : logicalrep_relmap_init(void)
      87             : {
      88             :     HASHCTL     ctl;
      89             : 
      90           0 :     if (!LogicalRepRelMapContext)
      91           0 :         LogicalRepRelMapContext =
      92           0 :             AllocSetContextCreate(CacheMemoryContext,
      93             :                                   "LogicalRepRelMapContext",
      94             :                                   ALLOCSET_DEFAULT_SIZES);
      95             : 
      96             :     /* Initialize the relation hash table. */
      97           0 :     MemSet(&ctl, 0, sizeof(ctl));
      98           0 :     ctl.keysize = sizeof(LogicalRepRelId);
      99           0 :     ctl.entrysize = sizeof(LogicalRepRelMapEntry);
     100           0 :     ctl.hcxt = LogicalRepRelMapContext;
     101             : 
     102           0 :     LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
     103             :                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     104             : 
     105             :     /* Initialize the type hash table. */
     106           0 :     MemSet(&ctl, 0, sizeof(ctl));
     107           0 :     ctl.keysize = sizeof(Oid);
     108           0 :     ctl.entrysize = sizeof(LogicalRepTyp);
     109           0 :     ctl.hcxt = LogicalRepRelMapContext;
     110             : 
     111             :     /* This will usually be small. */
     112           0 :     LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
     113             :                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     114             : 
     115             :     /* Watch for invalidation events. */
     116           0 :     CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
     117             :                                   (Datum) 0);
     118           0 :     CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
     119             :                                   (Datum) 0);
     120           0 : }
     121             : 
     122             : /*
     123             :  * Free the entry of a relation map cache.
     124             :  */
     125             : static void
     126           0 : logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
     127             : {
     128             :     LogicalRepRelation *remoterel;
     129             : 
     130           0 :     remoterel = &entry->remoterel;
     131             : 
     132           0 :     pfree(remoterel->nspname);
     133           0 :     pfree(remoterel->relname);
     134             : 
     135           0 :     if (remoterel->natts > 0)
     136             :     {
     137             :         int         i;
     138             : 
     139           0 :         for (i = 0; i < remoterel->natts; i++)
     140           0 :             pfree(remoterel->attnames[i]);
     141             : 
     142           0 :         pfree(remoterel->attnames);
     143           0 :         pfree(remoterel->atttyps);
     144             :     }
     145           0 :     bms_free(remoterel->attkeys);
     146             : 
     147           0 :     if (entry->attrmap)
     148           0 :         pfree(entry->attrmap);
     149           0 : }
     150             : 
     151             : /*
     152             :  * Add new entry or update existing entry in the relation map cache.
     153             :  *
     154             :  * Called when new relation mapping is sent by the publisher to update
     155             :  * our expected view of incoming data from said publisher.
     156             :  */
     157             : void
     158           0 : logicalrep_relmap_update(LogicalRepRelation *remoterel)
     159             : {
     160             :     MemoryContext oldctx;
     161             :     LogicalRepRelMapEntry *entry;
     162             :     bool        found;
     163             :     int         i;
     164             : 
     165           0 :     if (LogicalRepRelMap == NULL)
     166           0 :         logicalrep_relmap_init();
     167             : 
     168             :     /*
     169             :      * HASH_ENTER returns the existing entry if present or creates a new one.
     170             :      */
     171           0 :     entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
     172             :                         HASH_ENTER, &found);
     173             : 
     174           0 :     if (found)
     175           0 :         logicalrep_relmap_free_entry(entry);
     176             : 
     177           0 :     memset(entry, 0, sizeof(LogicalRepRelMapEntry));
     178             : 
     179             :     /* Make cached copy of the data */
     180           0 :     oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
     181           0 :     entry->remoterel.remoteid = remoterel->remoteid;
     182           0 :     entry->remoterel.nspname = pstrdup(remoterel->nspname);
     183           0 :     entry->remoterel.relname = pstrdup(remoterel->relname);
     184           0 :     entry->remoterel.natts = remoterel->natts;
     185           0 :     entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
     186           0 :     entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
     187           0 :     for (i = 0; i < remoterel->natts; i++)
     188             :     {
     189           0 :         entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
     190           0 :         entry->remoterel.atttyps[i] = remoterel->atttyps[i];
     191             :     }
     192           0 :     entry->remoterel.replident = remoterel->replident;
     193           0 :     entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
     194           0 :     MemoryContextSwitchTo(oldctx);
     195           0 : }
     196             : 
     197             : /*
     198             :  * Find attribute index in TupleDesc struct by attribute name.
     199             :  *
     200             :  * Returns -1 if not found.
     201             :  */
     202             : static int
     203           0 : logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
     204             : {
     205             :     int         i;
     206             : 
     207           0 :     for (i = 0; i < remoterel->natts; i++)
     208             :     {
     209           0 :         if (strcmp(remoterel->attnames[i], attname) == 0)
     210           0 :             return i;
     211             :     }
     212             : 
     213           0 :     return -1;
     214             : }
     215             : 
     216             : /*
     217             :  * Open the local relation associated with the remote one.
     218             :  *
     219             :  * Optionally rebuilds the Relcache mapping if it was invalidated
     220             :  * by local DDL.
     221             :  */
     222             : LogicalRepRelMapEntry *
     223           0 : logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
     224             : {
     225             :     LogicalRepRelMapEntry *entry;
     226             :     bool        found;
     227             : 
     228           0 :     if (LogicalRepRelMap == NULL)
     229           0 :         logicalrep_relmap_init();
     230             : 
     231             :     /* Search for existing entry. */
     232           0 :     entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
     233             :                         HASH_FIND, &found);
     234             : 
     235           0 :     if (!found)
     236           0 :         elog(ERROR, "no relation map entry for remote relation ID %u",
     237             :              remoteid);
     238             : 
     239             :     /* Need to update the local cache? */
     240           0 :     if (!OidIsValid(entry->localreloid))
     241             :     {
     242             :         Oid         relid;
     243             :         int         i;
     244             :         int         found;
     245             :         Bitmapset  *idkey;
     246             :         TupleDesc   desc;
     247             :         LogicalRepRelation *remoterel;
     248             :         MemoryContext oldctx;
     249             : 
     250           0 :         remoterel = &entry->remoterel;
     251             : 
     252             :         /* Try to find and lock the relation by name. */
     253           0 :         relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
     254             :                                               remoterel->relname, -1),
     255             :                                  lockmode, true);
     256           0 :         if (!OidIsValid(relid))
     257           0 :             ereport(ERROR,
     258             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     259             :                      errmsg("logical replication target relation \"%s.%s\" does not exist",
     260             :                             remoterel->nspname, remoterel->relname)));
     261           0 :         entry->localrel = heap_open(relid, NoLock);
     262             : 
     263             :         /* Check for supported relkind. */
     264           0 :         CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
     265           0 :                                  remoterel->nspname, remoterel->relname);
     266             : 
     267             :         /*
     268             :          * Build the mapping of local attribute numbers to remote attribute
     269             :          * numbers and validate that we don't miss any replicated columns as
     270             :          * that would result in potentially unwanted data loss.
     271             :          */
     272           0 :         desc = RelationGetDescr(entry->localrel);
     273           0 :         oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
     274           0 :         entry->attrmap = palloc(desc->natts * sizeof(int));
     275           0 :         MemoryContextSwitchTo(oldctx);
     276             : 
     277           0 :         found = 0;
     278           0 :         for (i = 0; i < desc->natts; i++)
     279             :         {
     280             :             int         attnum;
     281           0 :             Form_pg_attribute attr = TupleDescAttr(desc, i);
     282             : 
     283           0 :             if (attr->attisdropped)
     284             :             {
     285           0 :                 entry->attrmap[i] = -1;
     286           0 :                 continue;
     287             :             }
     288             : 
     289           0 :             attnum = logicalrep_rel_att_by_name(remoterel,
     290           0 :                                                 NameStr(attr->attname));
     291             : 
     292           0 :             entry->attrmap[i] = attnum;
     293           0 :             if (attnum >= 0)
     294           0 :                 found++;
     295             :         }
     296             : 
     297             :         /* TODO, detail message with names of missing columns */
     298           0 :         if (found < remoterel->natts)
     299           0 :             ereport(ERROR,
     300             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     301             :                      errmsg("logical replication target relation \"%s.%s\" is missing "
     302             :                             "some replicated columns",
     303             :                             remoterel->nspname, remoterel->relname)));
     304             : 
     305             :         /*
     306             :          * Check that replica identity matches. We allow for stricter replica
     307             :          * identity (fewer columns) on subscriber as that will not stop us
     308             :          * from finding unique tuple. IE, if publisher has identity
     309             :          * (id,timestamp) and subscriber just (id) this will not be a problem,
     310             :          * but in the opposite scenario it will.
     311             :          *
     312             :          * Don't throw any error here just mark the relation entry as not
     313             :          * updatable, as replica identity is only for updates and deletes but
     314             :          * inserts can be replicated even without it.
     315             :          */
     316           0 :         entry->updatable = true;
     317           0 :         idkey = RelationGetIndexAttrBitmap(entry->localrel,
     318             :                                            INDEX_ATTR_BITMAP_IDENTITY_KEY);
     319             :         /* fallback to PK if no replica identity */
     320           0 :         if (idkey == NULL)
     321             :         {
     322           0 :             idkey = RelationGetIndexAttrBitmap(entry->localrel,
     323             :                                                INDEX_ATTR_BITMAP_PRIMARY_KEY);
     324             : 
     325             :             /*
     326             :              * If no replica identity index and no PK, the published table
     327             :              * must have replica identity FULL.
     328             :              */
     329           0 :             if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
     330           0 :                 entry->updatable = false;
     331             :         }
     332             : 
     333           0 :         i = -1;
     334           0 :         while ((i = bms_next_member(idkey, i)) >= 0)
     335             :         {
     336           0 :             int         attnum = i + FirstLowInvalidHeapAttributeNumber;
     337             : 
     338           0 :             if (!AttrNumberIsForUserDefinedAttr(attnum))
     339           0 :                 ereport(ERROR,
     340             :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     341             :                          errmsg("logical replication target relation \"%s.%s\" uses "
     342             :                                 "system columns in REPLICA IDENTITY index",
     343             :                                 remoterel->nspname, remoterel->relname)));
     344             : 
     345           0 :             attnum = AttrNumberGetAttrOffset(attnum);
     346             : 
     347           0 :             if (!bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
     348             :             {
     349           0 :                 entry->updatable = false;
     350           0 :                 break;
     351             :             }
     352             :         }
     353             : 
     354           0 :         entry->localreloid = relid;
     355             :     }
     356             :     else
     357           0 :         entry->localrel = heap_open(entry->localreloid, lockmode);
     358             : 
     359           0 :     if (entry->state != SUBREL_STATE_READY)
     360           0 :         entry->state = GetSubscriptionRelState(MySubscription->oid,
     361             :                                                entry->localreloid,
     362             :                                                &entry->statelsn,
     363             :                                                true);
     364             : 
     365           0 :     return entry;
     366             : }
     367             : 
     368             : /*
     369             :  * Close the previously opened logical relation.
     370             :  */
     371             : void
     372           0 : logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
     373             : {
     374           0 :     heap_close(rel->localrel, lockmode);
     375           0 :     rel->localrel = NULL;
     376           0 : }
     377             : 
     378             : 
     379             : /*
     380             :  * Type cache invalidation callback for our type map cache.
     381             :  */
     382             : static void
     383           0 : logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
     384             : {
     385             :     HASH_SEQ_STATUS status;
     386             :     LogicalRepTyp *entry;
     387             : 
     388             :     /* Just to be sure. */
     389           0 :     if (LogicalRepTypMap == NULL)
     390           0 :         return;
     391             : 
     392             :     /* invalidate all cache entries */
     393           0 :     hash_seq_init(&status, LogicalRepTypMap);
     394             : 
     395           0 :     while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
     396           0 :         entry->typoid = InvalidOid;
     397             : }
     398             : 
     399             : /*
     400             :  * Free the type map cache entry data.
     401             :  */
     402             : static void
     403           0 : logicalrep_typmap_free_entry(LogicalRepTyp *entry)
     404             : {
     405           0 :     pfree(entry->nspname);
     406           0 :     pfree(entry->typname);
     407             : 
     408           0 :     entry->typoid = InvalidOid;
     409           0 : }
     410             : 
     411             : /*
     412             :  * Add new entry or update existing entry in the type map cache.
     413             :  */
     414             : void
     415           0 : logicalrep_typmap_update(LogicalRepTyp *remotetyp)
     416             : {
     417             :     MemoryContext oldctx;
     418             :     LogicalRepTyp *entry;
     419             :     bool        found;
     420             : 
     421           0 :     if (LogicalRepTypMap == NULL)
     422           0 :         logicalrep_relmap_init();
     423             : 
     424             :     /*
     425             :      * HASH_ENTER returns the existing entry if present or creates a new one.
     426             :      */
     427           0 :     entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
     428             :                         HASH_ENTER, &found);
     429             : 
     430           0 :     if (found)
     431           0 :         logicalrep_typmap_free_entry(entry);
     432             : 
     433             :     /* Make cached copy of the data */
     434           0 :     entry->remoteid = remotetyp->remoteid;
     435           0 :     oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
     436           0 :     entry->nspname = pstrdup(remotetyp->nspname);
     437           0 :     entry->typname = pstrdup(remotetyp->typname);
     438           0 :     MemoryContextSwitchTo(oldctx);
     439           0 :     entry->typoid = InvalidOid;
     440           0 : }
     441             : 
     442             : /*
     443             :  * Fetch type info from the cache.
     444             :  */
     445             : Oid
     446           0 : logicalrep_typmap_getid(Oid remoteid)
     447             : {
     448             :     LogicalRepTyp *entry;
     449             :     bool        found;
     450             :     Oid         nspoid;
     451             : 
     452             :     /* Internal types are mapped directly. */
     453           0 :     if (remoteid < FirstNormalObjectId)
     454             :     {
     455           0 :         if (!get_typisdefined(remoteid))
     456           0 :             ereport(ERROR,
     457             :                     (errmsg("builtin type %u not found", remoteid),
     458             :                      errhint("This can be caused by having publisher with "
     459             :                              "higher major version than subscriber")));
     460           0 :         return remoteid;
     461             :     }
     462             : 
     463           0 :     if (LogicalRepTypMap == NULL)
     464           0 :         logicalrep_relmap_init();
     465             : 
     466             :     /* Try finding the mapping. */
     467           0 :     entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
     468             :                         HASH_FIND, &found);
     469             : 
     470           0 :     if (!found)
     471           0 :         elog(ERROR, "no type map entry for remote type %u",
     472             :              remoteid);
     473             : 
     474             :     /* Found and mapped, return the oid. */
     475           0 :     if (OidIsValid(entry->typoid))
     476           0 :         return entry->typoid;
     477             : 
     478             :     /* Otherwise, try to map to local type. */
     479           0 :     nspoid = LookupExplicitNamespace(entry->nspname, true);
     480           0 :     if (OidIsValid(nspoid))
     481           0 :         entry->typoid = GetSysCacheOid2(TYPENAMENSP,
     482             :                                         PointerGetDatum(entry->typname),
     483             :                                         ObjectIdGetDatum(nspoid));
     484             :     else
     485           0 :         entry->typoid = InvalidOid;
     486             : 
     487           0 :     if (!OidIsValid(entry->typoid))
     488           0 :         ereport(ERROR,
     489             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     490             :                  errmsg("data type \"%s.%s\" required for logical replication does not exist",
     491             :                         entry->nspname, entry->typname)));
     492             : 
     493           0 :     return entry->typoid;
     494             : }

Generated by: LCOV version 1.11