Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * subscriptioncmds.c
4 : * subscription catalog manipulation functions
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * subscriptioncmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "miscadmin.h"
18 :
19 : #include "access/heapam.h"
20 : #include "access/htup_details.h"
21 : #include "access/xact.h"
22 :
23 : #include "catalog/dependency.h"
24 : #include "catalog/indexing.h"
25 : #include "catalog/namespace.h"
26 : #include "catalog/objectaccess.h"
27 : #include "catalog/objectaddress.h"
28 : #include "catalog/pg_type.h"
29 : #include "catalog/pg_subscription.h"
30 : #include "catalog/pg_subscription_rel.h"
31 :
32 : #include "commands/defrem.h"
33 : #include "commands/event_trigger.h"
34 : #include "commands/subscriptioncmds.h"
35 :
36 : #include "executor/executor.h"
37 :
38 : #include "nodes/makefuncs.h"
39 :
40 : #include "replication/logicallauncher.h"
41 : #include "replication/origin.h"
42 : #include "replication/walreceiver.h"
43 : #include "replication/walsender.h"
44 : #include "replication/worker_internal.h"
45 :
46 : #include "storage/lmgr.h"
47 :
48 : #include "utils/builtins.h"
49 : #include "utils/guc.h"
50 : #include "utils/lsyscache.h"
51 : #include "utils/memutils.h"
52 : #include "utils/syscache.h"
53 :
54 : static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
55 :
56 : /*
57 : * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
58 : *
59 : * Since not all options can be specified in both commands, this function
60 : * will report an error on options if the target output pointer is NULL to
61 : * accommodate that.
62 : */
63 : static void
64 25 : parse_subscription_options(List *options, bool *connect, bool *enabled_given,
65 : bool *enabled, bool *create_slot,
66 : bool *slot_name_given, char **slot_name,
67 : bool *copy_data, char **synchronous_commit,
68 : bool *refresh)
69 : {
70 : ListCell *lc;
71 25 : bool connect_given = false;
72 25 : bool create_slot_given = false;
73 25 : bool copy_data_given = false;
74 25 : bool refresh_given = false;
75 :
76 : /* If connect is specified, the others also need to be. */
77 25 : Assert(!connect || (enabled && create_slot && copy_data));
78 :
79 25 : if (connect)
80 16 : *connect = true;
81 25 : if (enabled)
82 : {
83 19 : *enabled_given = false;
84 19 : *enabled = true;
85 : }
86 25 : if (create_slot)
87 16 : *create_slot = true;
88 25 : if (slot_name)
89 : {
90 21 : *slot_name_given = false;
91 21 : *slot_name = NULL;
92 : }
93 25 : if (copy_data)
94 17 : *copy_data = true;
95 25 : if (synchronous_commit)
96 21 : *synchronous_commit = NULL;
97 25 : if (refresh)
98 1 : *refresh = true;
99 :
100 : /* Parse options */
101 56 : foreach(lc, options)
102 : {
103 33 : DefElem *defel = (DefElem *) lfirst(lc);
104 :
105 33 : if (strcmp(defel->defname, "connect") == 0 && connect)
106 : {
107 9 : if (connect_given)
108 0 : ereport(ERROR,
109 : (errcode(ERRCODE_SYNTAX_ERROR),
110 : errmsg("conflicting or redundant options")));
111 :
112 9 : connect_given = true;
113 9 : *connect = defGetBoolean(defel);
114 : }
115 24 : else if (strcmp(defel->defname, "enabled") == 0 && enabled)
116 : {
117 6 : if (*enabled_given)
118 0 : ereport(ERROR,
119 : (errcode(ERRCODE_SYNTAX_ERROR),
120 : errmsg("conflicting or redundant options")));
121 :
122 6 : *enabled_given = true;
123 6 : *enabled = defGetBoolean(defel);
124 : }
125 18 : else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
126 : {
127 4 : if (create_slot_given)
128 0 : ereport(ERROR,
129 : (errcode(ERRCODE_SYNTAX_ERROR),
130 : errmsg("conflicting or redundant options")));
131 :
132 4 : create_slot_given = true;
133 4 : *create_slot = defGetBoolean(defel);
134 : }
135 14 : else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
136 : {
137 9 : if (*slot_name_given)
138 0 : ereport(ERROR,
139 : (errcode(ERRCODE_SYNTAX_ERROR),
140 : errmsg("conflicting or redundant options")));
141 :
142 9 : *slot_name_given = true;
143 9 : *slot_name = defGetString(defel);
144 :
145 : /* Setting slot_name = NONE is treated as no slot name. */
146 18 : if (strcmp(*slot_name, "none") == 0)
147 8 : *slot_name = NULL;
148 : }
149 5 : else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
150 : {
151 1 : if (copy_data_given)
152 0 : ereport(ERROR,
153 : (errcode(ERRCODE_SYNTAX_ERROR),
154 : errmsg("conflicting or redundant options")));
155 :
156 1 : copy_data_given = true;
157 1 : *copy_data = defGetBoolean(defel);
158 : }
159 4 : else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
160 : synchronous_commit)
161 : {
162 2 : if (*synchronous_commit)
163 0 : ereport(ERROR,
164 : (errcode(ERRCODE_SYNTAX_ERROR),
165 : errmsg("conflicting or redundant options")));
166 :
167 2 : *synchronous_commit = defGetString(defel);
168 :
169 : /* Test if the given value is valid for synchronous_commit GUC. */
170 2 : (void) set_config_option("synchronous_commit", *synchronous_commit,
171 : PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
172 : false, 0, false);
173 : }
174 2 : else if (strcmp(defel->defname, "refresh") == 0 && refresh)
175 : {
176 1 : if (refresh_given)
177 0 : ereport(ERROR,
178 : (errcode(ERRCODE_SYNTAX_ERROR),
179 : errmsg("conflicting or redundant options")));
180 :
181 1 : refresh_given = true;
182 1 : *refresh = defGetBoolean(defel);
183 : }
184 : else
185 1 : ereport(ERROR,
186 : (errcode(ERRCODE_SYNTAX_ERROR),
187 : errmsg("unrecognized subscription parameter: %s", defel->defname)));
188 : }
189 :
190 : /*
191 : * We've been explicitly asked to not connect, that requires some
192 : * additional processing.
193 : */
194 23 : if (connect && !*connect)
195 : {
196 : /* Check for incompatible options from the user. */
197 9 : if (enabled && *enabled_given && *enabled)
198 1 : ereport(ERROR,
199 : (errcode(ERRCODE_SYNTAX_ERROR),
200 : errmsg("connect = false and enabled = true are mutually exclusive options")));
201 :
202 8 : if (create_slot && create_slot_given && *create_slot)
203 1 : ereport(ERROR,
204 : (errcode(ERRCODE_SYNTAX_ERROR),
205 : errmsg("connect = false and create_slot = true are mutually exclusive options")));
206 :
207 7 : if (copy_data && copy_data_given && *copy_data)
208 1 : ereport(ERROR,
209 : (errcode(ERRCODE_SYNTAX_ERROR),
210 : errmsg("connect = false and copy_data = true are mutually exclusive options")));
211 :
212 : /* Change the defaults of other options. */
213 6 : *enabled = false;
214 6 : *create_slot = false;
215 6 : *copy_data = false;
216 : }
217 :
218 : /*
219 : * Do additional checking for disallowed combination when slot_name = NONE
220 : * was used.
221 : */
222 20 : if (slot_name && *slot_name_given && !*slot_name)
223 : {
224 8 : if (enabled && *enabled_given && *enabled)
225 1 : ereport(ERROR,
226 : (errcode(ERRCODE_SYNTAX_ERROR),
227 : errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
228 :
229 7 : if (create_slot && create_slot_given && *create_slot)
230 1 : ereport(ERROR,
231 : (errcode(ERRCODE_SYNTAX_ERROR),
232 : errmsg("slot_name = NONE and create_slot = true are mutually exclusive options")));
233 :
234 6 : if (enabled && !*enabled_given && *enabled)
235 2 : ereport(ERROR,
236 : (errcode(ERRCODE_SYNTAX_ERROR),
237 : errmsg("subscription with slot_name = NONE must also set enabled = false")));
238 :
239 4 : if (create_slot && !create_slot_given && *create_slot)
240 1 : ereport(ERROR,
241 : (errcode(ERRCODE_SYNTAX_ERROR),
242 : errmsg("subscription with slot_name = NONE must also set create_slot = false")));
243 : }
244 15 : }
245 :
246 : /*
247 : * Auxiliary function to return a text array out of a list of String nodes.
248 : */
249 : static Datum
250 5 : publicationListToArray(List *publist)
251 : {
252 : ArrayType *arr;
253 : Datum *datums;
254 5 : int j = 0;
255 : ListCell *cell;
256 : MemoryContext memcxt;
257 : MemoryContext oldcxt;
258 :
259 : /* Create memory context for temporary allocations. */
260 5 : memcxt = AllocSetContextCreate(CurrentMemoryContext,
261 : "publicationListToArray to array",
262 : ALLOCSET_DEFAULT_MINSIZE,
263 : ALLOCSET_DEFAULT_INITSIZE,
264 : ALLOCSET_DEFAULT_MAXSIZE);
265 5 : oldcxt = MemoryContextSwitchTo(memcxt);
266 :
267 5 : datums = palloc(sizeof(text *) * list_length(publist));
268 12 : foreach(cell, publist)
269 : {
270 8 : char *name = strVal(lfirst(cell));
271 : ListCell *pcell;
272 :
273 : /* Check for duplicates. */
274 10 : foreach(pcell, publist)
275 : {
276 10 : char *pname = strVal(lfirst(pcell));
277 :
278 10 : if (name == pname)
279 7 : break;
280 :
281 3 : if (strcmp(name, pname) == 0)
282 1 : ereport(ERROR,
283 : (errcode(ERRCODE_SYNTAX_ERROR),
284 : errmsg("publication name \"%s\" used more than once",
285 : pname)));
286 : }
287 :
288 7 : datums[j++] = CStringGetTextDatum(name);
289 : }
290 :
291 4 : MemoryContextSwitchTo(oldcxt);
292 :
293 4 : arr = construct_array(datums, list_length(publist),
294 : TEXTOID, -1, false, 'i');
295 4 : MemoryContextDelete(memcxt);
296 :
297 4 : return PointerGetDatum(arr);
298 : }
299 :
300 : /*
301 : * Create new subscription.
302 : */
303 : ObjectAddress
304 16 : CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
305 : {
306 : Relation rel;
307 : ObjectAddress myself;
308 : Oid subid;
309 : bool nulls[Natts_pg_subscription];
310 : Datum values[Natts_pg_subscription];
311 16 : Oid owner = GetUserId();
312 : HeapTuple tup;
313 : bool connect;
314 : bool enabled_given;
315 : bool enabled;
316 : bool copy_data;
317 : char *synchronous_commit;
318 : char *conninfo;
319 : char *slotname;
320 : bool slotname_given;
321 : char originname[NAMEDATALEN];
322 : bool create_slot;
323 : List *publications;
324 :
325 : /*
326 : * Parse and check options.
327 : *
328 : * Connection and publication should not be specified here.
329 : */
330 16 : parse_subscription_options(stmt->options, &connect, &enabled_given,
331 : &enabled, &create_slot, &slotname_given,
332 : &slotname, ©_data, &synchronous_commit,
333 : NULL);
334 :
335 : /*
336 : * Since creating a replication slot is not transactional, rolling back
337 : * the transaction leaves the created replication slot. So we cannot run
338 : * CREATE SUBSCRIPTION inside a transaction block if creating a
339 : * replication slot.
340 : */
341 8 : if (create_slot)
342 2 : PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
343 :
344 7 : if (!superuser())
345 1 : ereport(ERROR,
346 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
347 : (errmsg("must be superuser to create subscriptions"))));
348 :
349 6 : rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
350 :
351 : /* Check if name is used */
352 6 : subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
353 : CStringGetDatum(stmt->subname));
354 6 : if (OidIsValid(subid))
355 : {
356 1 : ereport(ERROR,
357 : (errcode(ERRCODE_DUPLICATE_OBJECT),
358 : errmsg("subscription \"%s\" already exists",
359 : stmt->subname)));
360 : }
361 :
362 5 : if (!slotname_given && slotname == NULL)
363 3 : slotname = stmt->subname;
364 :
365 : /* The default for synchronous_commit of subscriptions is off. */
366 5 : if (synchronous_commit == NULL)
367 5 : synchronous_commit = "off";
368 :
369 5 : conninfo = stmt->conninfo;
370 5 : publications = stmt->publication;
371 :
372 : /* Load the library providing us libpq calls. */
373 5 : load_file("libpqwalreceiver", false);
374 :
375 : /* Check the connection info string. */
376 5 : walrcv_check_conninfo(conninfo);
377 :
378 : /* Everything ok, form a new tuple. */
379 4 : memset(values, 0, sizeof(values));
380 4 : memset(nulls, false, sizeof(nulls));
381 :
382 4 : values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
383 4 : values[Anum_pg_subscription_subname - 1] =
384 4 : DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
385 4 : values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
386 4 : values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
387 4 : values[Anum_pg_subscription_subconninfo - 1] =
388 4 : CStringGetTextDatum(conninfo);
389 4 : if (slotname)
390 2 : values[Anum_pg_subscription_subslotname - 1] =
391 2 : DirectFunctionCall1(namein, CStringGetDatum(slotname));
392 : else
393 2 : nulls[Anum_pg_subscription_subslotname - 1] = true;
394 4 : values[Anum_pg_subscription_subsynccommit - 1] =
395 4 : CStringGetTextDatum(synchronous_commit);
396 3 : values[Anum_pg_subscription_subpublications - 1] =
397 4 : publicationListToArray(publications);
398 :
399 3 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
400 :
401 : /* Insert tuple into catalog. */
402 3 : subid = CatalogTupleInsert(rel, tup);
403 3 : heap_freetuple(tup);
404 :
405 3 : recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
406 :
407 3 : snprintf(originname, sizeof(originname), "pg_%u", subid);
408 3 : replorigin_create(originname);
409 :
410 : /*
411 : * Connect to remote side to execute requested commands and fetch table
412 : * info.
413 : */
414 3 : if (connect)
415 : {
416 : XLogRecPtr lsn;
417 : char *err;
418 : WalReceiverConn *wrconn;
419 : List *tables;
420 : ListCell *lc;
421 : char table_state;
422 :
423 : /* Try to connect to the publisher. */
424 0 : wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
425 0 : if (!wrconn)
426 0 : ereport(ERROR,
427 : (errmsg("could not connect to the publisher: %s", err)));
428 :
429 0 : PG_TRY();
430 : {
431 : /*
432 : * Set sync state based on if we were asked to do data copy or
433 : * not.
434 : */
435 0 : table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
436 :
437 : /*
438 : * Get the table list from publisher and build local table status
439 : * info.
440 : */
441 0 : tables = fetch_table_list(wrconn, publications);
442 0 : foreach(lc, tables)
443 : {
444 0 : RangeVar *rv = (RangeVar *) lfirst(lc);
445 : Oid relid;
446 :
447 0 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
448 :
449 : /* Check for supported relkind. */
450 0 : CheckSubscriptionRelkind(get_rel_relkind(relid),
451 0 : rv->schemaname, rv->relname);
452 :
453 0 : SetSubscriptionRelState(subid, relid, table_state,
454 : InvalidXLogRecPtr, false);
455 : }
456 :
457 : /*
458 : * If requested, create permanent slot for the subscription. We
459 : * won't use the initial snapshot for anything, so no need to
460 : * export it.
461 : */
462 0 : if (create_slot)
463 : {
464 0 : Assert(slotname);
465 :
466 0 : walrcv_create_slot(wrconn, slotname, false,
467 : CRS_NOEXPORT_SNAPSHOT, &lsn);
468 0 : ereport(NOTICE,
469 : (errmsg("created replication slot \"%s\" on publisher",
470 : slotname)));
471 : }
472 : }
473 0 : PG_CATCH();
474 : {
475 : /* Close the connection in case of failure. */
476 0 : walrcv_disconnect(wrconn);
477 0 : PG_RE_THROW();
478 : }
479 0 : PG_END_TRY();
480 :
481 : /* And we are done with the remote side. */
482 0 : walrcv_disconnect(wrconn);
483 : }
484 : else
485 3 : ereport(WARNING,
486 : (errmsg("tables were not subscribed, you will have to run "
487 : "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
488 : "subscribe the tables")));
489 :
490 3 : heap_close(rel, RowExclusiveLock);
491 :
492 3 : if (enabled)
493 0 : ApplyLauncherWakeupAtCommit();
494 :
495 3 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
496 :
497 3 : InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
498 :
499 3 : return myself;
500 : }
501 :
502 : static void
503 0 : AlterSubscription_refresh(Subscription *sub, bool copy_data)
504 : {
505 : char *err;
506 : List *pubrel_names;
507 : List *subrel_states;
508 : Oid *subrel_local_oids;
509 : Oid *pubrel_local_oids;
510 : ListCell *lc;
511 : int off;
512 :
513 : /* Load the library providing us libpq calls. */
514 0 : load_file("libpqwalreceiver", false);
515 :
516 : /* Try to connect to the publisher. */
517 0 : wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
518 0 : if (!wrconn)
519 0 : ereport(ERROR,
520 : (errmsg("could not connect to the publisher: %s", err)));
521 :
522 : /* Get the table list from publisher. */
523 0 : pubrel_names = fetch_table_list(wrconn, sub->publications);
524 :
525 : /* We are done with the remote side, close connection. */
526 0 : walrcv_disconnect(wrconn);
527 :
528 : /* Get local table list. */
529 0 : subrel_states = GetSubscriptionRelations(sub->oid);
530 :
531 : /*
532 : * Build qsorted array of local table oids for faster lookup. This can
533 : * potentially contain all tables in the database so speed of lookup is
534 : * important.
535 : */
536 0 : subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
537 0 : off = 0;
538 0 : foreach(lc, subrel_states)
539 : {
540 0 : SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
541 :
542 0 : subrel_local_oids[off++] = relstate->relid;
543 : }
544 0 : qsort(subrel_local_oids, list_length(subrel_states),
545 : sizeof(Oid), oid_cmp);
546 :
547 : /*
548 : * Walk over the remote tables and try to match them to locally known
549 : * tables. If the table is not known locally create a new state for it.
550 : *
551 : * Also builds array of local oids of remote tables for the next step.
552 : */
553 0 : off = 0;
554 0 : pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
555 :
556 0 : foreach(lc, pubrel_names)
557 : {
558 0 : RangeVar *rv = (RangeVar *) lfirst(lc);
559 : Oid relid;
560 :
561 0 : relid = RangeVarGetRelid(rv, AccessShareLock, false);
562 :
563 : /* Check for supported relkind. */
564 0 : CheckSubscriptionRelkind(get_rel_relkind(relid),
565 0 : rv->schemaname, rv->relname);
566 :
567 0 : pubrel_local_oids[off++] = relid;
568 :
569 0 : if (!bsearch(&relid, subrel_local_oids,
570 0 : list_length(subrel_states), sizeof(Oid), oid_cmp))
571 : {
572 0 : SetSubscriptionRelState(sub->oid, relid,
573 : copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
574 : InvalidXLogRecPtr, false);
575 0 : ereport(DEBUG1,
576 : (errmsg("table \"%s.%s\" added to subscription \"%s\"",
577 : rv->schemaname, rv->relname, sub->name)));
578 : }
579 : }
580 :
581 : /*
582 : * Next remove state for tables we should not care about anymore using the
583 : * data we collected above
584 : */
585 0 : qsort(pubrel_local_oids, list_length(pubrel_names),
586 : sizeof(Oid), oid_cmp);
587 :
588 0 : for (off = 0; off < list_length(subrel_states); off++)
589 : {
590 0 : Oid relid = subrel_local_oids[off];
591 :
592 0 : if (!bsearch(&relid, pubrel_local_oids,
593 0 : list_length(pubrel_names), sizeof(Oid), oid_cmp))
594 : {
595 0 : RemoveSubscriptionRel(sub->oid, relid);
596 :
597 0 : logicalrep_worker_stop_at_commit(sub->oid, relid);
598 :
599 0 : ereport(DEBUG1,
600 : (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
601 : get_namespace_name(get_rel_namespace(relid)),
602 : get_rel_name(relid),
603 : sub->name)));
604 : }
605 : }
606 0 : }
607 :
608 : /*
609 : * Alter the existing subscription.
610 : */
611 : ObjectAddress
612 13 : AlterSubscription(AlterSubscriptionStmt *stmt)
613 : {
614 : Relation rel;
615 : ObjectAddress myself;
616 : bool nulls[Natts_pg_subscription];
617 : bool replaces[Natts_pg_subscription];
618 : Datum values[Natts_pg_subscription];
619 : HeapTuple tup;
620 : Oid subid;
621 13 : bool update_tuple = false;
622 : Subscription *sub;
623 :
624 13 : rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
625 :
626 : /* Fetch the existing tuple. */
627 13 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
628 : CStringGetDatum(stmt->subname));
629 :
630 13 : if (!HeapTupleIsValid(tup))
631 1 : ereport(ERROR,
632 : (errcode(ERRCODE_UNDEFINED_OBJECT),
633 : errmsg("subscription \"%s\" does not exist",
634 : stmt->subname)));
635 :
636 : /* must be owner */
637 12 : if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
638 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
639 0 : stmt->subname);
640 :
641 12 : subid = HeapTupleGetOid(tup);
642 12 : sub = GetSubscription(subid, false);
643 :
644 : /* Lock the subscription so nobody else can do anything with it. */
645 12 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
646 :
647 : /* Form a new tuple. */
648 12 : memset(values, 0, sizeof(values));
649 12 : memset(nulls, false, sizeof(nulls));
650 12 : memset(replaces, false, sizeof(replaces));
651 :
652 12 : switch (stmt->kind)
653 : {
654 : case ALTER_SUBSCRIPTION_OPTIONS:
655 : {
656 : char *slotname;
657 : bool slotname_given;
658 : char *synchronous_commit;
659 :
660 5 : parse_subscription_options(stmt->options, NULL, NULL, NULL,
661 : NULL, &slotname_given, &slotname,
662 : NULL, &synchronous_commit, NULL);
663 :
664 3 : if (slotname_given)
665 : {
666 2 : if (sub->enabled && !slotname)
667 0 : ereport(ERROR,
668 : (errcode(ERRCODE_SYNTAX_ERROR),
669 : errmsg("cannot set slot_name = NONE for enabled subscription")));
670 :
671 2 : if (slotname)
672 1 : values[Anum_pg_subscription_subslotname - 1] =
673 1 : DirectFunctionCall1(namein, CStringGetDatum(slotname));
674 : else
675 1 : nulls[Anum_pg_subscription_subslotname - 1] = true;
676 2 : replaces[Anum_pg_subscription_subslotname - 1] = true;
677 : }
678 :
679 3 : if (synchronous_commit)
680 : {
681 1 : values[Anum_pg_subscription_subsynccommit - 1] =
682 1 : CStringGetTextDatum(synchronous_commit);
683 1 : replaces[Anum_pg_subscription_subsynccommit - 1] = true;
684 : }
685 :
686 3 : update_tuple = true;
687 3 : break;
688 : }
689 :
690 : case ALTER_SUBSCRIPTION_ENABLED:
691 : {
692 : bool enabled,
693 : enabled_given;
694 :
695 3 : parse_subscription_options(stmt->options, NULL,
696 : &enabled_given, &enabled, NULL,
697 : NULL, NULL, NULL, NULL, NULL);
698 3 : Assert(enabled_given);
699 :
700 3 : if (!sub->slotname && enabled)
701 1 : ereport(ERROR,
702 : (errcode(ERRCODE_SYNTAX_ERROR),
703 : errmsg("cannot enable subscription that does not have a slot name")));
704 :
705 2 : values[Anum_pg_subscription_subenabled - 1] =
706 2 : BoolGetDatum(enabled);
707 2 : replaces[Anum_pg_subscription_subenabled - 1] = true;
708 :
709 2 : if (enabled)
710 1 : ApplyLauncherWakeupAtCommit();
711 :
712 2 : update_tuple = true;
713 2 : break;
714 : }
715 :
716 : case ALTER_SUBSCRIPTION_CONNECTION:
717 : /* Load the library providing us libpq calls. */
718 2 : load_file("libpqwalreceiver", false);
719 : /* Check the connection info string. */
720 2 : walrcv_check_conninfo(stmt->conninfo);
721 :
722 1 : values[Anum_pg_subscription_subconninfo - 1] =
723 1 : CStringGetTextDatum(stmt->conninfo);
724 1 : replaces[Anum_pg_subscription_subconninfo - 1] = true;
725 1 : update_tuple = true;
726 1 : break;
727 :
728 : case ALTER_SUBSCRIPTION_PUBLICATION:
729 : {
730 : bool copy_data;
731 : bool refresh;
732 :
733 1 : parse_subscription_options(stmt->options, NULL, NULL, NULL,
734 : NULL, NULL, NULL, ©_data,
735 : NULL, &refresh);
736 :
737 1 : values[Anum_pg_subscription_subpublications - 1] =
738 1 : publicationListToArray(stmt->publication);
739 1 : replaces[Anum_pg_subscription_subpublications - 1] = true;
740 :
741 1 : update_tuple = true;
742 :
743 : /* Refresh if user asked us to. */
744 1 : if (refresh)
745 : {
746 0 : if (!sub->enabled)
747 0 : ereport(ERROR,
748 : (errcode(ERRCODE_SYNTAX_ERROR),
749 : errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
750 : errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
751 :
752 : /* Make sure refresh sees the new list of publications. */
753 0 : sub->publications = stmt->publication;
754 :
755 0 : AlterSubscription_refresh(sub, copy_data);
756 : }
757 :
758 1 : break;
759 : }
760 :
761 : case ALTER_SUBSCRIPTION_REFRESH:
762 : {
763 : bool copy_data;
764 :
765 1 : if (!sub->enabled)
766 1 : ereport(ERROR,
767 : (errcode(ERRCODE_SYNTAX_ERROR),
768 : errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
769 :
770 0 : parse_subscription_options(stmt->options, NULL, NULL, NULL,
771 : NULL, NULL, NULL, ©_data,
772 : NULL, NULL);
773 :
774 0 : AlterSubscription_refresh(sub, copy_data);
775 :
776 0 : break;
777 : }
778 :
779 : default:
780 0 : elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
781 : stmt->kind);
782 : }
783 :
784 : /* Update the catalog if needed. */
785 7 : if (update_tuple)
786 : {
787 7 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
788 : replaces);
789 :
790 7 : CatalogTupleUpdate(rel, &tup->t_self, tup);
791 :
792 7 : heap_freetuple(tup);
793 : }
794 :
795 7 : heap_close(rel, RowExclusiveLock);
796 :
797 7 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
798 :
799 7 : InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
800 :
801 7 : return myself;
802 : }
803 :
804 : /*
805 : * Drop a subscription
806 : */
807 : void
808 6 : DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
809 : {
810 : Relation rel;
811 : ObjectAddress myself;
812 : HeapTuple tup;
813 : Oid subid;
814 : Datum datum;
815 : bool isnull;
816 : char *subname;
817 : char *conninfo;
818 : char *slotname;
819 : List *subworkers;
820 : ListCell *lc;
821 : char originname[NAMEDATALEN];
822 6 : char *err = NULL;
823 : RepOriginId originid;
824 6 : WalReceiverConn *wrconn = NULL;
825 : StringInfoData cmd;
826 :
827 : /*
828 : * Lock pg_subscription with AccessExclusiveLock to ensure that the
829 : * launcher doesn't restart new worker during dropping the subscription
830 : */
831 6 : rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
832 :
833 6 : tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
834 : CStringGetDatum(stmt->subname));
835 :
836 6 : if (!HeapTupleIsValid(tup))
837 : {
838 2 : heap_close(rel, NoLock);
839 :
840 2 : if (!stmt->missing_ok)
841 1 : ereport(ERROR,
842 : (errcode(ERRCODE_UNDEFINED_OBJECT),
843 : errmsg("subscription \"%s\" does not exist",
844 : stmt->subname)));
845 : else
846 1 : ereport(NOTICE,
847 : (errmsg("subscription \"%s\" does not exist, skipping",
848 : stmt->subname)));
849 :
850 5 : return;
851 : }
852 :
853 4 : subid = HeapTupleGetOid(tup);
854 :
855 : /* must be owner */
856 4 : if (!pg_subscription_ownercheck(subid, GetUserId()))
857 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
858 0 : stmt->subname);
859 :
860 : /* DROP hook for the subscription being removed */
861 4 : InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
862 :
863 : /*
864 : * Lock the subscription so nobody else can do anything with it (including
865 : * the replication workers).
866 : */
867 4 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
868 :
869 : /* Get subname */
870 4 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
871 : Anum_pg_subscription_subname, &isnull);
872 4 : Assert(!isnull);
873 4 : subname = pstrdup(NameStr(*DatumGetName(datum)));
874 :
875 : /* Get conninfo */
876 4 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
877 : Anum_pg_subscription_subconninfo, &isnull);
878 4 : Assert(!isnull);
879 4 : conninfo = TextDatumGetCString(datum);
880 :
881 : /* Get slotname */
882 4 : datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
883 : Anum_pg_subscription_subslotname, &isnull);
884 4 : if (!isnull)
885 1 : slotname = pstrdup(NameStr(*DatumGetName(datum)));
886 : else
887 3 : slotname = NULL;
888 :
889 : /*
890 : * Since dropping a replication slot is not transactional, the replication
891 : * slot stays dropped even if the transaction rolls back. So we cannot
892 : * run DROP SUBSCRIPTION inside a transaction block if dropping the
893 : * replication slot.
894 : *
895 : * XXX The command name should really be something like "DROP SUBSCRIPTION
896 : * of a subscription that is associated with a replication slot", but we
897 : * don't have the proper facilities for that.
898 : */
899 4 : if (slotname)
900 1 : PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
901 :
902 :
903 3 : ObjectAddressSet(myself, SubscriptionRelationId, subid);
904 3 : EventTriggerSQLDropAddObject(&myself, true, true);
905 :
906 : /* Remove the tuple from catalog. */
907 3 : CatalogTupleDelete(rel, &tup->t_self);
908 :
909 3 : ReleaseSysCache(tup);
910 :
911 : /*
912 : * If we are dropping the replication slot, stop all the subscription
913 : * workers immediately, so that the slot becomes accessible. Otherwise
914 : * just schedule the stopping for the end of the transaction.
915 : *
916 : * New workers won't be started because we hold an exclusive lock on the
917 : * subscription till the end of the transaction.
918 : */
919 3 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
920 3 : subworkers = logicalrep_workers_find(subid, false);
921 3 : LWLockRelease(LogicalRepWorkerLock);
922 3 : foreach(lc, subworkers)
923 : {
924 0 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
925 :
926 0 : if (slotname)
927 0 : logicalrep_worker_stop(w->subid, w->relid);
928 : else
929 0 : logicalrep_worker_stop_at_commit(w->subid, w->relid);
930 : }
931 3 : list_free(subworkers);
932 :
933 : /* Clean up dependencies */
934 3 : deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
935 :
936 : /* Remove any associated relation synchronization states. */
937 3 : RemoveSubscriptionRel(subid, InvalidOid);
938 :
939 : /* Remove the origin tracking if exists. */
940 3 : snprintf(originname, sizeof(originname), "pg_%u", subid);
941 3 : originid = replorigin_by_name(originname, true);
942 3 : if (originid != InvalidRepOriginId)
943 3 : replorigin_drop(originid, false);
944 :
945 : /*
946 : * If there is no slot associated with the subscription, we can finish
947 : * here.
948 : */
949 3 : if (!slotname)
950 : {
951 3 : heap_close(rel, NoLock);
952 3 : return;
953 : }
954 :
955 : /*
956 : * Otherwise drop the replication slot at the publisher node using the
957 : * replication connection.
958 : */
959 0 : load_file("libpqwalreceiver", false);
960 :
961 0 : initStringInfo(&cmd);
962 0 : appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
963 :
964 0 : wrconn = walrcv_connect(conninfo, true, subname, &err);
965 0 : if (wrconn == NULL)
966 0 : ereport(ERROR,
967 : (errmsg("could not connect to publisher when attempting to "
968 : "drop the replication slot \"%s\"", slotname),
969 : errdetail("The error was: %s", err),
970 : errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
971 : "to disassociate the subscription from the slot.")));
972 :
973 0 : PG_TRY();
974 : {
975 : WalRcvExecResult *res;
976 :
977 0 : res = walrcv_exec(wrconn, cmd.data, 0, NULL);
978 :
979 0 : if (res->status != WALRCV_OK_COMMAND)
980 0 : ereport(ERROR,
981 : (errmsg("could not drop the replication slot \"%s\" on publisher",
982 : slotname),
983 : errdetail("The error was: %s", res->err)));
984 : else
985 0 : ereport(NOTICE,
986 : (errmsg("dropped replication slot \"%s\" on publisher",
987 : slotname)));
988 :
989 0 : walrcv_clear_result(res);
990 : }
991 0 : PG_CATCH();
992 : {
993 : /* Close the connection in case of failure */
994 0 : walrcv_disconnect(wrconn);
995 0 : PG_RE_THROW();
996 : }
997 0 : PG_END_TRY();
998 :
999 0 : walrcv_disconnect(wrconn);
1000 :
1001 0 : pfree(cmd.data);
1002 :
1003 0 : heap_close(rel, NoLock);
1004 : }
1005 :
1006 : /*
1007 : * Internal workhorse for changing a subscription owner
1008 : */
1009 : static void
1010 2 : AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1011 : {
1012 : Form_pg_subscription form;
1013 :
1014 2 : form = (Form_pg_subscription) GETSTRUCT(tup);
1015 :
1016 2 : if (form->subowner == newOwnerId)
1017 1 : return;
1018 :
1019 2 : if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
1020 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
1021 0 : NameStr(form->subname));
1022 :
1023 : /* New owner must be a superuser */
1024 2 : if (!superuser_arg(newOwnerId))
1025 1 : ereport(ERROR,
1026 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1027 : errmsg("permission denied to change owner of subscription \"%s\"",
1028 : NameStr(form->subname)),
1029 : errhint("The owner of a subscription must be a superuser.")));
1030 :
1031 1 : form->subowner = newOwnerId;
1032 1 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1033 :
1034 : /* Update owner dependency reference */
1035 2 : changeDependencyOnOwner(SubscriptionRelationId,
1036 2 : HeapTupleGetOid(tup),
1037 : newOwnerId);
1038 :
1039 1 : InvokeObjectPostAlterHook(SubscriptionRelationId,
1040 : HeapTupleGetOid(tup), 0);
1041 : }
1042 :
1043 : /*
1044 : * Change subscription owner -- by name
1045 : */
1046 : ObjectAddress
1047 2 : AlterSubscriptionOwner(const char *name, Oid newOwnerId)
1048 : {
1049 : Oid subid;
1050 : HeapTuple tup;
1051 : Relation rel;
1052 : ObjectAddress address;
1053 :
1054 2 : rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
1055 :
1056 2 : tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1057 : CStringGetDatum(name));
1058 :
1059 2 : if (!HeapTupleIsValid(tup))
1060 0 : ereport(ERROR,
1061 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1062 : errmsg("subscription \"%s\" does not exist", name)));
1063 :
1064 2 : subid = HeapTupleGetOid(tup);
1065 :
1066 2 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1067 :
1068 1 : ObjectAddressSet(address, SubscriptionRelationId, subid);
1069 :
1070 1 : heap_freetuple(tup);
1071 :
1072 1 : heap_close(rel, RowExclusiveLock);
1073 :
1074 1 : return address;
1075 : }
1076 :
1077 : /*
1078 : * Change subscription owner -- by OID
1079 : */
1080 : void
1081 0 : AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
1082 : {
1083 : HeapTuple tup;
1084 : Relation rel;
1085 :
1086 0 : rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
1087 :
1088 0 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1089 :
1090 0 : if (!HeapTupleIsValid(tup))
1091 0 : ereport(ERROR,
1092 : (errcode(ERRCODE_UNDEFINED_OBJECT),
1093 : errmsg("subscription with OID %u does not exist", subid)));
1094 :
1095 0 : AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1096 :
1097 0 : heap_freetuple(tup);
1098 :
1099 0 : heap_close(rel, RowExclusiveLock);
1100 0 : }
1101 :
1102 : /*
1103 : * Get the list of tables which belong to specified publications on the
1104 : * publisher connection.
1105 : */
1106 : static List *
1107 0 : fetch_table_list(WalReceiverConn *wrconn, List *publications)
1108 : {
1109 : WalRcvExecResult *res;
1110 : StringInfoData cmd;
1111 : TupleTableSlot *slot;
1112 0 : Oid tableRow[2] = {TEXTOID, TEXTOID};
1113 : ListCell *lc;
1114 : bool first;
1115 0 : List *tablelist = NIL;
1116 :
1117 0 : Assert(list_length(publications) > 0);
1118 :
1119 0 : initStringInfo(&cmd);
1120 0 : appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1121 : " FROM pg_catalog.pg_publication_tables t\n"
1122 : " WHERE t.pubname IN (");
1123 0 : first = true;
1124 0 : foreach(lc, publications)
1125 : {
1126 0 : char *pubname = strVal(lfirst(lc));
1127 :
1128 0 : if (first)
1129 0 : first = false;
1130 : else
1131 0 : appendStringInfoString(&cmd, ", ");
1132 :
1133 0 : appendStringInfoString(&cmd, quote_literal_cstr(pubname));
1134 : }
1135 0 : appendStringInfoChar(&cmd, ')');
1136 :
1137 0 : res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1138 0 : pfree(cmd.data);
1139 :
1140 0 : if (res->status != WALRCV_OK_TUPLES)
1141 0 : ereport(ERROR,
1142 : (errmsg("could not receive list of replicated tables from the publisher: %s",
1143 : res->err)));
1144 :
1145 : /* Process tables. */
1146 0 : slot = MakeSingleTupleTableSlot(res->tupledesc);
1147 0 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1148 : {
1149 : char *nspname;
1150 : char *relname;
1151 : bool isnull;
1152 : RangeVar *rv;
1153 :
1154 0 : nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1155 0 : Assert(!isnull);
1156 0 : relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1157 0 : Assert(!isnull);
1158 :
1159 0 : rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1160 0 : tablelist = lappend(tablelist, rv);
1161 :
1162 0 : ExecClearTuple(slot);
1163 : }
1164 0 : ExecDropSingleTupleTableSlot(slot);
1165 :
1166 0 : walrcv_clear_result(res);
1167 :
1168 0 : return tablelist;
1169 : }
|