LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 99 144 68.8 %
Date: 2017-09-29 13:40:31 Functions: 11 13 84.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_publication.c
       4             :  *      publication C API manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      pg_publication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "funcapi.h"
      18             : #include "miscadmin.h"
      19             : 
      20             : #include "access/genam.h"
      21             : #include "access/hash.h"
      22             : #include "access/heapam.h"
      23             : #include "access/htup_details.h"
      24             : #include "access/xact.h"
      25             : 
      26             : #include "catalog/catalog.h"
      27             : #include "catalog/dependency.h"
      28             : #include "catalog/index.h"
      29             : #include "catalog/indexing.h"
      30             : #include "catalog/namespace.h"
      31             : #include "catalog/objectaccess.h"
      32             : #include "catalog/objectaddress.h"
      33             : #include "catalog/pg_type.h"
      34             : #include "catalog/pg_publication.h"
      35             : #include "catalog/pg_publication_rel.h"
      36             : 
      37             : #include "utils/array.h"
      38             : #include "utils/builtins.h"
      39             : #include "utils/catcache.h"
      40             : #include "utils/fmgroids.h"
      41             : #include "utils/inval.h"
      42             : #include "utils/lsyscache.h"
      43             : #include "utils/rel.h"
      44             : #include "utils/syscache.h"
      45             : 
      46             : /*
      47             :  * Check if relation can be in given publication and throws appropriate
      48             :  * error if not.
      49             :  */
      50             : static void
      51          14 : check_publication_add_relation(Relation targetrel)
      52             : {
      53             :     /* Give more specific error for partitioned tables */
      54          14 :     if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
      55           1 :         ereport(ERROR,
      56             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      57             :                  errmsg("\"%s\" is a partitioned table",
      58             :                         RelationGetRelationName(targetrel)),
      59             :                  errdetail("Adding partitioned tables to publications is not supported."),
      60             :                  errhint("You can add the table partitions individually.")));
      61             : 
      62             :     /* Must be table */
      63          13 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
      64           2 :         ereport(ERROR,
      65             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      66             :                  errmsg("\"%s\" is not a table",
      67             :                         RelationGetRelationName(targetrel)),
      68             :                  errdetail("Only tables can be added to publications.")));
      69             : 
      70             :     /* Can't be system table */
      71          11 :     if (IsCatalogRelation(targetrel))
      72           0 :         ereport(ERROR,
      73             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      74             :                  errmsg("\"%s\" is a system table",
      75             :                         RelationGetRelationName(targetrel)),
      76             :                  errdetail("System tables cannot be added to publications.")));
      77             : 
      78             :     /* UNLOGGED and TEMP relations cannot be part of publication. */
      79          11 :     if (!RelationNeedsWAL(targetrel))
      80           0 :         ereport(ERROR,
      81             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      82             :                  errmsg("table \"%s\" cannot be replicated",
      83             :                         RelationGetRelationName(targetrel)),
      84             :                  errdetail("Temporary and unlogged relations cannot be replicated.")));
      85          11 : }
      86             : 
      87             : /*
      88             :  * Returns if relation represented by oid and Form_pg_class entry
      89             :  * is publishable.
      90             :  *
      91             :  * Does same checks as the above, but does not need relation to be opened
      92             :  * and also does not throw errors.
      93             :  *
      94             :  * Note this also excludes all tables with relid < FirstNormalObjectId,
      95             :  * ie all tables created during initdb.  This mainly affects the preinstalled
      96             :  * information_schema.  (IsCatalogClass() only checks for these inside
      97             :  * pg_catalog and toast schemas.)
      98             :  */
      99             : static bool
     100         169 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     101             : {
     102         460 :     return reltuple->relkind == RELKIND_RELATION &&
     103         244 :         !IsCatalogClass(relid, reltuple) &&
     104         413 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     105             :         relid >= FirstNormalObjectId;
     106             : }
     107             : 
     108             : 
     109             : /*
     110             :  * SQL-callable variant of the above
     111             :  *
     112             :  * This returns null when the relation does not exist.  This is intended to be
     113             :  * used for example in psql to avoid gratuitous errors when there are
     114             :  * concurrent catalog changes.
     115             :  */
     116             : Datum
     117         169 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     118             : {
     119         169 :     Oid         relid = PG_GETARG_OID(0);
     120             :     HeapTuple   tuple;
     121             :     bool        result;
     122             : 
     123         169 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     124         169 :     if (!tuple)
     125           0 :         PG_RETURN_NULL();
     126         169 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     127         169 :     ReleaseSysCache(tuple);
     128         169 :     PG_RETURN_BOOL(result);
     129             : }
     130             : 
     131             : 
     132             : /*
     133             :  * Insert new publication / relation mapping.
     134             :  */
     135             : ObjectAddress
     136          16 : publication_add_relation(Oid pubid, Relation targetrel,
     137             :                          bool if_not_exists)
     138             : {
     139             :     Relation    rel;
     140             :     HeapTuple   tup;
     141             :     Datum       values[Natts_pg_publication_rel];
     142             :     bool        nulls[Natts_pg_publication_rel];
     143          16 :     Oid         relid = RelationGetRelid(targetrel);
     144             :     Oid         prrelid;
     145          16 :     Publication *pub = GetPublication(pubid);
     146             :     ObjectAddress myself,
     147             :                 referenced;
     148             : 
     149          16 :     rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
     150             : 
     151             :     /*
     152             :      * Check for duplicates. Note that this does not really prevent
     153             :      * duplicates, it's here just to provide nicer error message in common
     154             :      * case. The real protection is the unique key on the catalog.
     155             :      */
     156          16 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     157             :                               ObjectIdGetDatum(pubid)))
     158             :     {
     159           2 :         heap_close(rel, RowExclusiveLock);
     160             : 
     161           2 :         if (if_not_exists)
     162           1 :             return InvalidObjectAddress;
     163             : 
     164           1 :         ereport(ERROR,
     165             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     166             :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     167             :                         RelationGetRelationName(targetrel), pub->name)));
     168             :     }
     169             : 
     170          14 :     check_publication_add_relation(targetrel);
     171             : 
     172             :     /* Form a tuple. */
     173          11 :     memset(values, 0, sizeof(values));
     174          11 :     memset(nulls, false, sizeof(nulls));
     175             : 
     176          11 :     values[Anum_pg_publication_rel_prpubid - 1] =
     177             :         ObjectIdGetDatum(pubid);
     178          11 :     values[Anum_pg_publication_rel_prrelid - 1] =
     179             :         ObjectIdGetDatum(relid);
     180             : 
     181          11 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     182             : 
     183             :     /* Insert tuple into catalog. */
     184          11 :     prrelid = CatalogTupleInsert(rel, tup);
     185          11 :     heap_freetuple(tup);
     186             : 
     187          11 :     ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
     188             : 
     189             :     /* Add dependency on the publication */
     190          11 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     191          11 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     192             : 
     193             :     /* Add dependency on the relation */
     194          11 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     195          11 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     196             : 
     197             :     /* Close the table. */
     198          11 :     heap_close(rel, RowExclusiveLock);
     199             : 
     200             :     /* Invalidate relcache so that publication info is rebuilt. */
     201          11 :     CacheInvalidateRelcache(targetrel);
     202             : 
     203          11 :     return myself;
     204             : }
     205             : 
     206             : 
     207             : /*
     208             :  * Gets list of publication oids for a relation oid.
     209             :  */
     210             : List *
     211         262 : GetRelationPublications(Oid relid)
     212             : {
     213         262 :     List       *result = NIL;
     214             :     CatCList   *pubrellist;
     215             :     int         i;
     216             : 
     217             :     /* Find all publications associated with the relation. */
     218         262 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     219             :                                      ObjectIdGetDatum(relid));
     220         262 :     for (i = 0; i < pubrellist->n_members; i++)
     221             :     {
     222           0 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     223           0 :         Oid         pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
     224             : 
     225           0 :         result = lappend_oid(result, pubid);
     226             :     }
     227             : 
     228         262 :     ReleaseSysCacheList(pubrellist);
     229             : 
     230         262 :     return result;
     231             : }
     232             : 
     233             : /*
     234             :  * Gets list of relation oids for a publication.
     235             :  *
     236             :  * This should only be used for normal publications, the FOR ALL TABLES
     237             :  * should use GetAllTablesPublicationRelations().
     238             :  */
     239             : List *
     240           3 : GetPublicationRelations(Oid pubid)
     241             : {
     242             :     List       *result;
     243             :     Relation    pubrelsrel;
     244             :     ScanKeyData scankey;
     245             :     SysScanDesc scan;
     246             :     HeapTuple   tup;
     247             : 
     248             :     /* Find all publications associated with the relation. */
     249           3 :     pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock);
     250             : 
     251           3 :     ScanKeyInit(&scankey,
     252             :                 Anum_pg_publication_rel_prpubid,
     253             :                 BTEqualStrategyNumber, F_OIDEQ,
     254             :                 ObjectIdGetDatum(pubid));
     255             : 
     256           3 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
     257             :                               true, NULL, 1, &scankey);
     258             : 
     259           3 :     result = NIL;
     260           7 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     261             :     {
     262             :         Form_pg_publication_rel pubrel;
     263             : 
     264           1 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     265             : 
     266           1 :         result = lappend_oid(result, pubrel->prrelid);
     267             :     }
     268             : 
     269           3 :     systable_endscan(scan);
     270           3 :     heap_close(pubrelsrel, AccessShareLock);
     271             : 
     272           3 :     return result;
     273             : }
     274             : 
     275             : /*
     276             :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     277             :  */
     278             : List *
     279         258 : GetAllTablesPublications(void)
     280             : {
     281             :     List       *result;
     282             :     Relation    rel;
     283             :     ScanKeyData scankey;
     284             :     SysScanDesc scan;
     285             :     HeapTuple   tup;
     286             : 
     287             :     /* Find all publications that are marked as for all tables. */
     288         258 :     rel = heap_open(PublicationRelationId, AccessShareLock);
     289             : 
     290         258 :     ScanKeyInit(&scankey,
     291             :                 Anum_pg_publication_puballtables,
     292             :                 BTEqualStrategyNumber, F_BOOLEQ,
     293             :                 BoolGetDatum(true));
     294             : 
     295         258 :     scan = systable_beginscan(rel, InvalidOid, false,
     296             :                               NULL, 1, &scankey);
     297             : 
     298         258 :     result = NIL;
     299         516 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     300           0 :         result = lappend_oid(result, HeapTupleGetOid(tup));
     301             : 
     302         258 :     systable_endscan(scan);
     303         258 :     heap_close(rel, AccessShareLock);
     304             : 
     305         258 :     return result;
     306             : }
     307             : 
     308             : /*
     309             :  * Gets list of all relation published by FOR ALL TABLES publication(s).
     310             :  */
     311             : List *
     312           0 : GetAllTablesPublicationRelations(void)
     313             : {
     314             :     Relation    classRel;
     315             :     ScanKeyData key[1];
     316             :     HeapScanDesc scan;
     317             :     HeapTuple   tuple;
     318           0 :     List       *result = NIL;
     319             : 
     320           0 :     classRel = heap_open(RelationRelationId, AccessShareLock);
     321             : 
     322           0 :     ScanKeyInit(&key[0],
     323             :                 Anum_pg_class_relkind,
     324             :                 BTEqualStrategyNumber, F_CHAREQ,
     325             :                 CharGetDatum(RELKIND_RELATION));
     326             : 
     327           0 :     scan = heap_beginscan_catalog(classRel, 1, key);
     328             : 
     329           0 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     330             :     {
     331           0 :         Oid         relid = HeapTupleGetOid(tuple);
     332           0 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     333             : 
     334           0 :         if (is_publishable_class(relid, relForm))
     335           0 :             result = lappend_oid(result, relid);
     336             :     }
     337             : 
     338           0 :     heap_endscan(scan);
     339           0 :     heap_close(classRel, AccessShareLock);
     340             : 
     341           0 :     return result;
     342             : }
     343             : 
     344             : /*
     345             :  * Get publication using oid
     346             :  *
     347             :  * The Publication struct and its data are palloc'ed here.
     348             :  */
     349             : Publication *
     350          18 : GetPublication(Oid pubid)
     351             : {
     352             :     HeapTuple   tup;
     353             :     Publication *pub;
     354             :     Form_pg_publication pubform;
     355             : 
     356          18 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
     357             : 
     358          18 :     if (!HeapTupleIsValid(tup))
     359           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
     360             : 
     361          18 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     362             : 
     363          18 :     pub = (Publication *) palloc(sizeof(Publication));
     364          18 :     pub->oid = pubid;
     365          18 :     pub->name = pstrdup(NameStr(pubform->pubname));
     366          18 :     pub->alltables = pubform->puballtables;
     367          18 :     pub->pubactions.pubinsert = pubform->pubinsert;
     368          18 :     pub->pubactions.pubupdate = pubform->pubupdate;
     369          18 :     pub->pubactions.pubdelete = pubform->pubdelete;
     370             : 
     371          18 :     ReleaseSysCache(tup);
     372             : 
     373          18 :     return pub;
     374             : }
     375             : 
     376             : 
     377             : /*
     378             :  * Get Publication using name.
     379             :  */
     380             : Publication *
     381           2 : GetPublicationByName(const char *pubname, bool missing_ok)
     382             : {
     383             :     Oid         oid;
     384             : 
     385           2 :     oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
     386           2 :     if (!OidIsValid(oid))
     387             :     {
     388           0 :         if (missing_ok)
     389           0 :             return NULL;
     390             : 
     391           0 :         ereport(ERROR,
     392             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     393             :                  errmsg("publication \"%s\" does not exist", pubname)));
     394             :     }
     395             : 
     396           2 :     return GetPublication(oid);
     397             : }
     398             : 
     399             : /*
     400             :  * get_publication_oid - given a publication name, look up the OID
     401             :  *
     402             :  * If missing_ok is false, throw an error if name not found.  If true, just
     403             :  * return InvalidOid.
     404             :  */
     405             : Oid
     406          16 : get_publication_oid(const char *pubname, bool missing_ok)
     407             : {
     408             :     Oid         oid;
     409             : 
     410          16 :     oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
     411          16 :     if (!OidIsValid(oid) && !missing_ok)
     412           1 :         ereport(ERROR,
     413             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     414             :                  errmsg("publication \"%s\" does not exist", pubname)));
     415          15 :     return oid;
     416             : }
     417             : 
     418             : /*
     419             :  * get_publication_name - given a publication Oid, look up the name
     420             :  */
     421             : char *
     422          23 : get_publication_name(Oid pubid)
     423             : {
     424             :     HeapTuple   tup;
     425             :     char       *pubname;
     426             :     Form_pg_publication pubform;
     427             : 
     428          23 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
     429             : 
     430          23 :     if (!HeapTupleIsValid(tup))
     431           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
     432             : 
     433          23 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     434          23 :     pubname = pstrdup(NameStr(pubform->pubname));
     435             : 
     436          23 :     ReleaseSysCache(tup);
     437             : 
     438          23 :     return pubname;
     439             : }
     440             : 
     441             : /*
     442             :  * Returns Oids of tables in a publication.
     443             :  */
     444             : Datum
     445           0 : pg_get_publication_tables(PG_FUNCTION_ARGS)
     446             : {
     447             :     FuncCallContext *funcctx;
     448           0 :     char       *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     449             :     Publication *publication;
     450             :     List       *tables;
     451             :     ListCell  **lcp;
     452             : 
     453             :     /* stuff done only on the first call of the function */
     454           0 :     if (SRF_IS_FIRSTCALL())
     455             :     {
     456             :         MemoryContext oldcontext;
     457             : 
     458             :         /* create a function context for cross-call persistence */
     459           0 :         funcctx = SRF_FIRSTCALL_INIT();
     460             : 
     461             :         /* switch to memory context appropriate for multiple function calls */
     462           0 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
     463             : 
     464           0 :         publication = GetPublicationByName(pubname, false);
     465           0 :         if (publication->alltables)
     466           0 :             tables = GetAllTablesPublicationRelations();
     467             :         else
     468           0 :             tables = GetPublicationRelations(publication->oid);
     469           0 :         lcp = (ListCell **) palloc(sizeof(ListCell *));
     470           0 :         *lcp = list_head(tables);
     471           0 :         funcctx->user_fctx = (void *) lcp;
     472             : 
     473           0 :         MemoryContextSwitchTo(oldcontext);
     474             :     }
     475             : 
     476             :     /* stuff done on every call of the function */
     477           0 :     funcctx = SRF_PERCALL_SETUP();
     478           0 :     lcp = (ListCell **) funcctx->user_fctx;
     479             : 
     480           0 :     while (*lcp != NULL)
     481             :     {
     482           0 :         Oid         relid = lfirst_oid(*lcp);
     483             : 
     484           0 :         *lcp = lnext(*lcp);
     485           0 :         SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
     486             :     }
     487             : 
     488           0 :     SRF_RETURN_DONE(funcctx);
     489             : }

Generated by: LCOV version 1.11