LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 217 255 85.1 %
Date: 2017-09-29 15:12:54 Functions: 13 14 92.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * publicationcmds.c
       4             :  *      publication manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      publicationcmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "funcapi.h"
      18             : #include "miscadmin.h"
      19             : 
      20             : #include "access/genam.h"
      21             : #include "access/hash.h"
      22             : #include "access/heapam.h"
      23             : #include "access/htup_details.h"
      24             : #include "access/xact.h"
      25             : 
      26             : #include "catalog/catalog.h"
      27             : #include "catalog/indexing.h"
      28             : #include "catalog/namespace.h"
      29             : #include "catalog/objectaccess.h"
      30             : #include "catalog/objectaddress.h"
      31             : #include "catalog/pg_inherits_fn.h"
      32             : #include "catalog/pg_type.h"
      33             : #include "catalog/pg_publication.h"
      34             : #include "catalog/pg_publication_rel.h"
      35             : 
      36             : #include "commands/dbcommands.h"
      37             : #include "commands/defrem.h"
      38             : #include "commands/event_trigger.h"
      39             : #include "commands/publicationcmds.h"
      40             : 
      41             : #include "utils/array.h"
      42             : #include "utils/builtins.h"
      43             : #include "utils/catcache.h"
      44             : #include "utils/fmgroids.h"
      45             : #include "utils/inval.h"
      46             : #include "utils/lsyscache.h"
      47             : #include "utils/rel.h"
      48             : #include "utils/syscache.h"
      49             : #include "utils/varlena.h"
      50             : 
      51             : /* Same as MAXNUMMESSAGES in sinvaladt.c */
      52             : #define MAX_RELCACHE_INVAL_MSGS 4096
      53             : 
      54             : static List *OpenTableList(List *tables);
      55             : static void CloseTableList(List *rels);
      56             : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      57             :                      AlterPublicationStmt *stmt);
      58             : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      59             : 
      60             : static void
      61          14 : parse_publication_options(List *options,
      62             :                           bool *publish_given,
      63             :                           bool *publish_insert,
      64             :                           bool *publish_update,
      65             :                           bool *publish_delete)
      66             : {
      67             :     ListCell   *lc;
      68             : 
      69          14 :     *publish_given = false;
      70             : 
      71             :     /* Defaults are true */
      72          14 :     *publish_insert = true;
      73          14 :     *publish_update = true;
      74          14 :     *publish_delete = true;
      75             : 
      76             :     /* Parse options */
      77          19 :     foreach(lc, options)
      78             :     {
      79           7 :         DefElem    *defel = (DefElem *) lfirst(lc);
      80             : 
      81           7 :         if (strcmp(defel->defname, "publish") == 0)
      82             :         {
      83             :             char       *publish;
      84             :             List       *publish_list;
      85             :             ListCell   *lc;
      86             : 
      87           6 :             if (*publish_given)
      88           0 :                 ereport(ERROR,
      89             :                         (errcode(ERRCODE_SYNTAX_ERROR),
      90             :                          errmsg("conflicting or redundant options")));
      91             : 
      92             :             /*
      93             :              * If publish option was given only the explicitly listed actions
      94             :              * should be published.
      95             :              */
      96           6 :             *publish_insert = false;
      97           6 :             *publish_update = false;
      98           6 :             *publish_delete = false;
      99             : 
     100           6 :             *publish_given = true;
     101           6 :             publish = defGetString(defel);
     102             : 
     103           6 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     104           0 :                 ereport(ERROR,
     105             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     106             :                          errmsg("invalid publish list")));
     107             : 
     108             :             /* Process the option list. */
     109          14 :             foreach(lc, publish_list)
     110             :             {
     111           9 :                 char       *publish_opt = (char *) lfirst(lc);
     112             : 
     113           9 :                 if (strcmp(publish_opt, "insert") == 0)
     114           4 :                     *publish_insert = true;
     115           5 :                 else if (strcmp(publish_opt, "update") == 0)
     116           3 :                     *publish_update = true;
     117           2 :                 else if (strcmp(publish_opt, "delete") == 0)
     118           1 :                     *publish_delete = true;
     119             :                 else
     120           1 :                     ereport(ERROR,
     121             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     122             :                              errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
     123             :             }
     124             :         }
     125             :         else
     126           1 :             ereport(ERROR,
     127             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     128             :                      errmsg("unrecognized publication parameter: %s", defel->defname)));
     129             :     }
     130          12 : }
     131             : 
     132             : /*
     133             :  * Create new publication.
     134             :  */
     135             : ObjectAddress
     136          13 : CreatePublication(CreatePublicationStmt *stmt)
     137             : {
     138             :     Relation    rel;
     139             :     ObjectAddress myself;
     140             :     Oid         puboid;
     141             :     bool        nulls[Natts_pg_publication];
     142             :     Datum       values[Natts_pg_publication];
     143             :     HeapTuple   tup;
     144             :     bool        publish_given;
     145             :     bool        publish_insert;
     146             :     bool        publish_update;
     147             :     bool        publish_delete;
     148             :     AclResult   aclresult;
     149             : 
     150             :     /* must have CREATE privilege on database */
     151          13 :     aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
     152          13 :     if (aclresult != ACLCHECK_OK)
     153           2 :         aclcheck_error(aclresult, ACL_KIND_DATABASE,
     154           1 :                        get_database_name(MyDatabaseId));
     155             : 
     156             :     /* FOR ALL TABLES requires superuser */
     157          12 :     if (stmt->for_all_tables && !superuser())
     158           0 :         ereport(ERROR,
     159             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     160             :                  (errmsg("must be superuser to create FOR ALL TABLES publication"))));
     161             : 
     162          12 :     rel = heap_open(PublicationRelationId, RowExclusiveLock);
     163             : 
     164             :     /* Check if name is used */
     165          12 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(stmt->pubname));
     166          12 :     if (OidIsValid(puboid))
     167             :     {
     168           1 :         ereport(ERROR,
     169             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     170             :                  errmsg("publication \"%s\" already exists",
     171             :                         stmt->pubname)));
     172             :     }
     173             : 
     174             :     /* Form a tuple. */
     175          11 :     memset(values, 0, sizeof(values));
     176          11 :     memset(nulls, false, sizeof(nulls));
     177             : 
     178          11 :     values[Anum_pg_publication_pubname - 1] =
     179          11 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     180          11 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     181             : 
     182          11 :     parse_publication_options(stmt->options,
     183             :                               &publish_given, &publish_insert,
     184             :                               &publish_update, &publish_delete);
     185             : 
     186           9 :     values[Anum_pg_publication_puballtables - 1] =
     187           9 :         BoolGetDatum(stmt->for_all_tables);
     188           9 :     values[Anum_pg_publication_pubinsert - 1] =
     189           9 :         BoolGetDatum(publish_insert);
     190           9 :     values[Anum_pg_publication_pubupdate - 1] =
     191           9 :         BoolGetDatum(publish_update);
     192           9 :     values[Anum_pg_publication_pubdelete - 1] =
     193           9 :         BoolGetDatum(publish_delete);
     194             : 
     195           9 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     196             : 
     197             :     /* Insert tuple into catalog. */
     198           9 :     puboid = CatalogTupleInsert(rel, tup);
     199           9 :     heap_freetuple(tup);
     200             : 
     201           9 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     202             : 
     203           9 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     204             : 
     205             :     /* Make the changes visible. */
     206           9 :     CommandCounterIncrement();
     207             : 
     208           9 :     if (stmt->tables)
     209             :     {
     210             :         List       *rels;
     211             : 
     212           5 :         Assert(list_length(stmt->tables) > 0);
     213             : 
     214           5 :         rels = OpenTableList(stmt->tables);
     215           5 :         PublicationAddTables(puboid, rels, true, NULL);
     216           4 :         CloseTableList(rels);
     217             :     }
     218             : 
     219           8 :     heap_close(rel, RowExclusiveLock);
     220             : 
     221           8 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     222             : 
     223           8 :     return myself;
     224             : }
     225             : 
     226             : /*
     227             :  * Change options of a publication.
     228             :  */
     229             : static void
     230           3 : AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
     231             :                         HeapTuple tup)
     232             : {
     233             :     bool        nulls[Natts_pg_publication];
     234             :     bool        replaces[Natts_pg_publication];
     235             :     Datum       values[Natts_pg_publication];
     236             :     bool        publish_given;
     237             :     bool        publish_insert;
     238             :     bool        publish_update;
     239             :     bool        publish_delete;
     240             :     ObjectAddress obj;
     241             : 
     242           3 :     parse_publication_options(stmt->options,
     243             :                               &publish_given, &publish_insert,
     244             :                               &publish_update, &publish_delete);
     245             : 
     246             :     /* Everything ok, form a new tuple. */
     247           3 :     memset(values, 0, sizeof(values));
     248           3 :     memset(nulls, false, sizeof(nulls));
     249           3 :     memset(replaces, false, sizeof(replaces));
     250             : 
     251           3 :     if (publish_given)
     252             :     {
     253           3 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
     254           3 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
     255             : 
     256           3 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
     257           3 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
     258             : 
     259           3 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
     260           3 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
     261             :     }
     262             : 
     263           3 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     264             :                             replaces);
     265             : 
     266             :     /* Update the catalog. */
     267           3 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     268             : 
     269           3 :     CommandCounterIncrement();
     270             : 
     271             :     /* Invalidate the relcache. */
     272           3 :     if (((Form_pg_publication) GETSTRUCT(tup))->puballtables)
     273             :     {
     274           1 :         CacheInvalidateRelcacheAll();
     275             :     }
     276             :     else
     277             :     {
     278           2 :         List       *relids = GetPublicationRelations(HeapTupleGetOid(tup));
     279             : 
     280             :         /*
     281             :          * We don't want to send too many individual messages, at some point
     282             :          * it's cheaper to just reset whole relcache.
     283             :          */
     284           2 :         if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
     285             :         {
     286             :             ListCell   *lc;
     287             : 
     288           2 :             foreach(lc, relids)
     289             :             {
     290           0 :                 Oid         relid = lfirst_oid(lc);
     291             : 
     292           0 :                 CacheInvalidateRelcacheByRelid(relid);
     293             :             }
     294             :         }
     295             :         else
     296           0 :             CacheInvalidateRelcacheAll();
     297             :     }
     298             : 
     299           3 :     ObjectAddressSet(obj, PublicationRelationId, HeapTupleGetOid(tup));
     300           3 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
     301             :                                      (Node *) stmt);
     302             : 
     303           3 :     InvokeObjectPostAlterHook(PublicationRelationId, HeapTupleGetOid(tup), 0);
     304           3 : }
     305             : 
     306             : /*
     307             :  * Add or remove table to/from publication.
     308             :  */
     309             : static void
     310          14 : AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
     311             :                        HeapTuple tup)
     312             : {
     313          14 :     Oid         pubid = HeapTupleGetOid(tup);
     314          14 :     List       *rels = NIL;
     315          14 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
     316             : 
     317             :     /* Check that user is allowed to manipulate the publication tables. */
     318          14 :     if (pubform->puballtables)
     319           3 :         ereport(ERROR,
     320             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     321             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
     322             :                         NameStr(pubform->pubname)),
     323             :                  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
     324             : 
     325          11 :     Assert(list_length(stmt->tables) > 0);
     326             : 
     327          11 :     rels = OpenTableList(stmt->tables);
     328             : 
     329          11 :     if (stmt->tableAction == DEFELEM_ADD)
     330           8 :         PublicationAddTables(pubid, rels, false, stmt);
     331           3 :     else if (stmt->tableAction == DEFELEM_DROP)
     332           2 :         PublicationDropTables(pubid, rels, false);
     333             :     else                        /* DEFELEM_SET */
     334             :     {
     335           1 :         List       *oldrelids = GetPublicationRelations(pubid);
     336           1 :         List       *delrels = NIL;
     337             :         ListCell   *oldlc;
     338             : 
     339             :         /* Calculate which relations to drop. */
     340           2 :         foreach(oldlc, oldrelids)
     341             :         {
     342           1 :             Oid         oldrelid = lfirst_oid(oldlc);
     343             :             ListCell   *newlc;
     344           1 :             bool        found = false;
     345             : 
     346           1 :             foreach(newlc, rels)
     347             :             {
     348           1 :                 Relation    newrel = (Relation) lfirst(newlc);
     349             : 
     350           1 :                 if (RelationGetRelid(newrel) == oldrelid)
     351             :                 {
     352           1 :                     found = true;
     353           1 :                     break;
     354             :                 }
     355             :             }
     356             : 
     357           1 :             if (!found)
     358             :             {
     359           0 :                 Relation    oldrel = heap_open(oldrelid,
     360             :                                                ShareUpdateExclusiveLock);
     361             : 
     362           0 :                 delrels = lappend(delrels, oldrel);
     363             :             }
     364             :         }
     365             : 
     366             :         /* And drop them. */
     367           1 :         PublicationDropTables(pubid, delrels, true);
     368             : 
     369             :         /*
     370             :          * Don't bother calculating the difference for adding, we'll catch and
     371             :          * skip existing ones when doing catalog update.
     372             :          */
     373           1 :         PublicationAddTables(pubid, rels, true, stmt);
     374             : 
     375           1 :         CloseTableList(delrels);
     376             :     }
     377             : 
     378           6 :     CloseTableList(rels);
     379           6 : }
     380             : 
     381             : /*
     382             :  * Alter the existing publication.
     383             :  *
     384             :  * This is dispatcher function for AlterPublicationOptions and
     385             :  * AlterPublicationTables.
     386             :  */
     387             : void
     388          17 : AlterPublication(AlterPublicationStmt *stmt)
     389             : {
     390             :     Relation    rel;
     391             :     HeapTuple   tup;
     392             : 
     393          17 :     rel = heap_open(PublicationRelationId, RowExclusiveLock);
     394             : 
     395          17 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
     396             :                               CStringGetDatum(stmt->pubname));
     397             : 
     398          17 :     if (!HeapTupleIsValid(tup))
     399           0 :         ereport(ERROR,
     400             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     401             :                  errmsg("publication \"%s\" does not exist",
     402             :                         stmt->pubname)));
     403             : 
     404             :     /* must be owner */
     405          17 :     if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
     406           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PUBLICATION,
     407           0 :                        stmt->pubname);
     408             : 
     409          17 :     if (stmt->options)
     410           3 :         AlterPublicationOptions(stmt, rel, tup);
     411             :     else
     412          14 :         AlterPublicationTables(stmt, rel, tup);
     413             : 
     414             :     /* Cleanup. */
     415           9 :     heap_freetuple(tup);
     416           9 :     heap_close(rel, RowExclusiveLock);
     417           9 : }
     418             : 
     419             : /*
     420             :  * Drop publication by OID
     421             :  */
     422             : void
     423           8 : RemovePublicationById(Oid pubid)
     424             : {
     425             :     Relation    rel;
     426             :     HeapTuple   tup;
     427             : 
     428           8 :     rel = heap_open(PublicationRelationId, RowExclusiveLock);
     429             : 
     430           8 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
     431             : 
     432           8 :     if (!HeapTupleIsValid(tup))
     433           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
     434             : 
     435           8 :     CatalogTupleDelete(rel, &tup->t_self);
     436             : 
     437           8 :     ReleaseSysCache(tup);
     438             : 
     439           8 :     heap_close(rel, RowExclusiveLock);
     440           8 : }
     441             : 
     442             : /*
     443             :  * Remove relation from publication by mapping OID.
     444             :  */
     445             : void
     446          11 : RemovePublicationRelById(Oid proid)
     447             : {
     448             :     Relation    rel;
     449             :     HeapTuple   tup;
     450             :     Form_pg_publication_rel pubrel;
     451             : 
     452          11 :     rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
     453             : 
     454          11 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
     455             : 
     456          11 :     if (!HeapTupleIsValid(tup))
     457           0 :         elog(ERROR, "cache lookup failed for publication table %u",
     458             :              proid);
     459             : 
     460          11 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     461             : 
     462             :     /* Invalidate relcache so that publication info is rebuilt. */
     463          11 :     CacheInvalidateRelcacheByRelid(pubrel->prrelid);
     464             : 
     465          11 :     CatalogTupleDelete(rel, &tup->t_self);
     466             : 
     467          11 :     ReleaseSysCache(tup);
     468             : 
     469          11 :     heap_close(rel, RowExclusiveLock);
     470          11 : }
     471             : 
     472             : /*
     473             :  * Open relations based on provided by RangeVar list.
     474             :  * The returned tables are locked in ShareUpdateExclusiveLock mode.
     475             :  */
     476             : static List *
     477          16 : OpenTableList(List *tables)
     478             : {
     479          16 :     List       *relids = NIL;
     480          16 :     List       *rels = NIL;
     481             :     ListCell   *lc;
     482             : 
     483             :     /*
     484             :      * Open, share-lock, and check all the explicitly-specified relations
     485             :      */
     486          35 :     foreach(lc, tables)
     487             :     {
     488          19 :         RangeVar   *rv = lfirst(lc);
     489             :         Relation    rel;
     490          19 :         bool        recurse = rv->inh;
     491             :         Oid         myrelid;
     492             : 
     493          19 :         CHECK_FOR_INTERRUPTS();
     494             : 
     495          19 :         rel = heap_openrv(rv, ShareUpdateExclusiveLock);
     496          19 :         myrelid = RelationGetRelid(rel);
     497             : 
     498             :         /*
     499             :          * Filter out duplicates if user specifies "foo, foo".
     500             :          *
     501             :          * Note that this algorithm is known to not be very efficient (O(N^2))
     502             :          * but given that it only works on list of tables given to us by user
     503             :          * it's deemed acceptable.
     504             :          */
     505          19 :         if (list_member_oid(relids, myrelid))
     506             :         {
     507           0 :             heap_close(rel, ShareUpdateExclusiveLock);
     508           0 :             continue;
     509             :         }
     510          19 :         rels = lappend(rels, rel);
     511          19 :         relids = lappend_oid(relids, myrelid);
     512             : 
     513          19 :         if (recurse)
     514             :         {
     515             :             ListCell   *child;
     516             :             List       *children;
     517             : 
     518          18 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
     519             :                                            NULL);
     520             : 
     521          37 :             foreach(child, children)
     522             :             {
     523          19 :                 Oid         childrelid = lfirst_oid(child);
     524             : 
     525          19 :                 if (list_member_oid(relids, childrelid))
     526          18 :                     continue;
     527             : 
     528             :                 /*
     529             :                  * Skip duplicates if user specified both parent and child
     530             :                  * tables.
     531             :                  */
     532           1 :                 if (list_member_oid(relids, childrelid))
     533             :                 {
     534           0 :                     heap_close(rel, ShareUpdateExclusiveLock);
     535           0 :                     continue;
     536             :                 }
     537             : 
     538             :                 /* find_all_inheritors already got lock */
     539           1 :                 rel = heap_open(childrelid, NoLock);
     540           1 :                 rels = lappend(rels, rel);
     541           1 :                 relids = lappend_oid(relids, childrelid);
     542             :             }
     543             :         }
     544             :     }
     545             : 
     546          16 :     list_free(relids);
     547             : 
     548          16 :     return rels;
     549             : }
     550             : 
     551             : /*
     552             :  * Close all relations in the list.
     553             :  */
     554             : static void
     555          11 : CloseTableList(List *rels)
     556             : {
     557             :     ListCell   *lc;
     558             : 
     559          25 :     foreach(lc, rels)
     560             :     {
     561          14 :         Relation    rel = (Relation) lfirst(lc);
     562             : 
     563          14 :         heap_close(rel, NoLock);
     564             :     }
     565          11 : }
     566             : 
     567             : /*
     568             :  * Add listed tables to the publication.
     569             :  */
     570             : static void
     571          14 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
     572             :                      AlterPublicationStmt *stmt)
     573             : {
     574             :     ListCell   *lc;
     575             : 
     576          14 :     Assert(!stmt || !stmt->for_all_tables);
     577             : 
     578          26 :     foreach(lc, rels)
     579             :     {
     580          17 :         Relation    rel = (Relation) lfirst(lc);
     581             :         ObjectAddress obj;
     582             : 
     583             :         /* Must be owner of the table or superuser. */
     584          17 :         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
     585           1 :             aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
     586           1 :                            RelationGetRelationName(rel));
     587             : 
     588          16 :         obj = publication_add_relation(pubid, rel, if_not_exists);
     589          12 :         if (stmt)
     590             :         {
     591           6 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
     592             :                                              (Node *) stmt);
     593             : 
     594           6 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
     595             :                                        obj.objectId, 0);
     596             :         }
     597             :     }
     598           9 : }
     599             : 
     600             : /*
     601             :  * Remove listed tables from the publication.
     602             :  */
     603             : static void
     604           3 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
     605             : {
     606             :     ObjectAddress obj;
     607             :     ListCell   *lc;
     608             :     Oid         prid;
     609             : 
     610           5 :     foreach(lc, rels)
     611             :     {
     612           3 :         Relation    rel = (Relation) lfirst(lc);
     613           3 :         Oid         relid = RelationGetRelid(rel);
     614             : 
     615           3 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     616             :                                ObjectIdGetDatum(pubid));
     617           3 :         if (!OidIsValid(prid))
     618             :         {
     619           1 :             if (missing_ok)
     620           0 :                 continue;
     621             : 
     622           1 :             ereport(ERROR,
     623             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     624             :                      errmsg("relation \"%s\" is not part of the publication",
     625             :                             RelationGetRelationName(rel))));
     626             :         }
     627             : 
     628           2 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
     629           2 :         performDeletion(&obj, DROP_CASCADE, 0);
     630             :     }
     631           2 : }
     632             : 
     633             : /*
     634             :  * Internal workhorse for changing a publication owner
     635             :  */
     636             : static void
     637           1 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
     638             : {
     639             :     Form_pg_publication form;
     640             : 
     641           1 :     form = (Form_pg_publication) GETSTRUCT(tup);
     642             : 
     643           1 :     if (form->pubowner == newOwnerId)
     644           1 :         return;
     645             : 
     646           1 :     if (!superuser())
     647             :     {
     648             :         AclResult   aclresult;
     649             : 
     650             :         /* Must be owner */
     651           0 :         if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
     652           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PUBLICATION,
     653           0 :                            NameStr(form->pubname));
     654             : 
     655             :         /* Must be able to become new owner */
     656           0 :         check_is_member_of_role(GetUserId(), newOwnerId);
     657             : 
     658             :         /* New owner must have CREATE privilege on database */
     659           0 :         aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
     660           0 :         if (aclresult != ACLCHECK_OK)
     661           0 :             aclcheck_error(aclresult, ACL_KIND_DATABASE,
     662           0 :                            get_database_name(MyDatabaseId));
     663             : 
     664           0 :         if (form->puballtables && !superuser_arg(newOwnerId))
     665           0 :             ereport(ERROR,
     666             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     667             :                      errmsg("permission denied to change owner of publication \"%s\"",
     668             :                             NameStr(form->pubname)),
     669             :                      errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
     670             :     }
     671             : 
     672           1 :     form->pubowner = newOwnerId;
     673           1 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     674             : 
     675             :     /* Update owner dependency reference */
     676           2 :     changeDependencyOnOwner(PublicationRelationId,
     677           2 :                             HeapTupleGetOid(tup),
     678             :                             newOwnerId);
     679             : 
     680           1 :     InvokeObjectPostAlterHook(PublicationRelationId,
     681             :                               HeapTupleGetOid(tup), 0);
     682             : }
     683             : 
     684             : /*
     685             :  * Change publication owner -- by name
     686             :  */
     687             : ObjectAddress
     688           1 : AlterPublicationOwner(const char *name, Oid newOwnerId)
     689             : {
     690             :     Oid         subid;
     691             :     HeapTuple   tup;
     692             :     Relation    rel;
     693             :     ObjectAddress address;
     694             : 
     695           1 :     rel = heap_open(PublicationRelationId, RowExclusiveLock);
     696             : 
     697           1 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
     698             : 
     699           1 :     if (!HeapTupleIsValid(tup))
     700           0 :         ereport(ERROR,
     701             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     702             :                  errmsg("publication \"%s\" does not exist", name)));
     703             : 
     704           1 :     subid = HeapTupleGetOid(tup);
     705             : 
     706           1 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
     707             : 
     708           1 :     ObjectAddressSet(address, PublicationRelationId, subid);
     709             : 
     710           1 :     heap_freetuple(tup);
     711             : 
     712           1 :     heap_close(rel, RowExclusiveLock);
     713             : 
     714           1 :     return address;
     715             : }
     716             : 
     717             : /*
     718             :  * Change publication owner -- by OID
     719             :  */
     720             : void
     721           0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
     722             : {
     723             :     HeapTuple   tup;
     724             :     Relation    rel;
     725             : 
     726           0 :     rel = heap_open(PublicationRelationId, RowExclusiveLock);
     727             : 
     728           0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
     729             : 
     730           0 :     if (!HeapTupleIsValid(tup))
     731           0 :         ereport(ERROR,
     732             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     733             :                  errmsg("publication with OID %u does not exist", subid)));
     734             : 
     735           0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
     736             : 
     737           0 :     heap_freetuple(tup);
     738             : 
     739           0 :     heap_close(rel, RowExclusiveLock);
     740           0 : }

Generated by: LCOV version 1.11