Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * relation.c
3 : * PostgreSQL logical replication
4 : *
5 : * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/relation.c
9 : *
10 : * NOTES
11 : * This file contains helper functions for logical replication relation
12 : * mapping cache.
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 :
17 : #include "postgres.h"
18 :
19 : #include "access/heapam.h"
20 : #include "access/sysattr.h"
21 : #include "catalog/namespace.h"
22 : #include "catalog/pg_subscription_rel.h"
23 : #include "executor/executor.h"
24 : #include "nodes/makefuncs.h"
25 : #include "replication/logicalrelation.h"
26 : #include "replication/worker_internal.h"
27 : #include "utils/builtins.h"
28 : #include "utils/inval.h"
29 : #include "utils/lsyscache.h"
30 : #include "utils/memutils.h"
31 : #include "utils/syscache.h"
32 :
33 : static MemoryContext LogicalRepRelMapContext = NULL;
34 :
35 : static HTAB *LogicalRepRelMap = NULL;
36 : static HTAB *LogicalRepTypMap = NULL;
37 :
38 : static void logicalrep_typmap_invalidate_cb(Datum arg, int cacheid,
39 : uint32 hashvalue);
40 :
41 : /*
42 : * Relcache invalidation callback for our relation map cache.
43 : */
44 : static void
45 0 : logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
46 : {
47 : LogicalRepRelMapEntry *entry;
48 :
49 : /* Just to be sure. */
50 0 : if (LogicalRepRelMap == NULL)
51 0 : return;
52 :
53 0 : if (reloid != InvalidOid)
54 : {
55 : HASH_SEQ_STATUS status;
56 :
57 0 : hash_seq_init(&status, LogicalRepRelMap);
58 :
59 : /* TODO, use inverse lookup hashtable? */
60 0 : while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
61 : {
62 0 : if (entry->localreloid == reloid)
63 : {
64 0 : entry->localreloid = InvalidOid;
65 0 : hash_seq_term(&status);
66 0 : break;
67 : }
68 : }
69 : }
70 : else
71 : {
72 : /* invalidate all cache entries */
73 : HASH_SEQ_STATUS status;
74 :
75 0 : hash_seq_init(&status, LogicalRepRelMap);
76 :
77 0 : while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
78 0 : entry->localreloid = InvalidOid;
79 : }
80 : }
81 :
82 : /*
83 : * Initialize the relation map cache.
84 : */
85 : static void
86 0 : logicalrep_relmap_init(void)
87 : {
88 : HASHCTL ctl;
89 :
90 0 : if (!LogicalRepRelMapContext)
91 0 : LogicalRepRelMapContext =
92 0 : AllocSetContextCreate(CacheMemoryContext,
93 : "LogicalRepRelMapContext",
94 : ALLOCSET_DEFAULT_SIZES);
95 :
96 : /* Initialize the relation hash table. */
97 0 : MemSet(&ctl, 0, sizeof(ctl));
98 0 : ctl.keysize = sizeof(LogicalRepRelId);
99 0 : ctl.entrysize = sizeof(LogicalRepRelMapEntry);
100 0 : ctl.hcxt = LogicalRepRelMapContext;
101 :
102 0 : LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
103 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
104 :
105 : /* Initialize the type hash table. */
106 0 : MemSet(&ctl, 0, sizeof(ctl));
107 0 : ctl.keysize = sizeof(Oid);
108 0 : ctl.entrysize = sizeof(LogicalRepTyp);
109 0 : ctl.hcxt = LogicalRepRelMapContext;
110 :
111 : /* This will usually be small. */
112 0 : LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
113 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
114 :
115 : /* Watch for invalidation events. */
116 0 : CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
117 : (Datum) 0);
118 0 : CacheRegisterSyscacheCallback(TYPEOID, logicalrep_typmap_invalidate_cb,
119 : (Datum) 0);
120 0 : }
121 :
122 : /*
123 : * Free the entry of a relation map cache.
124 : */
125 : static void
126 0 : logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
127 : {
128 : LogicalRepRelation *remoterel;
129 :
130 0 : remoterel = &entry->remoterel;
131 :
132 0 : pfree(remoterel->nspname);
133 0 : pfree(remoterel->relname);
134 :
135 0 : if (remoterel->natts > 0)
136 : {
137 : int i;
138 :
139 0 : for (i = 0; i < remoterel->natts; i++)
140 0 : pfree(remoterel->attnames[i]);
141 :
142 0 : pfree(remoterel->attnames);
143 0 : pfree(remoterel->atttyps);
144 : }
145 0 : bms_free(remoterel->attkeys);
146 :
147 0 : if (entry->attrmap)
148 0 : pfree(entry->attrmap);
149 0 : }
150 :
151 : /*
152 : * Add new entry or update existing entry in the relation map cache.
153 : *
154 : * Called when new relation mapping is sent by the publisher to update
155 : * our expected view of incoming data from said publisher.
156 : */
157 : void
158 0 : logicalrep_relmap_update(LogicalRepRelation *remoterel)
159 : {
160 : MemoryContext oldctx;
161 : LogicalRepRelMapEntry *entry;
162 : bool found;
163 : int i;
164 :
165 0 : if (LogicalRepRelMap == NULL)
166 0 : logicalrep_relmap_init();
167 :
168 : /*
169 : * HASH_ENTER returns the existing entry if present or creates a new one.
170 : */
171 0 : entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
172 : HASH_ENTER, &found);
173 :
174 0 : if (found)
175 0 : logicalrep_relmap_free_entry(entry);
176 :
177 0 : memset(entry, 0, sizeof(LogicalRepRelMapEntry));
178 :
179 : /* Make cached copy of the data */
180 0 : oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
181 0 : entry->remoterel.remoteid = remoterel->remoteid;
182 0 : entry->remoterel.nspname = pstrdup(remoterel->nspname);
183 0 : entry->remoterel.relname = pstrdup(remoterel->relname);
184 0 : entry->remoterel.natts = remoterel->natts;
185 0 : entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
186 0 : entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
187 0 : for (i = 0; i < remoterel->natts; i++)
188 : {
189 0 : entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
190 0 : entry->remoterel.atttyps[i] = remoterel->atttyps[i];
191 : }
192 0 : entry->remoterel.replident = remoterel->replident;
193 0 : entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
194 0 : MemoryContextSwitchTo(oldctx);
195 0 : }
196 :
197 : /*
198 : * Find attribute index in TupleDesc struct by attribute name.
199 : *
200 : * Returns -1 if not found.
201 : */
202 : static int
203 0 : logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
204 : {
205 : int i;
206 :
207 0 : for (i = 0; i < remoterel->natts; i++)
208 : {
209 0 : if (strcmp(remoterel->attnames[i], attname) == 0)
210 0 : return i;
211 : }
212 :
213 0 : return -1;
214 : }
215 :
216 : /*
217 : * Open the local relation associated with the remote one.
218 : *
219 : * Optionally rebuilds the Relcache mapping if it was invalidated
220 : * by local DDL.
221 : */
222 : LogicalRepRelMapEntry *
223 0 : logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
224 : {
225 : LogicalRepRelMapEntry *entry;
226 : bool found;
227 :
228 0 : if (LogicalRepRelMap == NULL)
229 0 : logicalrep_relmap_init();
230 :
231 : /* Search for existing entry. */
232 0 : entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
233 : HASH_FIND, &found);
234 :
235 0 : if (!found)
236 0 : elog(ERROR, "no relation map entry for remote relation ID %u",
237 : remoteid);
238 :
239 : /* Need to update the local cache? */
240 0 : if (!OidIsValid(entry->localreloid))
241 : {
242 : Oid relid;
243 : int i;
244 : int found;
245 : Bitmapset *idkey;
246 : TupleDesc desc;
247 : LogicalRepRelation *remoterel;
248 : MemoryContext oldctx;
249 :
250 0 : remoterel = &entry->remoterel;
251 :
252 : /* Try to find and lock the relation by name. */
253 0 : relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
254 : remoterel->relname, -1),
255 : lockmode, true);
256 0 : if (!OidIsValid(relid))
257 0 : ereport(ERROR,
258 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
259 : errmsg("logical replication target relation \"%s.%s\" does not exist",
260 : remoterel->nspname, remoterel->relname)));
261 0 : entry->localrel = heap_open(relid, NoLock);
262 :
263 : /* Check for supported relkind. */
264 0 : CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
265 0 : remoterel->nspname, remoterel->relname);
266 :
267 : /*
268 : * Build the mapping of local attribute numbers to remote attribute
269 : * numbers and validate that we don't miss any replicated columns as
270 : * that would result in potentially unwanted data loss.
271 : */
272 0 : desc = RelationGetDescr(entry->localrel);
273 0 : oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
274 0 : entry->attrmap = palloc(desc->natts * sizeof(int));
275 0 : MemoryContextSwitchTo(oldctx);
276 :
277 0 : found = 0;
278 0 : for (i = 0; i < desc->natts; i++)
279 : {
280 : int attnum;
281 0 : Form_pg_attribute attr = TupleDescAttr(desc, i);
282 :
283 0 : if (attr->attisdropped)
284 : {
285 0 : entry->attrmap[i] = -1;
286 0 : continue;
287 : }
288 :
289 0 : attnum = logicalrep_rel_att_by_name(remoterel,
290 0 : NameStr(attr->attname));
291 :
292 0 : entry->attrmap[i] = attnum;
293 0 : if (attnum >= 0)
294 0 : found++;
295 : }
296 :
297 : /* TODO, detail message with names of missing columns */
298 0 : if (found < remoterel->natts)
299 0 : ereport(ERROR,
300 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
301 : errmsg("logical replication target relation \"%s.%s\" is missing "
302 : "some replicated columns",
303 : remoterel->nspname, remoterel->relname)));
304 :
305 : /*
306 : * Check that replica identity matches. We allow for stricter replica
307 : * identity (fewer columns) on subscriber as that will not stop us
308 : * from finding unique tuple. IE, if publisher has identity
309 : * (id,timestamp) and subscriber just (id) this will not be a problem,
310 : * but in the opposite scenario it will.
311 : *
312 : * Don't throw any error here just mark the relation entry as not
313 : * updatable, as replica identity is only for updates and deletes but
314 : * inserts can be replicated even without it.
315 : */
316 0 : entry->updatable = true;
317 0 : idkey = RelationGetIndexAttrBitmap(entry->localrel,
318 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
319 : /* fallback to PK if no replica identity */
320 0 : if (idkey == NULL)
321 : {
322 0 : idkey = RelationGetIndexAttrBitmap(entry->localrel,
323 : INDEX_ATTR_BITMAP_PRIMARY_KEY);
324 :
325 : /*
326 : * If no replica identity index and no PK, the published table
327 : * must have replica identity FULL.
328 : */
329 0 : if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
330 0 : entry->updatable = false;
331 : }
332 :
333 0 : i = -1;
334 0 : while ((i = bms_next_member(idkey, i)) >= 0)
335 : {
336 0 : int attnum = i + FirstLowInvalidHeapAttributeNumber;
337 :
338 0 : if (!AttrNumberIsForUserDefinedAttr(attnum))
339 0 : ereport(ERROR,
340 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
341 : errmsg("logical replication target relation \"%s.%s\" uses "
342 : "system columns in REPLICA IDENTITY index",
343 : remoterel->nspname, remoterel->relname)));
344 :
345 0 : attnum = AttrNumberGetAttrOffset(attnum);
346 :
347 0 : if (!bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
348 : {
349 0 : entry->updatable = false;
350 0 : break;
351 : }
352 : }
353 :
354 0 : entry->localreloid = relid;
355 : }
356 : else
357 0 : entry->localrel = heap_open(entry->localreloid, lockmode);
358 :
359 0 : if (entry->state != SUBREL_STATE_READY)
360 0 : entry->state = GetSubscriptionRelState(MySubscription->oid,
361 : entry->localreloid,
362 : &entry->statelsn,
363 : true);
364 :
365 0 : return entry;
366 : }
367 :
368 : /*
369 : * Close the previously opened logical relation.
370 : */
371 : void
372 0 : logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
373 : {
374 0 : heap_close(rel->localrel, lockmode);
375 0 : rel->localrel = NULL;
376 0 : }
377 :
378 :
379 : /*
380 : * Type cache invalidation callback for our type map cache.
381 : */
382 : static void
383 0 : logicalrep_typmap_invalidate_cb(Datum arg, int cacheid, uint32 hashvalue)
384 : {
385 : HASH_SEQ_STATUS status;
386 : LogicalRepTyp *entry;
387 :
388 : /* Just to be sure. */
389 0 : if (LogicalRepTypMap == NULL)
390 0 : return;
391 :
392 : /* invalidate all cache entries */
393 0 : hash_seq_init(&status, LogicalRepTypMap);
394 :
395 0 : while ((entry = (LogicalRepTyp *) hash_seq_search(&status)) != NULL)
396 0 : entry->typoid = InvalidOid;
397 : }
398 :
399 : /*
400 : * Free the type map cache entry data.
401 : */
402 : static void
403 0 : logicalrep_typmap_free_entry(LogicalRepTyp *entry)
404 : {
405 0 : pfree(entry->nspname);
406 0 : pfree(entry->typname);
407 :
408 0 : entry->typoid = InvalidOid;
409 0 : }
410 :
411 : /*
412 : * Add new entry or update existing entry in the type map cache.
413 : */
414 : void
415 0 : logicalrep_typmap_update(LogicalRepTyp *remotetyp)
416 : {
417 : MemoryContext oldctx;
418 : LogicalRepTyp *entry;
419 : bool found;
420 :
421 0 : if (LogicalRepTypMap == NULL)
422 0 : logicalrep_relmap_init();
423 :
424 : /*
425 : * HASH_ENTER returns the existing entry if present or creates a new one.
426 : */
427 0 : entry = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
428 : HASH_ENTER, &found);
429 :
430 0 : if (found)
431 0 : logicalrep_typmap_free_entry(entry);
432 :
433 : /* Make cached copy of the data */
434 0 : entry->remoteid = remotetyp->remoteid;
435 0 : oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
436 0 : entry->nspname = pstrdup(remotetyp->nspname);
437 0 : entry->typname = pstrdup(remotetyp->typname);
438 0 : MemoryContextSwitchTo(oldctx);
439 0 : entry->typoid = InvalidOid;
440 0 : }
441 :
442 : /*
443 : * Fetch type info from the cache.
444 : */
445 : Oid
446 0 : logicalrep_typmap_getid(Oid remoteid)
447 : {
448 : LogicalRepTyp *entry;
449 : bool found;
450 : Oid nspoid;
451 :
452 : /* Internal types are mapped directly. */
453 0 : if (remoteid < FirstNormalObjectId)
454 : {
455 0 : if (!get_typisdefined(remoteid))
456 0 : ereport(ERROR,
457 : (errmsg("builtin type %u not found", remoteid),
458 : errhint("This can be caused by having publisher with "
459 : "higher major version than subscriber")));
460 0 : return remoteid;
461 : }
462 :
463 0 : if (LogicalRepTypMap == NULL)
464 0 : logicalrep_relmap_init();
465 :
466 : /* Try finding the mapping. */
467 0 : entry = hash_search(LogicalRepTypMap, (void *) &remoteid,
468 : HASH_FIND, &found);
469 :
470 0 : if (!found)
471 0 : elog(ERROR, "no type map entry for remote type %u",
472 : remoteid);
473 :
474 : /* Found and mapped, return the oid. */
475 0 : if (OidIsValid(entry->typoid))
476 0 : return entry->typoid;
477 :
478 : /* Otherwise, try to map to local type. */
479 0 : nspoid = LookupExplicitNamespace(entry->nspname, true);
480 0 : if (OidIsValid(nspoid))
481 0 : entry->typoid = GetSysCacheOid2(TYPENAMENSP,
482 : PointerGetDatum(entry->typname),
483 : ObjectIdGetDatum(nspoid));
484 : else
485 0 : entry->typoid = InvalidOid;
486 :
487 0 : if (!OidIsValid(entry->typoid))
488 0 : ereport(ERROR,
489 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
490 : errmsg("data type \"%s.%s\" required for logical replication does not exist",
491 : entry->nspname, entry->typname)));
492 :
493 0 : return entry->typoid;
494 : }
|