Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_subscription.c
4 : * replication subscriptions
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/catalog/pg_subscription.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "miscadmin.h"
18 :
19 : #include "access/genam.h"
20 : #include "access/heapam.h"
21 : #include "access/htup_details.h"
22 : #include "access/xact.h"
23 :
24 : #include "catalog/indexing.h"
25 : #include "catalog/pg_type.h"
26 : #include "catalog/pg_subscription.h"
27 : #include "catalog/pg_subscription_rel.h"
28 :
29 : #include "nodes/makefuncs.h"
30 :
31 : #include "storage/lmgr.h"
32 :
33 : #include "utils/array.h"
34 : #include "utils/builtins.h"
35 : #include "utils/fmgroids.h"
36 : #include "utils/pg_lsn.h"
37 : #include "utils/rel.h"
38 : #include "utils/syscache.h"
39 :
40 :
41 : static List *textarray_to_stringlist(ArrayType *textarray);
42 :
43 : /*
44 : * Fetch the subscription from the syscache.
45 : */
46 : Subscription *
47 12 : GetSubscription(Oid subid, bool missing_ok)
48 : {
49 : HeapTuple tup;
50 : Subscription *sub;
51 : Form_pg_subscription subform;
52 : Datum datum;
53 : bool isnull;
54 :
55 12 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
56 :
57 12 : if (!HeapTupleIsValid(tup))
58 : {
59 0 : if (missing_ok)
60 0 : return NULL;
61 :
62 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
63 : }
64 :
65 12 : subform = (Form_pg_subscription) GETSTRUCT(tup);
66 :
67 12 : sub = (Subscription *) palloc(sizeof(Subscription));
68 12 : sub->oid = subid;
69 12 : sub->dbid = subform->subdbid;
70 12 : sub->name = pstrdup(NameStr(subform->subname));
71 12 : sub->owner = subform->subowner;
72 12 : sub->enabled = subform->subenabled;
73 :
74 : /* Get conninfo */
75 12 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
76 : tup,
77 : Anum_pg_subscription_subconninfo,
78 : &isnull);
79 12 : Assert(!isnull);
80 12 : sub->conninfo = TextDatumGetCString(datum);
81 :
82 : /* Get slotname */
83 12 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
84 : tup,
85 : Anum_pg_subscription_subslotname,
86 : &isnull);
87 12 : if (!isnull)
88 10 : sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
89 : else
90 2 : sub->slotname = NULL;
91 :
92 : /* Get synccommit */
93 12 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
94 : tup,
95 : Anum_pg_subscription_subsynccommit,
96 : &isnull);
97 12 : Assert(!isnull);
98 12 : sub->synccommit = TextDatumGetCString(datum);
99 :
100 : /* Get publications */
101 12 : datum = SysCacheGetAttr(SUBSCRIPTIONOID,
102 : tup,
103 : Anum_pg_subscription_subpublications,
104 : &isnull);
105 12 : Assert(!isnull);
106 12 : sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
107 :
108 12 : ReleaseSysCache(tup);
109 :
110 12 : return sub;
111 : }
112 :
113 : /*
114 : * Return number of subscriptions defined in given database.
115 : * Used by dropdb() to check if database can indeed be dropped.
116 : */
117 : int
118 0 : CountDBSubscriptions(Oid dbid)
119 : {
120 0 : int nsubs = 0;
121 : Relation rel;
122 : ScanKeyData scankey;
123 : SysScanDesc scan;
124 : HeapTuple tup;
125 :
126 0 : rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
127 :
128 0 : ScanKeyInit(&scankey,
129 : Anum_pg_subscription_subdbid,
130 : BTEqualStrategyNumber, F_OIDEQ,
131 : ObjectIdGetDatum(dbid));
132 :
133 0 : scan = systable_beginscan(rel, InvalidOid, false,
134 : NULL, 1, &scankey);
135 :
136 0 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
137 0 : nsubs++;
138 :
139 0 : systable_endscan(scan);
140 :
141 0 : heap_close(rel, NoLock);
142 :
143 0 : return nsubs;
144 : }
145 :
146 : /*
147 : * Free memory allocated by subscription struct.
148 : */
149 : void
150 0 : FreeSubscription(Subscription *sub)
151 : {
152 0 : pfree(sub->name);
153 0 : pfree(sub->conninfo);
154 0 : if (sub->slotname)
155 0 : pfree(sub->slotname);
156 0 : list_free_deep(sub->publications);
157 0 : pfree(sub);
158 0 : }
159 :
160 : /*
161 : * get_subscription_oid - given a subscription name, look up the OID
162 : *
163 : * If missing_ok is false, throw an error if name not found. If true, just
164 : * return InvalidOid.
165 : */
166 : Oid
167 7 : get_subscription_oid(const char *subname, bool missing_ok)
168 : {
169 : Oid oid;
170 :
171 7 : oid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
172 : CStringGetDatum(subname));
173 7 : if (!OidIsValid(oid) && !missing_ok)
174 1 : ereport(ERROR,
175 : (errcode(ERRCODE_UNDEFINED_OBJECT),
176 : errmsg("subscription \"%s\" does not exist", subname)));
177 6 : return oid;
178 : }
179 :
180 : /*
181 : * get_subscription_name - given a subscription OID, look up the name
182 : */
183 : char *
184 7 : get_subscription_name(Oid subid)
185 : {
186 : HeapTuple tup;
187 : char *subname;
188 : Form_pg_subscription subform;
189 :
190 7 : tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
191 :
192 7 : if (!HeapTupleIsValid(tup))
193 0 : elog(ERROR, "cache lookup failed for subscription %u", subid);
194 :
195 7 : subform = (Form_pg_subscription) GETSTRUCT(tup);
196 7 : subname = pstrdup(NameStr(subform->subname));
197 :
198 7 : ReleaseSysCache(tup);
199 :
200 7 : return subname;
201 : }
202 :
203 : /*
204 : * Convert text array to list of strings.
205 : *
206 : * Note: the resulting list of strings is pallocated here.
207 : */
208 : static List *
209 12 : textarray_to_stringlist(ArrayType *textarray)
210 : {
211 : Datum *elems;
212 : int nelems,
213 : i;
214 12 : List *res = NIL;
215 :
216 12 : deconstruct_array(textarray,
217 : TEXTOID, -1, false, 'i',
218 : &elems, NULL, &nelems);
219 :
220 12 : if (nelems == 0)
221 0 : return NIL;
222 :
223 32 : for (i = 0; i < nelems; i++)
224 20 : res = lappend(res, makeString(TextDatumGetCString(elems[i])));
225 :
226 12 : return res;
227 : }
228 :
229 : /*
230 : * Set the state of a subscription table.
231 : *
232 : * If update_only is true and the record for given table doesn't exist, do
233 : * nothing. This can be used to avoid inserting a new record that was deleted
234 : * by someone else. Generally, subscription DDL commands should use false,
235 : * workers should use true.
236 : *
237 : * The insert-or-update logic in this function is not concurrency safe so it
238 : * might raise an error in rare circumstances. But if we took a stronger lock
239 : * such as ShareRowExclusiveLock, we would risk more deadlocks.
240 : */
241 : Oid
242 0 : SetSubscriptionRelState(Oid subid, Oid relid, char state,
243 : XLogRecPtr sublsn, bool update_only)
244 : {
245 : Relation rel;
246 : HeapTuple tup;
247 0 : Oid subrelid = InvalidOid;
248 : bool nulls[Natts_pg_subscription_rel];
249 : Datum values[Natts_pg_subscription_rel];
250 :
251 0 : LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
252 :
253 0 : rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
254 :
255 : /* Try finding existing mapping. */
256 0 : tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
257 : ObjectIdGetDatum(relid),
258 : ObjectIdGetDatum(subid));
259 :
260 : /*
261 : * If the record for given table does not exist yet create new record,
262 : * otherwise update the existing one.
263 : */
264 0 : if (!HeapTupleIsValid(tup) && !update_only)
265 : {
266 : /* Form the tuple. */
267 0 : memset(values, 0, sizeof(values));
268 0 : memset(nulls, false, sizeof(nulls));
269 0 : values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
270 0 : values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
271 0 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
272 0 : if (sublsn != InvalidXLogRecPtr)
273 0 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
274 : else
275 0 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
276 :
277 0 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
278 :
279 : /* Insert tuple into catalog. */
280 0 : subrelid = CatalogTupleInsert(rel, tup);
281 :
282 0 : heap_freetuple(tup);
283 : }
284 0 : else if (HeapTupleIsValid(tup))
285 : {
286 : bool replaces[Natts_pg_subscription_rel];
287 :
288 : /* Update the tuple. */
289 0 : memset(values, 0, sizeof(values));
290 0 : memset(nulls, false, sizeof(nulls));
291 0 : memset(replaces, false, sizeof(replaces));
292 :
293 0 : replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
294 0 : values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
295 :
296 0 : replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
297 0 : if (sublsn != InvalidXLogRecPtr)
298 0 : values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
299 : else
300 0 : nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
301 :
302 0 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
303 : replaces);
304 :
305 : /* Update the catalog. */
306 0 : CatalogTupleUpdate(rel, &tup->t_self, tup);
307 :
308 0 : subrelid = HeapTupleGetOid(tup);
309 : }
310 :
311 : /* Cleanup. */
312 0 : heap_close(rel, NoLock);
313 :
314 0 : return subrelid;
315 : }
316 :
317 : /*
318 : * Get state of subscription table.
319 : *
320 : * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
321 : */
322 : char
323 0 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
324 : bool missing_ok)
325 : {
326 : Relation rel;
327 : HeapTuple tup;
328 : char substate;
329 : bool isnull;
330 : Datum d;
331 :
332 0 : rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
333 :
334 : /* Try finding the mapping. */
335 0 : tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
336 : ObjectIdGetDatum(relid),
337 : ObjectIdGetDatum(subid));
338 :
339 0 : if (!HeapTupleIsValid(tup))
340 : {
341 0 : if (missing_ok)
342 : {
343 0 : heap_close(rel, AccessShareLock);
344 0 : *sublsn = InvalidXLogRecPtr;
345 0 : return SUBREL_STATE_UNKNOWN;
346 : }
347 :
348 0 : elog(ERROR, "subscription table %u in subscription %u does not exist",
349 : relid, subid);
350 : }
351 :
352 : /* Get the state. */
353 0 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
354 : Anum_pg_subscription_rel_srsubstate, &isnull);
355 0 : Assert(!isnull);
356 0 : substate = DatumGetChar(d);
357 0 : d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
358 : Anum_pg_subscription_rel_srsublsn, &isnull);
359 0 : if (isnull)
360 0 : *sublsn = InvalidXLogRecPtr;
361 : else
362 0 : *sublsn = DatumGetLSN(d);
363 :
364 : /* Cleanup */
365 0 : ReleaseSysCache(tup);
366 0 : heap_close(rel, AccessShareLock);
367 :
368 0 : return substate;
369 : }
370 :
371 : /*
372 : * Drop subscription relation mapping. These can be for a particular
373 : * subscription, or for a particular relation, or both.
374 : */
375 : void
376 2144 : RemoveSubscriptionRel(Oid subid, Oid relid)
377 : {
378 : Relation rel;
379 : HeapScanDesc scan;
380 : ScanKeyData skey[2];
381 : HeapTuple tup;
382 2144 : int nkeys = 0;
383 :
384 2144 : rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
385 :
386 2144 : if (OidIsValid(subid))
387 : {
388 3 : ScanKeyInit(&skey[nkeys++],
389 : Anum_pg_subscription_rel_srsubid,
390 : BTEqualStrategyNumber,
391 : F_OIDEQ,
392 : ObjectIdGetDatum(subid));
393 : }
394 :
395 2144 : if (OidIsValid(relid))
396 : {
397 2141 : ScanKeyInit(&skey[nkeys++],
398 : Anum_pg_subscription_rel_srrelid,
399 : BTEqualStrategyNumber,
400 : F_OIDEQ,
401 : ObjectIdGetDatum(relid));
402 : }
403 :
404 : /* Do the search and delete what we found. */
405 2144 : scan = heap_beginscan_catalog(rel, nkeys, skey);
406 4288 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
407 : {
408 0 : CatalogTupleDelete(rel, &tup->t_self);
409 : }
410 2144 : heap_endscan(scan);
411 :
412 2144 : heap_close(rel, RowExclusiveLock);
413 2144 : }
414 :
415 :
416 : /*
417 : * Get all relations for subscription.
418 : *
419 : * Returned list is palloc'ed in current memory context.
420 : */
421 : List *
422 0 : GetSubscriptionRelations(Oid subid)
423 : {
424 0 : List *res = NIL;
425 : Relation rel;
426 : HeapTuple tup;
427 0 : int nkeys = 0;
428 : ScanKeyData skey[2];
429 : SysScanDesc scan;
430 :
431 0 : rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
432 :
433 0 : ScanKeyInit(&skey[nkeys++],
434 : Anum_pg_subscription_rel_srsubid,
435 : BTEqualStrategyNumber, F_OIDEQ,
436 : ObjectIdGetDatum(subid));
437 :
438 0 : scan = systable_beginscan(rel, InvalidOid, false,
439 : NULL, nkeys, skey);
440 :
441 0 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
442 : {
443 : Form_pg_subscription_rel subrel;
444 : SubscriptionRelState *relstate;
445 :
446 0 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
447 :
448 0 : relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
449 0 : relstate->relid = subrel->srrelid;
450 0 : relstate->state = subrel->srsubstate;
451 0 : relstate->lsn = subrel->srsublsn;
452 :
453 0 : res = lappend(res, relstate);
454 : }
455 :
456 : /* Cleanup */
457 0 : systable_endscan(scan);
458 0 : heap_close(rel, AccessShareLock);
459 :
460 0 : return res;
461 : }
462 :
463 : /*
464 : * Get all relations for subscription that are not in a ready state.
465 : *
466 : * Returned list is palloc'ed in current memory context.
467 : */
468 : List *
469 0 : GetSubscriptionNotReadyRelations(Oid subid)
470 : {
471 0 : List *res = NIL;
472 : Relation rel;
473 : HeapTuple tup;
474 0 : int nkeys = 0;
475 : ScanKeyData skey[2];
476 : SysScanDesc scan;
477 :
478 0 : rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
479 :
480 0 : ScanKeyInit(&skey[nkeys++],
481 : Anum_pg_subscription_rel_srsubid,
482 : BTEqualStrategyNumber, F_OIDEQ,
483 : ObjectIdGetDatum(subid));
484 :
485 0 : ScanKeyInit(&skey[nkeys++],
486 : Anum_pg_subscription_rel_srsubstate,
487 : BTEqualStrategyNumber, F_CHARNE,
488 : CharGetDatum(SUBREL_STATE_READY));
489 :
490 0 : scan = systable_beginscan(rel, InvalidOid, false,
491 : NULL, nkeys, skey);
492 :
493 0 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
494 : {
495 : Form_pg_subscription_rel subrel;
496 : SubscriptionRelState *relstate;
497 :
498 0 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
499 :
500 0 : relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
501 0 : relstate->relid = subrel->srrelid;
502 0 : relstate->state = subrel->srsubstate;
503 0 : relstate->lsn = subrel->srsublsn;
504 :
505 0 : res = lappend(res, relstate);
506 : }
507 :
508 : /* Cleanup */
509 0 : systable_endscan(scan);
510 0 : heap_close(rel, AccessShareLock);
511 :
512 0 : return res;
513 : }
|