Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * publicationcmds.c
4 : * publication manipulation
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * publicationcmds.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "funcapi.h"
18 : #include "miscadmin.h"
19 :
20 : #include "access/genam.h"
21 : #include "access/hash.h"
22 : #include "access/heapam.h"
23 : #include "access/htup_details.h"
24 : #include "access/xact.h"
25 :
26 : #include "catalog/catalog.h"
27 : #include "catalog/indexing.h"
28 : #include "catalog/namespace.h"
29 : #include "catalog/objectaccess.h"
30 : #include "catalog/objectaddress.h"
31 : #include "catalog/pg_inherits_fn.h"
32 : #include "catalog/pg_type.h"
33 : #include "catalog/pg_publication.h"
34 : #include "catalog/pg_publication_rel.h"
35 :
36 : #include "commands/dbcommands.h"
37 : #include "commands/defrem.h"
38 : #include "commands/event_trigger.h"
39 : #include "commands/publicationcmds.h"
40 :
41 : #include "utils/array.h"
42 : #include "utils/builtins.h"
43 : #include "utils/catcache.h"
44 : #include "utils/fmgroids.h"
45 : #include "utils/inval.h"
46 : #include "utils/lsyscache.h"
47 : #include "utils/rel.h"
48 : #include "utils/syscache.h"
49 : #include "utils/varlena.h"
50 :
51 : /* Same as MAXNUMMESSAGES in sinvaladt.c */
52 : #define MAX_RELCACHE_INVAL_MSGS 4096
53 :
54 : static List *OpenTableList(List *tables);
55 : static void CloseTableList(List *rels);
56 : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
57 : AlterPublicationStmt *stmt);
58 : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
59 :
60 : static void
61 14 : parse_publication_options(List *options,
62 : bool *publish_given,
63 : bool *publish_insert,
64 : bool *publish_update,
65 : bool *publish_delete)
66 : {
67 : ListCell *lc;
68 :
69 14 : *publish_given = false;
70 :
71 : /* Defaults are true */
72 14 : *publish_insert = true;
73 14 : *publish_update = true;
74 14 : *publish_delete = true;
75 :
76 : /* Parse options */
77 19 : foreach(lc, options)
78 : {
79 7 : DefElem *defel = (DefElem *) lfirst(lc);
80 :
81 7 : if (strcmp(defel->defname, "publish") == 0)
82 : {
83 : char *publish;
84 : List *publish_list;
85 : ListCell *lc;
86 :
87 6 : if (*publish_given)
88 0 : ereport(ERROR,
89 : (errcode(ERRCODE_SYNTAX_ERROR),
90 : errmsg("conflicting or redundant options")));
91 :
92 : /*
93 : * If publish option was given only the explicitly listed actions
94 : * should be published.
95 : */
96 6 : *publish_insert = false;
97 6 : *publish_update = false;
98 6 : *publish_delete = false;
99 :
100 6 : *publish_given = true;
101 6 : publish = defGetString(defel);
102 :
103 6 : if (!SplitIdentifierString(publish, ',', &publish_list))
104 0 : ereport(ERROR,
105 : (errcode(ERRCODE_SYNTAX_ERROR),
106 : errmsg("invalid publish list")));
107 :
108 : /* Process the option list. */
109 14 : foreach(lc, publish_list)
110 : {
111 9 : char *publish_opt = (char *) lfirst(lc);
112 :
113 9 : if (strcmp(publish_opt, "insert") == 0)
114 4 : *publish_insert = true;
115 5 : else if (strcmp(publish_opt, "update") == 0)
116 3 : *publish_update = true;
117 2 : else if (strcmp(publish_opt, "delete") == 0)
118 1 : *publish_delete = true;
119 : else
120 1 : ereport(ERROR,
121 : (errcode(ERRCODE_SYNTAX_ERROR),
122 : errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
123 : }
124 : }
125 : else
126 1 : ereport(ERROR,
127 : (errcode(ERRCODE_SYNTAX_ERROR),
128 : errmsg("unrecognized publication parameter: %s", defel->defname)));
129 : }
130 12 : }
131 :
132 : /*
133 : * Create new publication.
134 : */
135 : ObjectAddress
136 13 : CreatePublication(CreatePublicationStmt *stmt)
137 : {
138 : Relation rel;
139 : ObjectAddress myself;
140 : Oid puboid;
141 : bool nulls[Natts_pg_publication];
142 : Datum values[Natts_pg_publication];
143 : HeapTuple tup;
144 : bool publish_given;
145 : bool publish_insert;
146 : bool publish_update;
147 : bool publish_delete;
148 : AclResult aclresult;
149 :
150 : /* must have CREATE privilege on database */
151 13 : aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
152 13 : if (aclresult != ACLCHECK_OK)
153 2 : aclcheck_error(aclresult, ACL_KIND_DATABASE,
154 1 : get_database_name(MyDatabaseId));
155 :
156 : /* FOR ALL TABLES requires superuser */
157 12 : if (stmt->for_all_tables && !superuser())
158 0 : ereport(ERROR,
159 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
160 : (errmsg("must be superuser to create FOR ALL TABLES publication"))));
161 :
162 12 : rel = heap_open(PublicationRelationId, RowExclusiveLock);
163 :
164 : /* Check if name is used */
165 12 : puboid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(stmt->pubname));
166 12 : if (OidIsValid(puboid))
167 : {
168 1 : ereport(ERROR,
169 : (errcode(ERRCODE_DUPLICATE_OBJECT),
170 : errmsg("publication \"%s\" already exists",
171 : stmt->pubname)));
172 : }
173 :
174 : /* Form a tuple. */
175 11 : memset(values, 0, sizeof(values));
176 11 : memset(nulls, false, sizeof(nulls));
177 :
178 11 : values[Anum_pg_publication_pubname - 1] =
179 11 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
180 11 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
181 :
182 11 : parse_publication_options(stmt->options,
183 : &publish_given, &publish_insert,
184 : &publish_update, &publish_delete);
185 :
186 9 : values[Anum_pg_publication_puballtables - 1] =
187 9 : BoolGetDatum(stmt->for_all_tables);
188 9 : values[Anum_pg_publication_pubinsert - 1] =
189 9 : BoolGetDatum(publish_insert);
190 9 : values[Anum_pg_publication_pubupdate - 1] =
191 9 : BoolGetDatum(publish_update);
192 9 : values[Anum_pg_publication_pubdelete - 1] =
193 9 : BoolGetDatum(publish_delete);
194 :
195 9 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
196 :
197 : /* Insert tuple into catalog. */
198 9 : puboid = CatalogTupleInsert(rel, tup);
199 9 : heap_freetuple(tup);
200 :
201 9 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
202 :
203 9 : ObjectAddressSet(myself, PublicationRelationId, puboid);
204 :
205 : /* Make the changes visible. */
206 9 : CommandCounterIncrement();
207 :
208 9 : if (stmt->tables)
209 : {
210 : List *rels;
211 :
212 5 : Assert(list_length(stmt->tables) > 0);
213 :
214 5 : rels = OpenTableList(stmt->tables);
215 5 : PublicationAddTables(puboid, rels, true, NULL);
216 4 : CloseTableList(rels);
217 : }
218 :
219 8 : heap_close(rel, RowExclusiveLock);
220 :
221 8 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
222 :
223 8 : return myself;
224 : }
225 :
226 : /*
227 : * Change options of a publication.
228 : */
229 : static void
230 3 : AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
231 : HeapTuple tup)
232 : {
233 : bool nulls[Natts_pg_publication];
234 : bool replaces[Natts_pg_publication];
235 : Datum values[Natts_pg_publication];
236 : bool publish_given;
237 : bool publish_insert;
238 : bool publish_update;
239 : bool publish_delete;
240 : ObjectAddress obj;
241 :
242 3 : parse_publication_options(stmt->options,
243 : &publish_given, &publish_insert,
244 : &publish_update, &publish_delete);
245 :
246 : /* Everything ok, form a new tuple. */
247 3 : memset(values, 0, sizeof(values));
248 3 : memset(nulls, false, sizeof(nulls));
249 3 : memset(replaces, false, sizeof(replaces));
250 :
251 3 : if (publish_given)
252 : {
253 3 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
254 3 : replaces[Anum_pg_publication_pubinsert - 1] = true;
255 :
256 3 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
257 3 : replaces[Anum_pg_publication_pubupdate - 1] = true;
258 :
259 3 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
260 3 : replaces[Anum_pg_publication_pubdelete - 1] = true;
261 : }
262 :
263 3 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
264 : replaces);
265 :
266 : /* Update the catalog. */
267 3 : CatalogTupleUpdate(rel, &tup->t_self, tup);
268 :
269 3 : CommandCounterIncrement();
270 :
271 : /* Invalidate the relcache. */
272 3 : if (((Form_pg_publication) GETSTRUCT(tup))->puballtables)
273 : {
274 1 : CacheInvalidateRelcacheAll();
275 : }
276 : else
277 : {
278 2 : List *relids = GetPublicationRelations(HeapTupleGetOid(tup));
279 :
280 : /*
281 : * We don't want to send too many individual messages, at some point
282 : * it's cheaper to just reset whole relcache.
283 : */
284 2 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
285 : {
286 : ListCell *lc;
287 :
288 2 : foreach(lc, relids)
289 : {
290 0 : Oid relid = lfirst_oid(lc);
291 :
292 0 : CacheInvalidateRelcacheByRelid(relid);
293 : }
294 : }
295 : else
296 0 : CacheInvalidateRelcacheAll();
297 : }
298 :
299 3 : ObjectAddressSet(obj, PublicationRelationId, HeapTupleGetOid(tup));
300 3 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
301 : (Node *) stmt);
302 :
303 3 : InvokeObjectPostAlterHook(PublicationRelationId, HeapTupleGetOid(tup), 0);
304 3 : }
305 :
306 : /*
307 : * Add or remove table to/from publication.
308 : */
309 : static void
310 14 : AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
311 : HeapTuple tup)
312 : {
313 14 : Oid pubid = HeapTupleGetOid(tup);
314 14 : List *rels = NIL;
315 14 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
316 :
317 : /* Check that user is allowed to manipulate the publication tables. */
318 14 : if (pubform->puballtables)
319 3 : ereport(ERROR,
320 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
321 : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
322 : NameStr(pubform->pubname)),
323 : errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
324 :
325 11 : Assert(list_length(stmt->tables) > 0);
326 :
327 11 : rels = OpenTableList(stmt->tables);
328 :
329 11 : if (stmt->tableAction == DEFELEM_ADD)
330 8 : PublicationAddTables(pubid, rels, false, stmt);
331 3 : else if (stmt->tableAction == DEFELEM_DROP)
332 2 : PublicationDropTables(pubid, rels, false);
333 : else /* DEFELEM_SET */
334 : {
335 1 : List *oldrelids = GetPublicationRelations(pubid);
336 1 : List *delrels = NIL;
337 : ListCell *oldlc;
338 :
339 : /* Calculate which relations to drop. */
340 2 : foreach(oldlc, oldrelids)
341 : {
342 1 : Oid oldrelid = lfirst_oid(oldlc);
343 : ListCell *newlc;
344 1 : bool found = false;
345 :
346 1 : foreach(newlc, rels)
347 : {
348 1 : Relation newrel = (Relation) lfirst(newlc);
349 :
350 1 : if (RelationGetRelid(newrel) == oldrelid)
351 : {
352 1 : found = true;
353 1 : break;
354 : }
355 : }
356 :
357 1 : if (!found)
358 : {
359 0 : Relation oldrel = heap_open(oldrelid,
360 : ShareUpdateExclusiveLock);
361 :
362 0 : delrels = lappend(delrels, oldrel);
363 : }
364 : }
365 :
366 : /* And drop them. */
367 1 : PublicationDropTables(pubid, delrels, true);
368 :
369 : /*
370 : * Don't bother calculating the difference for adding, we'll catch and
371 : * skip existing ones when doing catalog update.
372 : */
373 1 : PublicationAddTables(pubid, rels, true, stmt);
374 :
375 1 : CloseTableList(delrels);
376 : }
377 :
378 6 : CloseTableList(rels);
379 6 : }
380 :
381 : /*
382 : * Alter the existing publication.
383 : *
384 : * This is dispatcher function for AlterPublicationOptions and
385 : * AlterPublicationTables.
386 : */
387 : void
388 17 : AlterPublication(AlterPublicationStmt *stmt)
389 : {
390 : Relation rel;
391 : HeapTuple tup;
392 :
393 17 : rel = heap_open(PublicationRelationId, RowExclusiveLock);
394 :
395 17 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
396 : CStringGetDatum(stmt->pubname));
397 :
398 17 : if (!HeapTupleIsValid(tup))
399 0 : ereport(ERROR,
400 : (errcode(ERRCODE_UNDEFINED_OBJECT),
401 : errmsg("publication \"%s\" does not exist",
402 : stmt->pubname)));
403 :
404 : /* must be owner */
405 17 : if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
406 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PUBLICATION,
407 0 : stmt->pubname);
408 :
409 17 : if (stmt->options)
410 3 : AlterPublicationOptions(stmt, rel, tup);
411 : else
412 14 : AlterPublicationTables(stmt, rel, tup);
413 :
414 : /* Cleanup. */
415 9 : heap_freetuple(tup);
416 9 : heap_close(rel, RowExclusiveLock);
417 9 : }
418 :
419 : /*
420 : * Drop publication by OID
421 : */
422 : void
423 8 : RemovePublicationById(Oid pubid)
424 : {
425 : Relation rel;
426 : HeapTuple tup;
427 :
428 8 : rel = heap_open(PublicationRelationId, RowExclusiveLock);
429 :
430 8 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
431 :
432 8 : if (!HeapTupleIsValid(tup))
433 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
434 :
435 8 : CatalogTupleDelete(rel, &tup->t_self);
436 :
437 8 : ReleaseSysCache(tup);
438 :
439 8 : heap_close(rel, RowExclusiveLock);
440 8 : }
441 :
442 : /*
443 : * Remove relation from publication by mapping OID.
444 : */
445 : void
446 11 : RemovePublicationRelById(Oid proid)
447 : {
448 : Relation rel;
449 : HeapTuple tup;
450 : Form_pg_publication_rel pubrel;
451 :
452 11 : rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
453 :
454 11 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
455 :
456 11 : if (!HeapTupleIsValid(tup))
457 0 : elog(ERROR, "cache lookup failed for publication table %u",
458 : proid);
459 :
460 11 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
461 :
462 : /* Invalidate relcache so that publication info is rebuilt. */
463 11 : CacheInvalidateRelcacheByRelid(pubrel->prrelid);
464 :
465 11 : CatalogTupleDelete(rel, &tup->t_self);
466 :
467 11 : ReleaseSysCache(tup);
468 :
469 11 : heap_close(rel, RowExclusiveLock);
470 11 : }
471 :
472 : /*
473 : * Open relations based on provided by RangeVar list.
474 : * The returned tables are locked in ShareUpdateExclusiveLock mode.
475 : */
476 : static List *
477 16 : OpenTableList(List *tables)
478 : {
479 16 : List *relids = NIL;
480 16 : List *rels = NIL;
481 : ListCell *lc;
482 :
483 : /*
484 : * Open, share-lock, and check all the explicitly-specified relations
485 : */
486 35 : foreach(lc, tables)
487 : {
488 19 : RangeVar *rv = lfirst(lc);
489 : Relation rel;
490 19 : bool recurse = rv->inh;
491 : Oid myrelid;
492 :
493 19 : CHECK_FOR_INTERRUPTS();
494 :
495 19 : rel = heap_openrv(rv, ShareUpdateExclusiveLock);
496 19 : myrelid = RelationGetRelid(rel);
497 :
498 : /*
499 : * Filter out duplicates if user specifies "foo, foo".
500 : *
501 : * Note that this algorithm is known to not be very efficient (O(N^2))
502 : * but given that it only works on list of tables given to us by user
503 : * it's deemed acceptable.
504 : */
505 19 : if (list_member_oid(relids, myrelid))
506 : {
507 0 : heap_close(rel, ShareUpdateExclusiveLock);
508 0 : continue;
509 : }
510 19 : rels = lappend(rels, rel);
511 19 : relids = lappend_oid(relids, myrelid);
512 :
513 19 : if (recurse)
514 : {
515 : ListCell *child;
516 : List *children;
517 :
518 18 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
519 : NULL);
520 :
521 37 : foreach(child, children)
522 : {
523 19 : Oid childrelid = lfirst_oid(child);
524 :
525 19 : if (list_member_oid(relids, childrelid))
526 18 : continue;
527 :
528 : /*
529 : * Skip duplicates if user specified both parent and child
530 : * tables.
531 : */
532 1 : if (list_member_oid(relids, childrelid))
533 : {
534 0 : heap_close(rel, ShareUpdateExclusiveLock);
535 0 : continue;
536 : }
537 :
538 : /* find_all_inheritors already got lock */
539 1 : rel = heap_open(childrelid, NoLock);
540 1 : rels = lappend(rels, rel);
541 1 : relids = lappend_oid(relids, childrelid);
542 : }
543 : }
544 : }
545 :
546 16 : list_free(relids);
547 :
548 16 : return rels;
549 : }
550 :
551 : /*
552 : * Close all relations in the list.
553 : */
554 : static void
555 11 : CloseTableList(List *rels)
556 : {
557 : ListCell *lc;
558 :
559 25 : foreach(lc, rels)
560 : {
561 14 : Relation rel = (Relation) lfirst(lc);
562 :
563 14 : heap_close(rel, NoLock);
564 : }
565 11 : }
566 :
567 : /*
568 : * Add listed tables to the publication.
569 : */
570 : static void
571 14 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
572 : AlterPublicationStmt *stmt)
573 : {
574 : ListCell *lc;
575 :
576 14 : Assert(!stmt || !stmt->for_all_tables);
577 :
578 26 : foreach(lc, rels)
579 : {
580 17 : Relation rel = (Relation) lfirst(lc);
581 : ObjectAddress obj;
582 :
583 : /* Must be owner of the table or superuser. */
584 17 : if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
585 1 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
586 1 : RelationGetRelationName(rel));
587 :
588 16 : obj = publication_add_relation(pubid, rel, if_not_exists);
589 12 : if (stmt)
590 : {
591 6 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
592 : (Node *) stmt);
593 :
594 6 : InvokeObjectPostCreateHook(PublicationRelRelationId,
595 : obj.objectId, 0);
596 : }
597 : }
598 9 : }
599 :
600 : /*
601 : * Remove listed tables from the publication.
602 : */
603 : static void
604 3 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
605 : {
606 : ObjectAddress obj;
607 : ListCell *lc;
608 : Oid prid;
609 :
610 5 : foreach(lc, rels)
611 : {
612 3 : Relation rel = (Relation) lfirst(lc);
613 3 : Oid relid = RelationGetRelid(rel);
614 :
615 3 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
616 : ObjectIdGetDatum(pubid));
617 3 : if (!OidIsValid(prid))
618 : {
619 1 : if (missing_ok)
620 0 : continue;
621 :
622 1 : ereport(ERROR,
623 : (errcode(ERRCODE_UNDEFINED_OBJECT),
624 : errmsg("relation \"%s\" is not part of the publication",
625 : RelationGetRelationName(rel))));
626 : }
627 :
628 2 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
629 2 : performDeletion(&obj, DROP_CASCADE, 0);
630 : }
631 2 : }
632 :
633 : /*
634 : * Internal workhorse for changing a publication owner
635 : */
636 : static void
637 1 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
638 : {
639 : Form_pg_publication form;
640 :
641 1 : form = (Form_pg_publication) GETSTRUCT(tup);
642 :
643 1 : if (form->pubowner == newOwnerId)
644 1 : return;
645 :
646 1 : if (!superuser())
647 : {
648 : AclResult aclresult;
649 :
650 : /* Must be owner */
651 0 : if (!pg_publication_ownercheck(HeapTupleGetOid(tup), GetUserId()))
652 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PUBLICATION,
653 0 : NameStr(form->pubname));
654 :
655 : /* Must be able to become new owner */
656 0 : check_is_member_of_role(GetUserId(), newOwnerId);
657 :
658 : /* New owner must have CREATE privilege on database */
659 0 : aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
660 0 : if (aclresult != ACLCHECK_OK)
661 0 : aclcheck_error(aclresult, ACL_KIND_DATABASE,
662 0 : get_database_name(MyDatabaseId));
663 :
664 0 : if (form->puballtables && !superuser_arg(newOwnerId))
665 0 : ereport(ERROR,
666 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
667 : errmsg("permission denied to change owner of publication \"%s\"",
668 : NameStr(form->pubname)),
669 : errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
670 : }
671 :
672 1 : form->pubowner = newOwnerId;
673 1 : CatalogTupleUpdate(rel, &tup->t_self, tup);
674 :
675 : /* Update owner dependency reference */
676 2 : changeDependencyOnOwner(PublicationRelationId,
677 2 : HeapTupleGetOid(tup),
678 : newOwnerId);
679 :
680 1 : InvokeObjectPostAlterHook(PublicationRelationId,
681 : HeapTupleGetOid(tup), 0);
682 : }
683 :
684 : /*
685 : * Change publication owner -- by name
686 : */
687 : ObjectAddress
688 1 : AlterPublicationOwner(const char *name, Oid newOwnerId)
689 : {
690 : Oid subid;
691 : HeapTuple tup;
692 : Relation rel;
693 : ObjectAddress address;
694 :
695 1 : rel = heap_open(PublicationRelationId, RowExclusiveLock);
696 :
697 1 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
698 :
699 1 : if (!HeapTupleIsValid(tup))
700 0 : ereport(ERROR,
701 : (errcode(ERRCODE_UNDEFINED_OBJECT),
702 : errmsg("publication \"%s\" does not exist", name)));
703 :
704 1 : subid = HeapTupleGetOid(tup);
705 :
706 1 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
707 :
708 1 : ObjectAddressSet(address, PublicationRelationId, subid);
709 :
710 1 : heap_freetuple(tup);
711 :
712 1 : heap_close(rel, RowExclusiveLock);
713 :
714 1 : return address;
715 : }
716 :
717 : /*
718 : * Change publication owner -- by OID
719 : */
720 : void
721 0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
722 : {
723 : HeapTuple tup;
724 : Relation rel;
725 :
726 0 : rel = heap_open(PublicationRelationId, RowExclusiveLock);
727 :
728 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
729 :
730 0 : if (!HeapTupleIsValid(tup))
731 0 : ereport(ERROR,
732 : (errcode(ERRCODE_UNDEFINED_OBJECT),
733 : errmsg("publication with OID %u does not exist", subid)));
734 :
735 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
736 :
737 0 : heap_freetuple(tup);
738 :
739 0 : heap_close(rel, RowExclusiveLock);
740 0 : }
|