Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_publication.c
4 : * publication C API manipulation
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * pg_publication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "funcapi.h"
18 : #include "miscadmin.h"
19 :
20 : #include "access/genam.h"
21 : #include "access/hash.h"
22 : #include "access/heapam.h"
23 : #include "access/htup_details.h"
24 : #include "access/xact.h"
25 :
26 : #include "catalog/catalog.h"
27 : #include "catalog/dependency.h"
28 : #include "catalog/index.h"
29 : #include "catalog/indexing.h"
30 : #include "catalog/namespace.h"
31 : #include "catalog/objectaccess.h"
32 : #include "catalog/objectaddress.h"
33 : #include "catalog/pg_type.h"
34 : #include "catalog/pg_publication.h"
35 : #include "catalog/pg_publication_rel.h"
36 :
37 : #include "utils/array.h"
38 : #include "utils/builtins.h"
39 : #include "utils/catcache.h"
40 : #include "utils/fmgroids.h"
41 : #include "utils/inval.h"
42 : #include "utils/lsyscache.h"
43 : #include "utils/rel.h"
44 : #include "utils/syscache.h"
45 :
46 : /*
47 : * Check if relation can be in given publication and throws appropriate
48 : * error if not.
49 : */
50 : static void
51 14 : check_publication_add_relation(Relation targetrel)
52 : {
53 : /* Give more specific error for partitioned tables */
54 14 : if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
55 1 : ereport(ERROR,
56 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
57 : errmsg("\"%s\" is a partitioned table",
58 : RelationGetRelationName(targetrel)),
59 : errdetail("Adding partitioned tables to publications is not supported."),
60 : errhint("You can add the table partitions individually.")));
61 :
62 : /* Must be table */
63 13 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
64 2 : ereport(ERROR,
65 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 : errmsg("\"%s\" is not a table",
67 : RelationGetRelationName(targetrel)),
68 : errdetail("Only tables can be added to publications.")));
69 :
70 : /* Can't be system table */
71 11 : if (IsCatalogRelation(targetrel))
72 0 : ereport(ERROR,
73 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 : errmsg("\"%s\" is a system table",
75 : RelationGetRelationName(targetrel)),
76 : errdetail("System tables cannot be added to publications.")));
77 :
78 : /* UNLOGGED and TEMP relations cannot be part of publication. */
79 11 : if (!RelationNeedsWAL(targetrel))
80 0 : ereport(ERROR,
81 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
82 : errmsg("table \"%s\" cannot be replicated",
83 : RelationGetRelationName(targetrel)),
84 : errdetail("Temporary and unlogged relations cannot be replicated.")));
85 11 : }
86 :
87 : /*
88 : * Returns if relation represented by oid and Form_pg_class entry
89 : * is publishable.
90 : *
91 : * Does same checks as the above, but does not need relation to be opened
92 : * and also does not throw errors.
93 : *
94 : * Note this also excludes all tables with relid < FirstNormalObjectId,
95 : * ie all tables created during initdb. This mainly affects the preinstalled
96 : * information_schema. (IsCatalogClass() only checks for these inside
97 : * pg_catalog and toast schemas.)
98 : */
99 : static bool
100 169 : is_publishable_class(Oid relid, Form_pg_class reltuple)
101 : {
102 460 : return reltuple->relkind == RELKIND_RELATION &&
103 244 : !IsCatalogClass(relid, reltuple) &&
104 413 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
105 : relid >= FirstNormalObjectId;
106 : }
107 :
108 :
109 : /*
110 : * SQL-callable variant of the above
111 : *
112 : * This returns null when the relation does not exist. This is intended to be
113 : * used for example in psql to avoid gratuitous errors when there are
114 : * concurrent catalog changes.
115 : */
116 : Datum
117 169 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
118 : {
119 169 : Oid relid = PG_GETARG_OID(0);
120 : HeapTuple tuple;
121 : bool result;
122 :
123 169 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
124 169 : if (!tuple)
125 0 : PG_RETURN_NULL();
126 169 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
127 169 : ReleaseSysCache(tuple);
128 169 : PG_RETURN_BOOL(result);
129 : }
130 :
131 :
132 : /*
133 : * Insert new publication / relation mapping.
134 : */
135 : ObjectAddress
136 16 : publication_add_relation(Oid pubid, Relation targetrel,
137 : bool if_not_exists)
138 : {
139 : Relation rel;
140 : HeapTuple tup;
141 : Datum values[Natts_pg_publication_rel];
142 : bool nulls[Natts_pg_publication_rel];
143 16 : Oid relid = RelationGetRelid(targetrel);
144 : Oid prrelid;
145 16 : Publication *pub = GetPublication(pubid);
146 : ObjectAddress myself,
147 : referenced;
148 :
149 16 : rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
150 :
151 : /*
152 : * Check for duplicates. Note that this does not really prevent
153 : * duplicates, it's here just to provide nicer error message in common
154 : * case. The real protection is the unique key on the catalog.
155 : */
156 16 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
157 : ObjectIdGetDatum(pubid)))
158 : {
159 2 : heap_close(rel, RowExclusiveLock);
160 :
161 2 : if (if_not_exists)
162 1 : return InvalidObjectAddress;
163 :
164 1 : ereport(ERROR,
165 : (errcode(ERRCODE_DUPLICATE_OBJECT),
166 : errmsg("relation \"%s\" is already member of publication \"%s\"",
167 : RelationGetRelationName(targetrel), pub->name)));
168 : }
169 :
170 14 : check_publication_add_relation(targetrel);
171 :
172 : /* Form a tuple. */
173 11 : memset(values, 0, sizeof(values));
174 11 : memset(nulls, false, sizeof(nulls));
175 :
176 11 : values[Anum_pg_publication_rel_prpubid - 1] =
177 : ObjectIdGetDatum(pubid);
178 11 : values[Anum_pg_publication_rel_prrelid - 1] =
179 : ObjectIdGetDatum(relid);
180 :
181 11 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
182 :
183 : /* Insert tuple into catalog. */
184 11 : prrelid = CatalogTupleInsert(rel, tup);
185 11 : heap_freetuple(tup);
186 :
187 11 : ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
188 :
189 : /* Add dependency on the publication */
190 11 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
191 11 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
192 :
193 : /* Add dependency on the relation */
194 11 : ObjectAddressSet(referenced, RelationRelationId, relid);
195 11 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
196 :
197 : /* Close the table. */
198 11 : heap_close(rel, RowExclusiveLock);
199 :
200 : /* Invalidate relcache so that publication info is rebuilt. */
201 11 : CacheInvalidateRelcache(targetrel);
202 :
203 11 : return myself;
204 : }
205 :
206 :
207 : /*
208 : * Gets list of publication oids for a relation oid.
209 : */
210 : List *
211 262 : GetRelationPublications(Oid relid)
212 : {
213 262 : List *result = NIL;
214 : CatCList *pubrellist;
215 : int i;
216 :
217 : /* Find all publications associated with the relation. */
218 262 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
219 : ObjectIdGetDatum(relid));
220 262 : for (i = 0; i < pubrellist->n_members; i++)
221 : {
222 0 : HeapTuple tup = &pubrellist->members[i]->tuple;
223 0 : Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
224 :
225 0 : result = lappend_oid(result, pubid);
226 : }
227 :
228 262 : ReleaseSysCacheList(pubrellist);
229 :
230 262 : return result;
231 : }
232 :
233 : /*
234 : * Gets list of relation oids for a publication.
235 : *
236 : * This should only be used for normal publications, the FOR ALL TABLES
237 : * should use GetAllTablesPublicationRelations().
238 : */
239 : List *
240 3 : GetPublicationRelations(Oid pubid)
241 : {
242 : List *result;
243 : Relation pubrelsrel;
244 : ScanKeyData scankey;
245 : SysScanDesc scan;
246 : HeapTuple tup;
247 :
248 : /* Find all publications associated with the relation. */
249 3 : pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock);
250 :
251 3 : ScanKeyInit(&scankey,
252 : Anum_pg_publication_rel_prpubid,
253 : BTEqualStrategyNumber, F_OIDEQ,
254 : ObjectIdGetDatum(pubid));
255 :
256 3 : scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
257 : true, NULL, 1, &scankey);
258 :
259 3 : result = NIL;
260 7 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
261 : {
262 : Form_pg_publication_rel pubrel;
263 :
264 1 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
265 :
266 1 : result = lappend_oid(result, pubrel->prrelid);
267 : }
268 :
269 3 : systable_endscan(scan);
270 3 : heap_close(pubrelsrel, AccessShareLock);
271 :
272 3 : return result;
273 : }
274 :
275 : /*
276 : * Gets list of publication oids for publications marked as FOR ALL TABLES.
277 : */
278 : List *
279 258 : GetAllTablesPublications(void)
280 : {
281 : List *result;
282 : Relation rel;
283 : ScanKeyData scankey;
284 : SysScanDesc scan;
285 : HeapTuple tup;
286 :
287 : /* Find all publications that are marked as for all tables. */
288 258 : rel = heap_open(PublicationRelationId, AccessShareLock);
289 :
290 258 : ScanKeyInit(&scankey,
291 : Anum_pg_publication_puballtables,
292 : BTEqualStrategyNumber, F_BOOLEQ,
293 : BoolGetDatum(true));
294 :
295 258 : scan = systable_beginscan(rel, InvalidOid, false,
296 : NULL, 1, &scankey);
297 :
298 258 : result = NIL;
299 516 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
300 0 : result = lappend_oid(result, HeapTupleGetOid(tup));
301 :
302 258 : systable_endscan(scan);
303 258 : heap_close(rel, AccessShareLock);
304 :
305 258 : return result;
306 : }
307 :
308 : /*
309 : * Gets list of all relation published by FOR ALL TABLES publication(s).
310 : */
311 : List *
312 0 : GetAllTablesPublicationRelations(void)
313 : {
314 : Relation classRel;
315 : ScanKeyData key[1];
316 : HeapScanDesc scan;
317 : HeapTuple tuple;
318 0 : List *result = NIL;
319 :
320 0 : classRel = heap_open(RelationRelationId, AccessShareLock);
321 :
322 0 : ScanKeyInit(&key[0],
323 : Anum_pg_class_relkind,
324 : BTEqualStrategyNumber, F_CHAREQ,
325 : CharGetDatum(RELKIND_RELATION));
326 :
327 0 : scan = heap_beginscan_catalog(classRel, 1, key);
328 :
329 0 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
330 : {
331 0 : Oid relid = HeapTupleGetOid(tuple);
332 0 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
333 :
334 0 : if (is_publishable_class(relid, relForm))
335 0 : result = lappend_oid(result, relid);
336 : }
337 :
338 0 : heap_endscan(scan);
339 0 : heap_close(classRel, AccessShareLock);
340 :
341 0 : return result;
342 : }
343 :
344 : /*
345 : * Get publication using oid
346 : *
347 : * The Publication struct and its data are palloc'ed here.
348 : */
349 : Publication *
350 18 : GetPublication(Oid pubid)
351 : {
352 : HeapTuple tup;
353 : Publication *pub;
354 : Form_pg_publication pubform;
355 :
356 18 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
357 :
358 18 : if (!HeapTupleIsValid(tup))
359 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
360 :
361 18 : pubform = (Form_pg_publication) GETSTRUCT(tup);
362 :
363 18 : pub = (Publication *) palloc(sizeof(Publication));
364 18 : pub->oid = pubid;
365 18 : pub->name = pstrdup(NameStr(pubform->pubname));
366 18 : pub->alltables = pubform->puballtables;
367 18 : pub->pubactions.pubinsert = pubform->pubinsert;
368 18 : pub->pubactions.pubupdate = pubform->pubupdate;
369 18 : pub->pubactions.pubdelete = pubform->pubdelete;
370 :
371 18 : ReleaseSysCache(tup);
372 :
373 18 : return pub;
374 : }
375 :
376 :
377 : /*
378 : * Get Publication using name.
379 : */
380 : Publication *
381 2 : GetPublicationByName(const char *pubname, bool missing_ok)
382 : {
383 : Oid oid;
384 :
385 2 : oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
386 2 : if (!OidIsValid(oid))
387 : {
388 0 : if (missing_ok)
389 0 : return NULL;
390 :
391 0 : ereport(ERROR,
392 : (errcode(ERRCODE_UNDEFINED_OBJECT),
393 : errmsg("publication \"%s\" does not exist", pubname)));
394 : }
395 :
396 2 : return GetPublication(oid);
397 : }
398 :
399 : /*
400 : * get_publication_oid - given a publication name, look up the OID
401 : *
402 : * If missing_ok is false, throw an error if name not found. If true, just
403 : * return InvalidOid.
404 : */
405 : Oid
406 16 : get_publication_oid(const char *pubname, bool missing_ok)
407 : {
408 : Oid oid;
409 :
410 16 : oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
411 16 : if (!OidIsValid(oid) && !missing_ok)
412 1 : ereport(ERROR,
413 : (errcode(ERRCODE_UNDEFINED_OBJECT),
414 : errmsg("publication \"%s\" does not exist", pubname)));
415 15 : return oid;
416 : }
417 :
418 : /*
419 : * get_publication_name - given a publication Oid, look up the name
420 : */
421 : char *
422 23 : get_publication_name(Oid pubid)
423 : {
424 : HeapTuple tup;
425 : char *pubname;
426 : Form_pg_publication pubform;
427 :
428 23 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
429 :
430 23 : if (!HeapTupleIsValid(tup))
431 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
432 :
433 23 : pubform = (Form_pg_publication) GETSTRUCT(tup);
434 23 : pubname = pstrdup(NameStr(pubform->pubname));
435 :
436 23 : ReleaseSysCache(tup);
437 :
438 23 : return pubname;
439 : }
440 :
441 : /*
442 : * Returns Oids of tables in a publication.
443 : */
444 : Datum
445 0 : pg_get_publication_tables(PG_FUNCTION_ARGS)
446 : {
447 : FuncCallContext *funcctx;
448 0 : char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
449 : Publication *publication;
450 : List *tables;
451 : ListCell **lcp;
452 :
453 : /* stuff done only on the first call of the function */
454 0 : if (SRF_IS_FIRSTCALL())
455 : {
456 : MemoryContext oldcontext;
457 :
458 : /* create a function context for cross-call persistence */
459 0 : funcctx = SRF_FIRSTCALL_INIT();
460 :
461 : /* switch to memory context appropriate for multiple function calls */
462 0 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
463 :
464 0 : publication = GetPublicationByName(pubname, false);
465 0 : if (publication->alltables)
466 0 : tables = GetAllTablesPublicationRelations();
467 : else
468 0 : tables = GetPublicationRelations(publication->oid);
469 0 : lcp = (ListCell **) palloc(sizeof(ListCell *));
470 0 : *lcp = list_head(tables);
471 0 : funcctx->user_fctx = (void *) lcp;
472 :
473 0 : MemoryContextSwitchTo(oldcontext);
474 : }
475 :
476 : /* stuff done on every call of the function */
477 0 : funcctx = SRF_PERCALL_SETUP();
478 0 : lcp = (ListCell **) funcctx->user_fctx;
479 :
480 0 : while (*lcp != NULL)
481 : {
482 0 : Oid relid = lfirst_oid(*lcp);
483 :
484 0 : *lcp = lnext(*lcp);
485 0 : SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
486 : }
487 :
488 0 : SRF_RETURN_DONE(funcctx);
489 : }
|