Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execReplication.c
4 : * miscellaneous executor routines for logical replication
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execReplication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/relscan.h"
18 : #include "access/transam.h"
19 : #include "access/xact.h"
20 : #include "commands/trigger.h"
21 : #include "executor/executor.h"
22 : #include "nodes/nodeFuncs.h"
23 : #include "parser/parse_relation.h"
24 : #include "parser/parsetree.h"
25 : #include "storage/bufmgr.h"
26 : #include "storage/lmgr.h"
27 : #include "utils/builtins.h"
28 : #include "utils/datum.h"
29 : #include "utils/lsyscache.h"
30 : #include "utils/memutils.h"
31 : #include "utils/rel.h"
32 : #include "utils/snapmgr.h"
33 : #include "utils/syscache.h"
34 : #include "utils/typcache.h"
35 : #include "utils/tqual.h"
36 :
37 :
38 : /*
39 : * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
40 : * is setup to match 'rel' (*NOT* idxrel!).
41 : *
42 : * Returns whether any column contains NULLs.
43 : *
44 : * This is not generic routine, it expects the idxrel to be replication
45 : * identity of a rel and meet all limitations associated with that.
46 : */
47 : static bool
48 0 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
49 : TupleTableSlot *searchslot)
50 : {
51 : int attoff;
52 : bool isnull;
53 : Datum indclassDatum;
54 : oidvector *opclass;
55 0 : int2vector *indkey = &idxrel->rd_index->indkey;
56 0 : bool hasnulls = false;
57 :
58 0 : Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel));
59 :
60 0 : indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
61 : Anum_pg_index_indclass, &isnull);
62 0 : Assert(!isnull);
63 0 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
64 :
65 : /* Build scankey for every attribute in the index. */
66 0 : for (attoff = 0; attoff < RelationGetNumberOfAttributes(idxrel); attoff++)
67 : {
68 : Oid operator;
69 : Oid opfamily;
70 : RegProcedure regop;
71 0 : int pkattno = attoff + 1;
72 0 : int mainattno = indkey->values[attoff];
73 0 : Oid optype = get_opclass_input_type(opclass->values[attoff]);
74 :
75 : /*
76 : * Load the operator info. We need this to get the equality operator
77 : * function for the scan key.
78 : */
79 0 : opfamily = get_opclass_family(opclass->values[attoff]);
80 :
81 0 : operator = get_opfamily_member(opfamily, optype,
82 : optype,
83 : BTEqualStrategyNumber);
84 0 : if (!OidIsValid(operator))
85 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
86 : BTEqualStrategyNumber, optype, optype, opfamily);
87 :
88 0 : regop = get_opcode(operator);
89 :
90 : /* Initialize the scankey. */
91 0 : ScanKeyInit(&skey[attoff],
92 : pkattno,
93 : BTEqualStrategyNumber,
94 : regop,
95 0 : searchslot->tts_values[mainattno - 1]);
96 :
97 : /* Check for null value. */
98 0 : if (searchslot->tts_isnull[mainattno - 1])
99 : {
100 0 : hasnulls = true;
101 0 : skey[attoff].sk_flags |= SK_ISNULL;
102 : }
103 : }
104 :
105 0 : return hasnulls;
106 : }
107 :
108 : /*
109 : * Search the relation 'rel' for tuple using the index.
110 : *
111 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
112 : * contents, and return true. Return false otherwise.
113 : */
114 : bool
115 0 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
116 : LockTupleMode lockmode,
117 : TupleTableSlot *searchslot,
118 : TupleTableSlot *outslot)
119 : {
120 : HeapTuple scantuple;
121 : ScanKeyData skey[INDEX_MAX_KEYS];
122 : IndexScanDesc scan;
123 : SnapshotData snap;
124 : TransactionId xwait;
125 : Relation idxrel;
126 : bool found;
127 :
128 : /* Open the index. */
129 0 : idxrel = index_open(idxoid, RowExclusiveLock);
130 :
131 : /* Start an index scan. */
132 0 : InitDirtySnapshot(snap);
133 0 : scan = index_beginscan(rel, idxrel, &snap,
134 0 : RelationGetNumberOfAttributes(idxrel),
135 : 0);
136 :
137 : /* Build scan key. */
138 0 : build_replindex_scan_key(skey, rel, idxrel, searchslot);
139 :
140 : retry:
141 0 : found = false;
142 :
143 0 : index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0);
144 :
145 : /* Try to find the tuple */
146 0 : if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
147 : {
148 0 : found = true;
149 0 : ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
150 0 : ExecMaterializeSlot(outslot);
151 :
152 0 : xwait = TransactionIdIsValid(snap.xmin) ?
153 0 : snap.xmin : snap.xmax;
154 :
155 : /*
156 : * If the tuple is locked, wait for locking transaction to finish and
157 : * retry.
158 : */
159 0 : if (TransactionIdIsValid(xwait))
160 : {
161 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
162 0 : goto retry;
163 : }
164 : }
165 :
166 : /* Found tuple, try to lock it in the lockmode. */
167 0 : if (found)
168 : {
169 : Buffer buf;
170 : HeapUpdateFailureData hufd;
171 : HTSU_Result res;
172 : HeapTupleData locktup;
173 :
174 0 : ItemPointerCopy(&outslot->tts_tuple->t_self, &locktup.t_self);
175 :
176 0 : PushActiveSnapshot(GetLatestSnapshot());
177 :
178 0 : res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
179 : lockmode,
180 : LockWaitBlock,
181 : false /* don't follow updates */ ,
182 : &buf, &hufd);
183 : /* the tuple slot already has the buffer pinned */
184 0 : ReleaseBuffer(buf);
185 :
186 0 : PopActiveSnapshot();
187 :
188 0 : switch (res)
189 : {
190 : case HeapTupleMayBeUpdated:
191 0 : break;
192 : case HeapTupleUpdated:
193 : /* XXX: Improve handling here */
194 0 : ereport(LOG,
195 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
196 : errmsg("concurrent update, retrying")));
197 0 : goto retry;
198 : case HeapTupleInvisible:
199 0 : elog(ERROR, "attempted to lock invisible tuple");
200 : default:
201 0 : elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
202 : break;
203 : }
204 : }
205 :
206 0 : index_endscan(scan);
207 :
208 : /* Don't release lock until commit. */
209 0 : index_close(idxrel, NoLock);
210 :
211 0 : return found;
212 : }
213 :
214 : /*
215 : * Compare the tuple and slot and check if they have equal values.
216 : *
217 : * We use binary datum comparison which might return false negatives but
218 : * that's the best we can do here as there may be multiple notions of
219 : * equality for the data types and table columns don't specify which one
220 : * to use.
221 : */
222 : static bool
223 0 : tuple_equals_slot(TupleDesc desc, HeapTuple tup, TupleTableSlot *slot)
224 : {
225 : Datum values[MaxTupleAttributeNumber];
226 : bool isnull[MaxTupleAttributeNumber];
227 : int attrnum;
228 :
229 0 : heap_deform_tuple(tup, desc, values, isnull);
230 :
231 : /* Check equality of the attributes. */
232 0 : for (attrnum = 0; attrnum < desc->natts; attrnum++)
233 : {
234 : Form_pg_attribute att;
235 : TypeCacheEntry *typentry;
236 :
237 : /*
238 : * If one value is NULL and other is not, then they are certainly not
239 : * equal
240 : */
241 0 : if (isnull[attrnum] != slot->tts_isnull[attrnum])
242 0 : return false;
243 :
244 : /*
245 : * If both are NULL, they can be considered equal.
246 : */
247 0 : if (isnull[attrnum])
248 0 : continue;
249 :
250 0 : att = TupleDescAttr(desc, attrnum);
251 :
252 0 : typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
253 0 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
254 0 : ereport(ERROR,
255 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
256 : errmsg("could not identify an equality operator for type %s",
257 : format_type_be(att->atttypid))));
258 :
259 0 : if (!DatumGetBool(FunctionCall2(&typentry->eq_opr_finfo,
260 : values[attrnum],
261 : slot->tts_values[attrnum])))
262 0 : return false;
263 : }
264 :
265 0 : return true;
266 : }
267 :
268 : /*
269 : * Search the relation 'rel' for tuple using the sequential scan.
270 : *
271 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
272 : * contents, and return true. Return false otherwise.
273 : *
274 : * Note that this stops on the first matching tuple.
275 : *
276 : * This can obviously be quite slow on tables that have more than few rows.
277 : */
278 : bool
279 0 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
280 : TupleTableSlot *searchslot, TupleTableSlot *outslot)
281 : {
282 : HeapTuple scantuple;
283 : HeapScanDesc scan;
284 : SnapshotData snap;
285 : TransactionId xwait;
286 : bool found;
287 0 : TupleDesc desc = RelationGetDescr(rel);
288 :
289 0 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
290 :
291 : /* Start an index scan. */
292 0 : InitDirtySnapshot(snap);
293 0 : scan = heap_beginscan(rel, &snap, 0, NULL);
294 :
295 : retry:
296 0 : found = false;
297 :
298 0 : heap_rescan(scan, NULL);
299 :
300 : /* Try to find the tuple */
301 0 : while ((scantuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
302 : {
303 0 : if (!tuple_equals_slot(desc, scantuple, searchslot))
304 0 : continue;
305 :
306 0 : found = true;
307 0 : ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
308 0 : ExecMaterializeSlot(outslot);
309 :
310 0 : xwait = TransactionIdIsValid(snap.xmin) ?
311 0 : snap.xmin : snap.xmax;
312 :
313 : /*
314 : * If the tuple is locked, wait for locking transaction to finish and
315 : * retry.
316 : */
317 0 : if (TransactionIdIsValid(xwait))
318 : {
319 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
320 0 : goto retry;
321 : }
322 : }
323 :
324 : /* Found tuple, try to lock it in the lockmode. */
325 0 : if (found)
326 : {
327 : Buffer buf;
328 : HeapUpdateFailureData hufd;
329 : HTSU_Result res;
330 : HeapTupleData locktup;
331 :
332 0 : ItemPointerCopy(&outslot->tts_tuple->t_self, &locktup.t_self);
333 :
334 0 : PushActiveSnapshot(GetLatestSnapshot());
335 :
336 0 : res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
337 : lockmode,
338 : LockWaitBlock,
339 : false /* don't follow updates */ ,
340 : &buf, &hufd);
341 : /* the tuple slot already has the buffer pinned */
342 0 : ReleaseBuffer(buf);
343 :
344 0 : PopActiveSnapshot();
345 :
346 0 : switch (res)
347 : {
348 : case HeapTupleMayBeUpdated:
349 0 : break;
350 : case HeapTupleUpdated:
351 : /* XXX: Improve handling here */
352 0 : ereport(LOG,
353 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
354 : errmsg("concurrent update, retrying")));
355 0 : goto retry;
356 : case HeapTupleInvisible:
357 0 : elog(ERROR, "attempted to lock invisible tuple");
358 : default:
359 0 : elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
360 : break;
361 : }
362 : }
363 :
364 0 : heap_endscan(scan);
365 :
366 0 : return found;
367 : }
368 :
369 : /*
370 : * Insert tuple represented in the slot to the relation, update the indexes,
371 : * and execute any constraints and per-row triggers.
372 : *
373 : * Caller is responsible for opening the indexes.
374 : */
375 : void
376 0 : ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
377 : {
378 0 : bool skip_tuple = false;
379 : HeapTuple tuple;
380 0 : ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
381 0 : Relation rel = resultRelInfo->ri_RelationDesc;
382 :
383 : /* For now we support only tables. */
384 0 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
385 :
386 0 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
387 :
388 : /* BEFORE ROW INSERT Triggers */
389 0 : if (resultRelInfo->ri_TrigDesc &&
390 0 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
391 : {
392 0 : slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
393 :
394 0 : if (slot == NULL) /* "do nothing" */
395 0 : skip_tuple = true;
396 : }
397 :
398 0 : if (!skip_tuple)
399 : {
400 0 : List *recheckIndexes = NIL;
401 :
402 : /* Check the constraints of the tuple */
403 0 : if (rel->rd_att->constr)
404 0 : ExecConstraints(resultRelInfo, slot, estate);
405 :
406 : /* Store the slot into tuple that we can inspect. */
407 0 : tuple = ExecMaterializeSlot(slot);
408 :
409 : /* OK, store the tuple and create index entries for it */
410 0 : simple_heap_insert(rel, tuple);
411 :
412 0 : if (resultRelInfo->ri_NumIndices > 0)
413 0 : recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
414 : estate, false, NULL,
415 : NIL);
416 :
417 : /* AFTER ROW INSERT Triggers */
418 0 : ExecARInsertTriggers(estate, resultRelInfo, tuple,
419 : recheckIndexes, NULL);
420 :
421 : /*
422 : * XXX we should in theory pass a TransitionCaptureState object to the
423 : * above to capture transition tuples, but after statement triggers
424 : * don't actually get fired by replication yet anyway
425 : */
426 :
427 0 : list_free(recheckIndexes);
428 : }
429 0 : }
430 :
431 : /*
432 : * Find the searchslot tuple and update it with data in the slot,
433 : * update the indexes, and execute any constraints and per-row triggers.
434 : *
435 : * Caller is responsible for opening the indexes.
436 : */
437 : void
438 0 : ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
439 : TupleTableSlot *searchslot, TupleTableSlot *slot)
440 : {
441 0 : bool skip_tuple = false;
442 : HeapTuple tuple;
443 0 : ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
444 0 : Relation rel = resultRelInfo->ri_RelationDesc;
445 :
446 : /* For now we support only tables. */
447 0 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
448 :
449 0 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
450 :
451 : /* BEFORE ROW INSERT Triggers */
452 0 : if (resultRelInfo->ri_TrigDesc &&
453 0 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
454 : {
455 0 : slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
456 0 : &searchslot->tts_tuple->t_self,
457 : NULL, slot);
458 :
459 0 : if (slot == NULL) /* "do nothing" */
460 0 : skip_tuple = true;
461 : }
462 :
463 0 : if (!skip_tuple)
464 : {
465 0 : List *recheckIndexes = NIL;
466 :
467 : /* Check the constraints of the tuple */
468 0 : if (rel->rd_att->constr)
469 0 : ExecConstraints(resultRelInfo, slot, estate);
470 :
471 : /* Store the slot into tuple that we can write. */
472 0 : tuple = ExecMaterializeSlot(slot);
473 :
474 : /* OK, update the tuple and index entries for it */
475 0 : simple_heap_update(rel, &searchslot->tts_tuple->t_self,
476 : slot->tts_tuple);
477 :
478 0 : if (resultRelInfo->ri_NumIndices > 0 &&
479 0 : !HeapTupleIsHeapOnly(slot->tts_tuple))
480 0 : recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
481 : estate, false, NULL,
482 : NIL);
483 :
484 : /* AFTER ROW UPDATE Triggers */
485 0 : ExecARUpdateTriggers(estate, resultRelInfo,
486 0 : &searchslot->tts_tuple->t_self,
487 : NULL, tuple, recheckIndexes, NULL);
488 :
489 0 : list_free(recheckIndexes);
490 : }
491 0 : }
492 :
493 : /*
494 : * Find the searchslot tuple and delete it, and execute any constraints
495 : * and per-row triggers.
496 : *
497 : * Caller is responsible for opening the indexes.
498 : */
499 : void
500 0 : ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
501 : TupleTableSlot *searchslot)
502 : {
503 0 : bool skip_tuple = false;
504 0 : ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
505 0 : Relation rel = resultRelInfo->ri_RelationDesc;
506 :
507 : /* For now we support only tables. */
508 0 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
509 :
510 0 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
511 :
512 : /* BEFORE ROW INSERT Triggers */
513 0 : if (resultRelInfo->ri_TrigDesc &&
514 0 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
515 : {
516 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
517 0 : &searchslot->tts_tuple->t_self,
518 : NULL);
519 : }
520 :
521 0 : if (!skip_tuple)
522 : {
523 0 : List *recheckIndexes = NIL;
524 :
525 : /* OK, delete the tuple */
526 0 : simple_heap_delete(rel, &searchslot->tts_tuple->t_self);
527 :
528 : /* AFTER ROW DELETE Triggers */
529 0 : ExecARDeleteTriggers(estate, resultRelInfo,
530 0 : &searchslot->tts_tuple->t_self, NULL, NULL);
531 :
532 0 : list_free(recheckIndexes);
533 : }
534 0 : }
535 :
536 : /*
537 : * Check if command can be executed with current replica identity.
538 : */
539 : void
540 4777 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
541 : {
542 : PublicationActions *pubactions;
543 :
544 : /* We only need to do checks for UPDATE and DELETE. */
545 4777 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
546 3769 : return;
547 :
548 : /* If relation has replica identity we are always good. */
549 2016 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
550 1008 : OidIsValid(RelationGetReplicaIndex(rel)))
551 189 : return;
552 :
553 : /*
554 : * This is either UPDATE OR DELETE and there is no replica identity.
555 : *
556 : * Check if the table publishes UPDATES or DELETES.
557 : */
558 819 : pubactions = GetRelationPublicationActions(rel);
559 819 : if (cmd == CMD_UPDATE && pubactions->pubupdate)
560 0 : ereport(ERROR,
561 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
562 : errmsg("cannot update table \"%s\" because it does not have replica identity and publishes updates",
563 : RelationGetRelationName(rel)),
564 : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
565 819 : else if (cmd == CMD_DELETE && pubactions->pubdelete)
566 0 : ereport(ERROR,
567 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
568 : errmsg("cannot delete from table \"%s\" because it does not have replica identity and publishes deletes",
569 : RelationGetRelationName(rel)),
570 : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
571 : }
572 :
573 :
574 : /*
575 : * Check if we support writing into specific relkind.
576 : *
577 : * The nspname and relname are only needed for error reporting.
578 : */
579 : void
580 0 : CheckSubscriptionRelkind(char relkind, const char *nspname,
581 : const char *relname)
582 : {
583 : /*
584 : * We currently only support writing to regular tables.
585 : */
586 0 : if (relkind != RELKIND_RELATION)
587 0 : ereport(ERROR,
588 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
589 : errmsg("logical replication target relation \"%s.%s\" is not a table",
590 : nspname, relname)));
591 0 : }
|