LCOV - code coverage report
Current view: top level - src/backend/commands - subscriptioncmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 272 414 65.7 %
Date: 2017-09-29 15:12:54 Functions: 7 10 70.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * subscriptioncmds.c
       4             :  *      subscription catalog manipulation functions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      subscriptioncmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "miscadmin.h"
      18             : 
      19             : #include "access/heapam.h"
      20             : #include "access/htup_details.h"
      21             : #include "access/xact.h"
      22             : 
      23             : #include "catalog/dependency.h"
      24             : #include "catalog/indexing.h"
      25             : #include "catalog/namespace.h"
      26             : #include "catalog/objectaccess.h"
      27             : #include "catalog/objectaddress.h"
      28             : #include "catalog/pg_type.h"
      29             : #include "catalog/pg_subscription.h"
      30             : #include "catalog/pg_subscription_rel.h"
      31             : 
      32             : #include "commands/defrem.h"
      33             : #include "commands/event_trigger.h"
      34             : #include "commands/subscriptioncmds.h"
      35             : 
      36             : #include "executor/executor.h"
      37             : 
      38             : #include "nodes/makefuncs.h"
      39             : 
      40             : #include "replication/logicallauncher.h"
      41             : #include "replication/origin.h"
      42             : #include "replication/walreceiver.h"
      43             : #include "replication/walsender.h"
      44             : #include "replication/worker_internal.h"
      45             : 
      46             : #include "storage/lmgr.h"
      47             : 
      48             : #include "utils/builtins.h"
      49             : #include "utils/guc.h"
      50             : #include "utils/lsyscache.h"
      51             : #include "utils/memutils.h"
      52             : #include "utils/syscache.h"
      53             : 
      54             : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
      55             : 
      56             : /*
      57             :  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
      58             :  *
      59             :  * Since not all options can be specified in both commands, this function
      60             :  * will report an error on options if the target output pointer is NULL to
      61             :  * accommodate that.
      62             :  */
      63             : static void
      64          25 : parse_subscription_options(List *options, bool *connect, bool *enabled_given,
      65             :                            bool *enabled, bool *create_slot,
      66             :                            bool *slot_name_given, char **slot_name,
      67             :                            bool *copy_data, char **synchronous_commit,
      68             :                            bool *refresh)
      69             : {
      70             :     ListCell   *lc;
      71          25 :     bool        connect_given = false;
      72          25 :     bool        create_slot_given = false;
      73          25 :     bool        copy_data_given = false;
      74          25 :     bool        refresh_given = false;
      75             : 
      76             :     /* If connect is specified, the others also need to be. */
      77          25 :     Assert(!connect || (enabled && create_slot && copy_data));
      78             : 
      79          25 :     if (connect)
      80          16 :         *connect = true;
      81          25 :     if (enabled)
      82             :     {
      83          19 :         *enabled_given = false;
      84          19 :         *enabled = true;
      85             :     }
      86          25 :     if (create_slot)
      87          16 :         *create_slot = true;
      88          25 :     if (slot_name)
      89             :     {
      90          21 :         *slot_name_given = false;
      91          21 :         *slot_name = NULL;
      92             :     }
      93          25 :     if (copy_data)
      94          17 :         *copy_data = true;
      95          25 :     if (synchronous_commit)
      96          21 :         *synchronous_commit = NULL;
      97          25 :     if (refresh)
      98           1 :         *refresh = true;
      99             : 
     100             :     /* Parse options */
     101          56 :     foreach(lc, options)
     102             :     {
     103          33 :         DefElem    *defel = (DefElem *) lfirst(lc);
     104             : 
     105          33 :         if (strcmp(defel->defname, "connect") == 0 && connect)
     106             :         {
     107           9 :             if (connect_given)
     108           0 :                 ereport(ERROR,
     109             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     110             :                          errmsg("conflicting or redundant options")));
     111             : 
     112           9 :             connect_given = true;
     113           9 :             *connect = defGetBoolean(defel);
     114             :         }
     115          24 :         else if (strcmp(defel->defname, "enabled") == 0 && enabled)
     116             :         {
     117           6 :             if (*enabled_given)
     118           0 :                 ereport(ERROR,
     119             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     120             :                          errmsg("conflicting or redundant options")));
     121             : 
     122           6 :             *enabled_given = true;
     123           6 :             *enabled = defGetBoolean(defel);
     124             :         }
     125          18 :         else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
     126             :         {
     127           4 :             if (create_slot_given)
     128           0 :                 ereport(ERROR,
     129             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     130             :                          errmsg("conflicting or redundant options")));
     131             : 
     132           4 :             create_slot_given = true;
     133           4 :             *create_slot = defGetBoolean(defel);
     134             :         }
     135          14 :         else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
     136             :         {
     137           9 :             if (*slot_name_given)
     138           0 :                 ereport(ERROR,
     139             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     140             :                          errmsg("conflicting or redundant options")));
     141             : 
     142           9 :             *slot_name_given = true;
     143           9 :             *slot_name = defGetString(defel);
     144             : 
     145             :             /* Setting slot_name = NONE is treated as no slot name. */
     146          18 :             if (strcmp(*slot_name, "none") == 0)
     147           8 :                 *slot_name = NULL;
     148             :         }
     149           5 :         else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
     150             :         {
     151           1 :             if (copy_data_given)
     152           0 :                 ereport(ERROR,
     153             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     154             :                          errmsg("conflicting or redundant options")));
     155             : 
     156           1 :             copy_data_given = true;
     157           1 :             *copy_data = defGetBoolean(defel);
     158             :         }
     159           4 :         else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
     160             :                  synchronous_commit)
     161             :         {
     162           2 :             if (*synchronous_commit)
     163           0 :                 ereport(ERROR,
     164             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     165             :                          errmsg("conflicting or redundant options")));
     166             : 
     167           2 :             *synchronous_commit = defGetString(defel);
     168             : 
     169             :             /* Test if the given value is valid for synchronous_commit GUC. */
     170           2 :             (void) set_config_option("synchronous_commit", *synchronous_commit,
     171             :                                      PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
     172             :                                      false, 0, false);
     173             :         }
     174           2 :         else if (strcmp(defel->defname, "refresh") == 0 && refresh)
     175             :         {
     176           1 :             if (refresh_given)
     177           0 :                 ereport(ERROR,
     178             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     179             :                          errmsg("conflicting or redundant options")));
     180             : 
     181           1 :             refresh_given = true;
     182           1 :             *refresh = defGetBoolean(defel);
     183             :         }
     184             :         else
     185           1 :             ereport(ERROR,
     186             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     187             :                      errmsg("unrecognized subscription parameter: %s", defel->defname)));
     188             :     }
     189             : 
     190             :     /*
     191             :      * We've been explicitly asked to not connect, that requires some
     192             :      * additional processing.
     193             :      */
     194          23 :     if (connect && !*connect)
     195             :     {
     196             :         /* Check for incompatible options from the user. */
     197           9 :         if (enabled && *enabled_given && *enabled)
     198           1 :             ereport(ERROR,
     199             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     200             :                      errmsg("connect = false and enabled = true are mutually exclusive options")));
     201             : 
     202           8 :         if (create_slot && create_slot_given && *create_slot)
     203           1 :             ereport(ERROR,
     204             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     205             :                      errmsg("connect = false and create_slot = true are mutually exclusive options")));
     206             : 
     207           7 :         if (copy_data && copy_data_given && *copy_data)
     208           1 :             ereport(ERROR,
     209             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     210             :                      errmsg("connect = false and copy_data = true are mutually exclusive options")));
     211             : 
     212             :         /* Change the defaults of other options. */
     213           6 :         *enabled = false;
     214           6 :         *create_slot = false;
     215           6 :         *copy_data = false;
     216             :     }
     217             : 
     218             :     /*
     219             :      * Do additional checking for disallowed combination when slot_name = NONE
     220             :      * was used.
     221             :      */
     222          20 :     if (slot_name && *slot_name_given && !*slot_name)
     223             :     {
     224           8 :         if (enabled && *enabled_given && *enabled)
     225           1 :             ereport(ERROR,
     226             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     227             :                      errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
     228             : 
     229           7 :         if (create_slot && create_slot_given && *create_slot)
     230           1 :             ereport(ERROR,
     231             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     232             :                      errmsg("slot_name = NONE and create_slot = true are mutually exclusive options")));
     233             : 
     234           6 :         if (enabled && !*enabled_given && *enabled)
     235           2 :             ereport(ERROR,
     236             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     237             :                      errmsg("subscription with slot_name = NONE must also set enabled = false")));
     238             : 
     239           4 :         if (create_slot && !create_slot_given && *create_slot)
     240           1 :             ereport(ERROR,
     241             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     242             :                      errmsg("subscription with slot_name = NONE must also set create_slot = false")));
     243             :     }
     244          15 : }
     245             : 
     246             : /*
     247             :  * Auxiliary function to return a text array out of a list of String nodes.
     248             :  */
     249             : static Datum
     250           5 : publicationListToArray(List *publist)
     251             : {
     252             :     ArrayType  *arr;
     253             :     Datum      *datums;
     254           5 :     int         j = 0;
     255             :     ListCell   *cell;
     256             :     MemoryContext memcxt;
     257             :     MemoryContext oldcxt;
     258             : 
     259             :     /* Create memory context for temporary allocations. */
     260           5 :     memcxt = AllocSetContextCreate(CurrentMemoryContext,
     261             :                                    "publicationListToArray to array",
     262             :                                    ALLOCSET_DEFAULT_MINSIZE,
     263             :                                    ALLOCSET_DEFAULT_INITSIZE,
     264             :                                    ALLOCSET_DEFAULT_MAXSIZE);
     265           5 :     oldcxt = MemoryContextSwitchTo(memcxt);
     266             : 
     267           5 :     datums = palloc(sizeof(text *) * list_length(publist));
     268          12 :     foreach(cell, publist)
     269             :     {
     270           8 :         char       *name = strVal(lfirst(cell));
     271             :         ListCell   *pcell;
     272             : 
     273             :         /* Check for duplicates. */
     274          10 :         foreach(pcell, publist)
     275             :         {
     276          10 :             char       *pname = strVal(lfirst(pcell));
     277             : 
     278          10 :             if (name == pname)
     279           7 :                 break;
     280             : 
     281           3 :             if (strcmp(name, pname) == 0)
     282           1 :                 ereport(ERROR,
     283             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     284             :                          errmsg("publication name \"%s\" used more than once",
     285             :                                 pname)));
     286             :         }
     287             : 
     288           7 :         datums[j++] = CStringGetTextDatum(name);
     289             :     }
     290             : 
     291           4 :     MemoryContextSwitchTo(oldcxt);
     292             : 
     293           4 :     arr = construct_array(datums, list_length(publist),
     294             :                           TEXTOID, -1, false, 'i');
     295           4 :     MemoryContextDelete(memcxt);
     296             : 
     297           4 :     return PointerGetDatum(arr);
     298             : }
     299             : 
     300             : /*
     301             :  * Create new subscription.
     302             :  */
     303             : ObjectAddress
     304          16 : CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
     305             : {
     306             :     Relation    rel;
     307             :     ObjectAddress myself;
     308             :     Oid         subid;
     309             :     bool        nulls[Natts_pg_subscription];
     310             :     Datum       values[Natts_pg_subscription];
     311          16 :     Oid         owner = GetUserId();
     312             :     HeapTuple   tup;
     313             :     bool        connect;
     314             :     bool        enabled_given;
     315             :     bool        enabled;
     316             :     bool        copy_data;
     317             :     char       *synchronous_commit;
     318             :     char       *conninfo;
     319             :     char       *slotname;
     320             :     bool        slotname_given;
     321             :     char        originname[NAMEDATALEN];
     322             :     bool        create_slot;
     323             :     List       *publications;
     324             : 
     325             :     /*
     326             :      * Parse and check options.
     327             :      *
     328             :      * Connection and publication should not be specified here.
     329             :      */
     330          16 :     parse_subscription_options(stmt->options, &connect, &enabled_given,
     331             :                                &enabled, &create_slot, &slotname_given,
     332             :                                &slotname, &copy_data, &synchronous_commit,
     333             :                                NULL);
     334             : 
     335             :     /*
     336             :      * Since creating a replication slot is not transactional, rolling back
     337             :      * the transaction leaves the created replication slot.  So we cannot run
     338             :      * CREATE SUBSCRIPTION inside a transaction block if creating a
     339             :      * replication slot.
     340             :      */
     341           8 :     if (create_slot)
     342           2 :         PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
     343             : 
     344           7 :     if (!superuser())
     345           1 :         ereport(ERROR,
     346             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     347             :                  (errmsg("must be superuser to create subscriptions"))));
     348             : 
     349           6 :     rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
     350             : 
     351             :     /* Check if name is used */
     352           6 :     subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
     353             :                             CStringGetDatum(stmt->subname));
     354           6 :     if (OidIsValid(subid))
     355             :     {
     356           1 :         ereport(ERROR,
     357             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     358             :                  errmsg("subscription \"%s\" already exists",
     359             :                         stmt->subname)));
     360             :     }
     361             : 
     362           5 :     if (!slotname_given && slotname == NULL)
     363           3 :         slotname = stmt->subname;
     364             : 
     365             :     /* The default for synchronous_commit of subscriptions is off. */
     366           5 :     if (synchronous_commit == NULL)
     367           5 :         synchronous_commit = "off";
     368             : 
     369           5 :     conninfo = stmt->conninfo;
     370           5 :     publications = stmt->publication;
     371             : 
     372             :     /* Load the library providing us libpq calls. */
     373           5 :     load_file("libpqwalreceiver", false);
     374             : 
     375             :     /* Check the connection info string. */
     376           5 :     walrcv_check_conninfo(conninfo);
     377             : 
     378             :     /* Everything ok, form a new tuple. */
     379           4 :     memset(values, 0, sizeof(values));
     380           4 :     memset(nulls, false, sizeof(nulls));
     381             : 
     382           4 :     values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
     383           4 :     values[Anum_pg_subscription_subname - 1] =
     384           4 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
     385           4 :     values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
     386           4 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
     387           4 :     values[Anum_pg_subscription_subconninfo - 1] =
     388           4 :         CStringGetTextDatum(conninfo);
     389           4 :     if (slotname)
     390           2 :         values[Anum_pg_subscription_subslotname - 1] =
     391           2 :             DirectFunctionCall1(namein, CStringGetDatum(slotname));
     392             :     else
     393           2 :         nulls[Anum_pg_subscription_subslotname - 1] = true;
     394           4 :     values[Anum_pg_subscription_subsynccommit - 1] =
     395           4 :         CStringGetTextDatum(synchronous_commit);
     396           3 :     values[Anum_pg_subscription_subpublications - 1] =
     397           4 :         publicationListToArray(publications);
     398             : 
     399           3 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     400             : 
     401             :     /* Insert tuple into catalog. */
     402           3 :     subid = CatalogTupleInsert(rel, tup);
     403           3 :     heap_freetuple(tup);
     404             : 
     405           3 :     recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
     406             : 
     407           3 :     snprintf(originname, sizeof(originname), "pg_%u", subid);
     408           3 :     replorigin_create(originname);
     409             : 
     410             :     /*
     411             :      * Connect to remote side to execute requested commands and fetch table
     412             :      * info.
     413             :      */
     414           3 :     if (connect)
     415             :     {
     416             :         XLogRecPtr  lsn;
     417             :         char       *err;
     418             :         WalReceiverConn *wrconn;
     419             :         List       *tables;
     420             :         ListCell   *lc;
     421             :         char        table_state;
     422             : 
     423             :         /* Try to connect to the publisher. */
     424           0 :         wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
     425           0 :         if (!wrconn)
     426           0 :             ereport(ERROR,
     427             :                     (errmsg("could not connect to the publisher: %s", err)));
     428             : 
     429           0 :         PG_TRY();
     430             :         {
     431             :             /*
     432             :              * Set sync state based on if we were asked to do data copy or
     433             :              * not.
     434             :              */
     435           0 :             table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
     436             : 
     437             :             /*
     438             :              * Get the table list from publisher and build local table status
     439             :              * info.
     440             :              */
     441           0 :             tables = fetch_table_list(wrconn, publications);
     442           0 :             foreach(lc, tables)
     443             :             {
     444           0 :                 RangeVar   *rv = (RangeVar *) lfirst(lc);
     445             :                 Oid         relid;
     446             : 
     447           0 :                 relid = RangeVarGetRelid(rv, AccessShareLock, false);
     448             : 
     449             :                 /* Check for supported relkind. */
     450           0 :                 CheckSubscriptionRelkind(get_rel_relkind(relid),
     451           0 :                                          rv->schemaname, rv->relname);
     452             : 
     453           0 :                 SetSubscriptionRelState(subid, relid, table_state,
     454             :                                         InvalidXLogRecPtr, false);
     455             :             }
     456             : 
     457             :             /*
     458             :              * If requested, create permanent slot for the subscription. We
     459             :              * won't use the initial snapshot for anything, so no need to
     460             :              * export it.
     461             :              */
     462           0 :             if (create_slot)
     463             :             {
     464           0 :                 Assert(slotname);
     465             : 
     466           0 :                 walrcv_create_slot(wrconn, slotname, false,
     467             :                                    CRS_NOEXPORT_SNAPSHOT, &lsn);
     468           0 :                 ereport(NOTICE,
     469             :                         (errmsg("created replication slot \"%s\" on publisher",
     470             :                                 slotname)));
     471             :             }
     472             :         }
     473           0 :         PG_CATCH();
     474             :         {
     475             :             /* Close the connection in case of failure. */
     476           0 :             walrcv_disconnect(wrconn);
     477           0 :             PG_RE_THROW();
     478             :         }
     479           0 :         PG_END_TRY();
     480             : 
     481             :         /* And we are done with the remote side. */
     482           0 :         walrcv_disconnect(wrconn);
     483             :     }
     484             :     else
     485           3 :         ereport(WARNING,
     486             :                 (errmsg("tables were not subscribed, you will have to run "
     487             :                         "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
     488             :                         "subscribe the tables")));
     489             : 
     490           3 :     heap_close(rel, RowExclusiveLock);
     491             : 
     492           3 :     if (enabled)
     493           0 :         ApplyLauncherWakeupAtCommit();
     494             : 
     495           3 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     496             : 
     497           3 :     InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
     498             : 
     499           3 :     return myself;
     500             : }
     501             : 
     502             : static void
     503           0 : AlterSubscription_refresh(Subscription *sub, bool copy_data)
     504             : {
     505             :     char       *err;
     506             :     List       *pubrel_names;
     507             :     List       *subrel_states;
     508             :     Oid        *subrel_local_oids;
     509             :     Oid        *pubrel_local_oids;
     510             :     ListCell   *lc;
     511             :     int         off;
     512             : 
     513             :     /* Load the library providing us libpq calls. */
     514           0 :     load_file("libpqwalreceiver", false);
     515             : 
     516             :     /* Try to connect to the publisher. */
     517           0 :     wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
     518           0 :     if (!wrconn)
     519           0 :         ereport(ERROR,
     520             :                 (errmsg("could not connect to the publisher: %s", err)));
     521             : 
     522             :     /* Get the table list from publisher. */
     523           0 :     pubrel_names = fetch_table_list(wrconn, sub->publications);
     524             : 
     525             :     /* We are done with the remote side, close connection. */
     526           0 :     walrcv_disconnect(wrconn);
     527             : 
     528             :     /* Get local table list. */
     529           0 :     subrel_states = GetSubscriptionRelations(sub->oid);
     530             : 
     531             :     /*
     532             :      * Build qsorted array of local table oids for faster lookup. This can
     533             :      * potentially contain all tables in the database so speed of lookup is
     534             :      * important.
     535             :      */
     536           0 :     subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
     537           0 :     off = 0;
     538           0 :     foreach(lc, subrel_states)
     539             :     {
     540           0 :         SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
     541             : 
     542           0 :         subrel_local_oids[off++] = relstate->relid;
     543             :     }
     544           0 :     qsort(subrel_local_oids, list_length(subrel_states),
     545             :           sizeof(Oid), oid_cmp);
     546             : 
     547             :     /*
     548             :      * Walk over the remote tables and try to match them to locally known
     549             :      * tables. If the table is not known locally create a new state for it.
     550             :      *
     551             :      * Also builds array of local oids of remote tables for the next step.
     552             :      */
     553           0 :     off = 0;
     554           0 :     pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
     555             : 
     556           0 :     foreach(lc, pubrel_names)
     557             :     {
     558           0 :         RangeVar   *rv = (RangeVar *) lfirst(lc);
     559             :         Oid         relid;
     560             : 
     561           0 :         relid = RangeVarGetRelid(rv, AccessShareLock, false);
     562             : 
     563             :         /* Check for supported relkind. */
     564           0 :         CheckSubscriptionRelkind(get_rel_relkind(relid),
     565           0 :                                  rv->schemaname, rv->relname);
     566             : 
     567           0 :         pubrel_local_oids[off++] = relid;
     568             : 
     569           0 :         if (!bsearch(&relid, subrel_local_oids,
     570           0 :                      list_length(subrel_states), sizeof(Oid), oid_cmp))
     571             :         {
     572           0 :             SetSubscriptionRelState(sub->oid, relid,
     573             :                                     copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
     574             :                                     InvalidXLogRecPtr, false);
     575           0 :             ereport(DEBUG1,
     576             :                     (errmsg("table \"%s.%s\" added to subscription \"%s\"",
     577             :                             rv->schemaname, rv->relname, sub->name)));
     578             :         }
     579             :     }
     580             : 
     581             :     /*
     582             :      * Next remove state for tables we should not care about anymore using the
     583             :      * data we collected above
     584             :      */
     585           0 :     qsort(pubrel_local_oids, list_length(pubrel_names),
     586             :           sizeof(Oid), oid_cmp);
     587             : 
     588           0 :     for (off = 0; off < list_length(subrel_states); off++)
     589             :     {
     590           0 :         Oid         relid = subrel_local_oids[off];
     591             : 
     592           0 :         if (!bsearch(&relid, pubrel_local_oids,
     593           0 :                      list_length(pubrel_names), sizeof(Oid), oid_cmp))
     594             :         {
     595           0 :             RemoveSubscriptionRel(sub->oid, relid);
     596             : 
     597           0 :             logicalrep_worker_stop_at_commit(sub->oid, relid);
     598             : 
     599           0 :             ereport(DEBUG1,
     600             :                     (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
     601             :                             get_namespace_name(get_rel_namespace(relid)),
     602             :                             get_rel_name(relid),
     603             :                             sub->name)));
     604             :         }
     605             :     }
     606           0 : }
     607             : 
     608             : /*
     609             :  * Alter the existing subscription as selected by stmt->kind; returns the subscription's ObjectAddress.
     610             :  */
     611             : ObjectAddress
     612          13 : AlterSubscription(AlterSubscriptionStmt *stmt)
     613             : {
     614             :     Relation    rel;
     615             :     ObjectAddress myself;
     616             :     bool        nulls[Natts_pg_subscription];
     617             :     bool        replaces[Natts_pg_subscription];
     618             :     Datum       values[Natts_pg_subscription];
     619             :     HeapTuple   tup;
     620             :     Oid         subid;
     621          13 :     bool        update_tuple = false;   /* set by cases that change the catalog row */
     622             :     Subscription *sub;
     623             : 
     624          13 :     rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
     625             : 
     626             :     /* Fetch a copy of the existing tuple, looked up by database and subscription name. */
     627          13 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
     628             :                               CStringGetDatum(stmt->subname));
     629             : 
     630          13 :     if (!HeapTupleIsValid(tup))
     631           1 :         ereport(ERROR,
     632             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     633             :                  errmsg("subscription \"%s\" does not exist",
     634             :                         stmt->subname)));
     635             : 
     636             :     /* Only the owner of the subscription may alter it. */
     637          12 :     if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
     638           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
     639           0 :                        stmt->subname);
     640             : 
     641          12 :     subid = HeapTupleGetOid(tup);
     642          12 :     sub = GetSubscription(subid, false);
     643             : 
     644             :     /* Lock the subscription so nobody else can do anything with it. */
     645          12 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
     646             : 
     647             :     /* Prepare the replacement arrays; each case below marks only the columns it changes. */
     648          12 :     memset(values, 0, sizeof(values));
     649          12 :     memset(nulls, false, sizeof(nulls));
     650          12 :     memset(replaces, false, sizeof(replaces));
     651             : 
     652          12 :     switch (stmt->kind)
     653             :     {
     654             :         case ALTER_SUBSCRIPTION_OPTIONS:    /* change slot_name and/or synchronous_commit */
     655             :             {
     656             :                 char       *slotname;
     657             :                 bool        slotname_given;
     658             :                 char       *synchronous_commit;
     659             : 
     660           5 :                 parse_subscription_options(stmt->options, NULL, NULL, NULL,
     661             :                                            NULL, &slotname_given, &slotname,
     662             :                                            NULL, &synchronous_commit, NULL);
     663             : 
     664           3 :                 if (slotname_given)
     665             :                 {
     666           2 :                     if (sub->enabled && !slotname)
     667           0 :                         ereport(ERROR,
     668             :                                 (errcode(ERRCODE_SYNTAX_ERROR),
     669             :                                  errmsg("cannot set slot_name = NONE for enabled subscription")));
     670             : 
     671           2 :                     if (slotname)
     672           1 :                         values[Anum_pg_subscription_subslotname - 1] =
     673           1 :                             DirectFunctionCall1(namein, CStringGetDatum(slotname));
     674             :                     else
     675           1 :                         nulls[Anum_pg_subscription_subslotname - 1] = true; /* slot_name = NONE is stored as NULL */
     676           2 :                     replaces[Anum_pg_subscription_subslotname - 1] = true;
     677             :                 }
     678             : 
     679           3 :                 if (synchronous_commit)
     680             :                 {
     681           1 :                     values[Anum_pg_subscription_subsynccommit - 1] =
     682           1 :                         CStringGetTextDatum(synchronous_commit);
     683           1 :                     replaces[Anum_pg_subscription_subsynccommit - 1] = true;
     684             :                 }
     685             : 
     686           3 :                 update_tuple = true;
     687           3 :                 break;
     688             :             }
     689             : 
     690             :         case ALTER_SUBSCRIPTION_ENABLED:    /* enable or disable the subscription */
     691             :             {
     692             :                 bool        enabled,
     693             :                             enabled_given;
     694             : 
     695           3 :                 parse_subscription_options(stmt->options, NULL,
     696             :                                            &enabled_given, &enabled, NULL,
     697             :                                            NULL, NULL, NULL, NULL, NULL);
     698           3 :                 Assert(enabled_given);
     699             : 
     700           3 :                 if (!sub->slotname && enabled)
     701           1 :                     ereport(ERROR,
     702             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     703             :                              errmsg("cannot enable subscription that does not have a slot name")));
     704             : 
     705           2 :                 values[Anum_pg_subscription_subenabled - 1] =
     706           2 :                     BoolGetDatum(enabled);
     707           2 :                 replaces[Anum_pg_subscription_subenabled - 1] = true;
     708             : 
     709           2 :                 if (enabled)
     710           1 :                     ApplyLauncherWakeupAtCommit();
     711             : 
     712           2 :                 update_tuple = true;
     713           2 :                 break;
     714             :             }
     715             : 
     716             :         case ALTER_SUBSCRIPTION_CONNECTION: /* replace the connection string */
     717             :             /* Load the library providing us libpq calls. */
     718           2 :             load_file("libpqwalreceiver", false);
     719             :             /* Check the connection info string. */
     720           2 :             walrcv_check_conninfo(stmt->conninfo);
     721             : 
     722           1 :             values[Anum_pg_subscription_subconninfo - 1] =
     723           1 :                 CStringGetTextDatum(stmt->conninfo);
     724           1 :             replaces[Anum_pg_subscription_subconninfo - 1] = true;
     725           1 :             update_tuple = true;
     726           1 :             break;
     727             : 
     728             :         case ALTER_SUBSCRIPTION_PUBLICATION:    /* replace the publication list, optionally refreshing */
     729             :             {
     730             :                 bool        copy_data;
     731             :                 bool        refresh;
     732             : 
     733           1 :                 parse_subscription_options(stmt->options, NULL, NULL, NULL,
     734             :                                            NULL, NULL, NULL, &copy_data,
     735             :                                            NULL, &refresh);
     736             : 
     737           1 :                 values[Anum_pg_subscription_subpublications - 1] =
     738           1 :                     publicationListToArray(stmt->publication);
     739           1 :                 replaces[Anum_pg_subscription_subpublications - 1] = true;
     740             : 
     741           1 :                 update_tuple = true;
     742             : 
     743             :                 /* Refresh if user asked us to. */
     744           1 :                 if (refresh)
     745             :                 {
     746           0 :                     if (!sub->enabled)
     747           0 :                         ereport(ERROR,
     748             :                                 (errcode(ERRCODE_SYNTAX_ERROR),
     749             :                                  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
     750             :                                  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
     751             : 
     752             :                     /* Make sure refresh sees the new list of publications. */
     753           0 :                     sub->publications = stmt->publication;
     754             : 
     755           0 :                     AlterSubscription_refresh(sub, copy_data);
     756             :                 }
     757             : 
     758           1 :                 break;
     759             :             }
     760             : 
     761             :         case ALTER_SUBSCRIPTION_REFRESH:    /* refresh only; update_tuple stays false */
     762             :             {
     763             :                 bool        copy_data;
     764             : 
     765           1 :                 if (!sub->enabled)
     766           1 :                     ereport(ERROR,
     767             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     768             :                              errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
     769             : 
     770           0 :                 parse_subscription_options(stmt->options, NULL, NULL, NULL,
     771             :                                            NULL, NULL, NULL, &copy_data,
     772             :                                            NULL, NULL);
     773             : 
     774           0 :                 AlterSubscription_refresh(sub, copy_data);
     775             : 
     776           0 :                 break;
     777             :             }
     778             : 
     779             :         default:
     780           0 :             elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
     781             :                  stmt->kind);
     782             :     }
     783             : 
     784             :     /* Update the catalog row if one of the cases above changed any column. */
     785           7 :     if (update_tuple)
     786             :     {
     787           7 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     788             :                                 replaces);
     789             : 
     790           7 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
     791             : 
     792           7 :         heap_freetuple(tup);
     793             :     }
     794             : 
     795           7 :     heap_close(rel, RowExclusiveLock);
     796             : 
     797           7 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     798             : 
     799           7 :     InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
     800             : 
     801           7 :     return myself;
     802             : }
     803             : 
     804             : /*
     805             :  * Drop a subscription: stop its workers, remove its catalog row, sync states and origin, and drop its publisher slot if it has one.
     806             :  */
     807             : void
     808           6 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
     809             : {
     810             :     Relation    rel;
     811             :     ObjectAddress myself;
     812             :     HeapTuple   tup;
     813             :     Oid         subid;
     814             :     Datum       datum;
     815             :     bool        isnull;
     816             :     char       *subname;
     817             :     char       *conninfo;
     818             :     char       *slotname;
     819             :     List       *subworkers;
     820             :     ListCell   *lc;
     821             :     char        originname[NAMEDATALEN];
     822           6 :     char       *err = NULL;
     823             :     RepOriginId originid;
     824           6 :     WalReceiverConn *wrconn = NULL;
     825             :     StringInfoData cmd;
     826             : 
     827             :     /*
     828             :      * Lock pg_subscription with AccessExclusiveLock to ensure that the
     829             :      * launcher doesn't start a new worker while the subscription is dropped.
     830             :      */
     831           6 :     rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
     832             : 
     833           6 :     tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
     834             :                           CStringGetDatum(stmt->subname));
     835             : 
     836           6 :     if (!HeapTupleIsValid(tup))
     837             :     {
     838           2 :         heap_close(rel, NoLock);
     839             : 
     840           2 :         if (!stmt->missing_ok)
     841           1 :             ereport(ERROR,
     842             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     843             :                      errmsg("subscription \"%s\" does not exist",
     844             :                             stmt->subname)));
     845             :         else
     846           1 :             ereport(NOTICE,
     847             :                     (errmsg("subscription \"%s\" does not exist, skipping",
     848             :                             stmt->subname)));
     849             : 
     850           5 :         return;
     851             :     }
     852             : 
     853           4 :     subid = HeapTupleGetOid(tup);
     854             : 
     855             :     /* Only the owner of the subscription may drop it. */
     856           4 :     if (!pg_subscription_ownercheck(subid, GetUserId()))
     857           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
     858           0 :                        stmt->subname);
     859             : 
     860             :     /* DROP hook for the subscription being removed */
     861           4 :     InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
     862             : 
     863             :     /*
     864             :      * Lock the subscription so nobody else can do anything with it (including
     865             :      * the replication workers).
     866             :      */
     867           4 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
     868             : 
     869             :     /* Get subname (copied, because it is still needed after the tuple is released) */
     870           4 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
     871             :                             Anum_pg_subscription_subname, &isnull);
     872           4 :     Assert(!isnull);
     873           4 :     subname = pstrdup(NameStr(*DatumGetName(datum)));
     874             : 
     875             :     /* Get conninfo */
     876           4 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
     877             :                             Anum_pg_subscription_subconninfo, &isnull);
     878           4 :     Assert(!isnull);
     879           4 :     conninfo = TextDatumGetCString(datum);
     880             : 
     881             :     /* Get slotname; a NULL column means no slot is associated with the subscription */
     882           4 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
     883             :                             Anum_pg_subscription_subslotname, &isnull);
     884           4 :     if (!isnull)
     885           1 :         slotname = pstrdup(NameStr(*DatumGetName(datum)));
     886             :     else
     887           3 :         slotname = NULL;
     888             : 
     889             :     /*
     890             :      * Since dropping a replication slot is not transactional, the replication
     891             :      * slot stays dropped even if the transaction rolls back.  So we cannot
     892             :      * run DROP SUBSCRIPTION inside a transaction block if dropping the
     893             :      * replication slot.
     894             :      *
     895             :      * XXX The command name should really be something like "DROP SUBSCRIPTION
     896             :      * of a subscription that is associated with a replication slot", but we
     897             :      * don't have the proper facilities for that.
     898             :      */
     899           4 :     if (slotname)
     900           1 :         PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
     901             : 
     902             : 
     903           3 :     ObjectAddressSet(myself, SubscriptionRelationId, subid);
     904           3 :     EventTriggerSQLDropAddObject(&myself, true, true);
     905             : 
     906             :     /* Remove the tuple from catalog. */
     907           3 :     CatalogTupleDelete(rel, &tup->t_self);
     908             : 
     909           3 :     ReleaseSysCache(tup);
     910             : 
     911             :     /*
     912             :      * If we are dropping the replication slot, stop all the subscription
     913             :      * workers immediately, so that the slot becomes accessible.  Otherwise
     914             :      * just schedule the stopping for the end of the transaction.
     915             :      *
     916             :      * New workers won't be started because we hold an exclusive lock on the
     917             :      * subscription till the end of the transaction.
     918             :      */
     919           3 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     920           3 :     subworkers = logicalrep_workers_find(subid, false);
     921           3 :     LWLockRelease(LogicalRepWorkerLock);
     922           3 :     foreach(lc, subworkers)
     923             :     {
     924           0 :         LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     925             : 
     926           0 :         if (slotname)
     927           0 :             logicalrep_worker_stop(w->subid, w->relid);
     928             :         else
     929           0 :             logicalrep_worker_stop_at_commit(w->subid, w->relid);
     930             :     }
     931           3 :     list_free(subworkers);
     932             : 
     933             :     /* Clean up dependencies */
     934           3 :     deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
     935             : 
     936             :     /* Remove any associated relation synchronization states. */
     937           3 :     RemoveSubscriptionRel(subid, InvalidOid);
     938             : 
     939             :     /* Remove the origin tracking if it exists. */
     940           3 :     snprintf(originname, sizeof(originname), "pg_%u", subid);
     941           3 :     originid = replorigin_by_name(originname, true);
     942           3 :     if (originid != InvalidRepOriginId)
     943           3 :         replorigin_drop(originid, false);
     944             : 
     945             :     /*
     946             :      * If there is no slot associated with the subscription, we can finish
     947             :      * here.
     948             :      */
     949           3 :     if (!slotname)
     950             :     {
     951           3 :         heap_close(rel, NoLock);
     952           3 :         return;
     953             :     }
     954             : 
     955             :     /*
     956             :      * Otherwise drop the replication slot at the publisher node using the
     957             :      * replication connection.
     958             :      */
     959           0 :     load_file("libpqwalreceiver", false);
     960             :     /* NOTE(review): rel, opened with AccessExclusiveLock above, is still open during the remote calls below. */
     961           0 :     initStringInfo(&cmd);
     962           0 :     appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
     963             : 
     964           0 :     wrconn = walrcv_connect(conninfo, true, subname, &err);
     965           0 :     if (wrconn == NULL)
     966           0 :         ereport(ERROR,
     967             :                 (errmsg("could not connect to publisher when attempting to "
     968             :                         "drop the replication slot \"%s\"", slotname),
     969             :                  errdetail("The error was: %s", err),
     970             :                  errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
     971             :                          "to disassociate the subscription from the slot.")));
     972             : 
     973           0 :     PG_TRY();
     974             :     {
     975             :         WalRcvExecResult *res;
     976             : 
     977           0 :         res = walrcv_exec(wrconn, cmd.data, 0, NULL);
     978             : 
     979           0 :         if (res->status != WALRCV_OK_COMMAND)
     980           0 :             ereport(ERROR,
     981             :                     (errmsg("could not drop the replication slot \"%s\" on publisher",
     982             :                             slotname),
     983             :                      errdetail("The error was: %s", res->err)));
     984             :         else
     985           0 :             ereport(NOTICE,
     986             :                     (errmsg("dropped replication slot \"%s\" on publisher",
     987             :                             slotname)));
     988             : 
     989           0 :         walrcv_clear_result(res);
     990             :     }
     991           0 :     PG_CATCH();
     992             :     {
     993             :         /* Close the connection in case of failure, then propagate the error */
     994           0 :         walrcv_disconnect(wrconn);
     995           0 :         PG_RE_THROW();
     996             :     }
     997           0 :     PG_END_TRY();
     998             : 
     999           0 :     walrcv_disconnect(wrconn);
    1000             : 
    1001           0 :     pfree(cmd.data);
    1002             : 
    1003           0 :     heap_close(rel, NoLock);
    1004             : }
    1005             : 
    1006             : /*
    1007             :  * Internal workhorse for changing a subscription owner; does nothing if newOwnerId already owns it.
    1008             :  */
    1009             : static void
    1010           2 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    1011             : {
    1012             :     Form_pg_subscription form;
    1013             : 
    1014           2 :     form = (Form_pg_subscription) GETSTRUCT(tup);
    1015             : 
    1016           2 :     if (form->subowner == newOwnerId)
    1017           1 :         return;                 /* owner is unchanged; nothing to update */
    1018             :     /* Only the current owner of the subscription may reassign it. */
    1019           2 :     if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
    1020           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
    1021           0 :                        NameStr(form->subname));
    1022             : 
    1023             :     /* New owner must be a superuser */
    1024           2 :     if (!superuser_arg(newOwnerId))
    1025           1 :         ereport(ERROR,
    1026             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1027             :                  errmsg("permission denied to change owner of subscription \"%s\"",
    1028             :                         NameStr(form->subname)),
    1029             :                  errhint("The owner of a subscription must be a superuser.")));
    1030             :     /* Overwrite the owner in the caller's tuple and write it back to the catalog. */
    1031           1 :     form->subowner = newOwnerId;
    1032           1 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1033             : 
    1034             :     /* Update owner dependency reference */
    1035           2 :     changeDependencyOnOwner(SubscriptionRelationId,
    1036           2 :                             HeapTupleGetOid(tup),
    1037             :                             newOwnerId);
    1038             : 
    1039           1 :     InvokeObjectPostAlterHook(SubscriptionRelationId,
    1040             :                               HeapTupleGetOid(tup), 0);
    1041             : }
    1042             : 
    1043             : /*
    1044             :  * Change subscription owner -- by name; returns the subscription's ObjectAddress
    1045             :  */
    1046             : ObjectAddress
    1047           2 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
    1048             : {
    1049             :     Oid         subid;
    1050             :     HeapTuple   tup;
    1051             :     Relation    rel;
    1052             :     ObjectAddress address;
    1053             : 
    1054           2 :     rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
    1055             :     /* Look up a copy of the subscription's tuple by database and name. */
    1056           2 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
    1057             :                               CStringGetDatum(name));
    1058             : 
    1059           2 :     if (!HeapTupleIsValid(tup))
    1060           0 :         ereport(ERROR,
    1061             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1062             :                  errmsg("subscription \"%s\" does not exist", name)));
    1063             : 
    1064           2 :     subid = HeapTupleGetOid(tup);
    1065             :     /* The permission checks and the catalog update happen in the shared helper. */
    1066           2 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    1067             : 
    1068           1 :     ObjectAddressSet(address, SubscriptionRelationId, subid);
    1069             : 
    1070           1 :     heap_freetuple(tup);
    1071             : 
    1072           1 :     heap_close(rel, RowExclusiveLock);
    1073             : 
    1074           1 :     return address;
    1075             : }
    1076             : 
    1077             : /*
    1078             :  * Change subscription owner -- by OID; raises ERROR if no subscription has that OID
    1079             :  */
    1080             : void
    1081           0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
    1082             : {
    1083             :     HeapTuple   tup;
    1084             :     Relation    rel;
    1085             : 
    1086           0 :     rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
    1087             :     /* Look up a copy of the subscription's tuple by OID. */
    1088           0 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
    1089             : 
    1090           0 :     if (!HeapTupleIsValid(tup))
    1091           0 :         ereport(ERROR,
    1092             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1093             :                  errmsg("subscription with OID %u does not exist", subid)));
    1094             :     /* The permission checks and the catalog update happen in the shared helper. */
    1095           0 :     AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
    1096             : 
    1097           0 :     heap_freetuple(tup);
    1098             : 
    1099           0 :     heap_close(rel, RowExclusiveLock);
    1100           0 : }
    1101             : 
    1102             : /*
    1103             :  * Get the list of tables which belong to the specified publications on the
    1104             :  * publisher connection, as a List of schema-qualified RangeVars.
    1105             :  */
    1106             : static List *
    1107           0 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
    1108             : {
    1109             :     WalRcvExecResult *res;
    1110             :     StringInfoData cmd;
    1111             :     TupleTableSlot *slot;
    1112           0 :     Oid         tableRow[2] = {TEXTOID, TEXTOID};
    1113             :     ListCell   *lc;
    1114             :     bool        first;
    1115           0 :     List       *tablelist = NIL;
    1116             : 
    1117           0 :     Assert(list_length(publications) > 0);
    1118             :     /* Build the query; each publication name is added as a quoted literal. */
    1119           0 :     initStringInfo(&cmd);
    1120           0 :     appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
    1121             :                            "  FROM pg_catalog.pg_publication_tables t\n"
    1122             :                            " WHERE t.pubname IN (");
    1123           0 :     first = true;
    1124           0 :     foreach(lc, publications)
    1125             :     {
    1126           0 :         char       *pubname = strVal(lfirst(lc));
    1127             : 
    1128           0 :         if (first)
    1129           0 :             first = false;
    1130             :         else
    1131           0 :             appendStringInfoString(&cmd, ", ");
    1132             : 
    1133           0 :         appendStringInfoString(&cmd, quote_literal_cstr(pubname));
    1134             :     }
    1135           0 :     appendStringInfoChar(&cmd, ')');
    1136             : 
    1137           0 :     res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
    1138           0 :     pfree(cmd.data);
    1139             : 
    1140           0 :     if (res->status != WALRCV_OK_TUPLES)
    1141           0 :         ereport(ERROR,
    1142             :                 (errmsg("could not receive list of replicated tables from the publisher: %s",
    1143             :                         res->err)));
    1144             : 
    1145             :     /* Process tables: one (schemaname, tablename) row per replicated table. */
    1146           0 :     slot = MakeSingleTupleTableSlot(res->tupledesc);
    1147           0 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
    1148             :     {
    1149             :         char       *nspname;
    1150             :         char       *relname;
    1151             :         bool        isnull;
    1152             :         RangeVar   *rv;
    1153             : 
    1154           0 :         nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
    1155           0 :         Assert(!isnull);
    1156           0 :         relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
    1157           0 :         Assert(!isnull);
    1158             :         /* TextDatumGetCString() already returned fresh copies, so hand them over without pstrdup(). */
    1159           0 :         rv = makeRangeVar(nspname, relname, -1);
    1160           0 :         tablelist = lappend(tablelist, rv);
    1161             : 
    1162           0 :         ExecClearTuple(slot);
    1163             :     }
    1164           0 :     ExecDropSingleTupleTableSlot(slot);
    1165             : 
    1166           0 :     walrcv_clear_result(res);
    1167             : 
    1168           0 :     return tablelist;
    1169             : }

Generated by: LCOV version 1.11