Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * proto.c
4 : * logical replication protocol functions
5 : *
6 : * Copyright (c) 2015-2017, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/proto.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/sysattr.h"
16 : #include "catalog/pg_namespace.h"
17 : #include "catalog/pg_type.h"
18 : #include "libpq/pqformat.h"
19 : #include "replication/logicalproto.h"
20 : #include "utils/builtins.h"
21 : #include "utils/lsyscache.h"
22 : #include "utils/syscache.h"
23 :
24 : /*
25 : * Protocol message flags.
26 : */
27 : #define LOGICALREP_IS_REPLICA_IDENTITY 1
28 :
29 : static void logicalrep_write_attrs(StringInfo out, Relation rel);
30 : static void logicalrep_write_tuple(StringInfo out, Relation rel,
31 : HeapTuple tuple);
32 :
33 : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
34 : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
35 :
36 : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
37 : static const char *logicalrep_read_namespace(StringInfo in);
38 :
39 : /*
40 : * Write BEGIN to the output stream.
41 : */
42 : void
43 0 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
44 : {
45 0 : pq_sendbyte(out, 'B'); /* BEGIN */
46 :
47 : /* fixed fields */
48 0 : pq_sendint64(out, txn->final_lsn);
49 0 : pq_sendint64(out, txn->commit_time);
50 0 : pq_sendint(out, txn->xid, 4);
51 0 : }
52 :
53 : /*
54 : * Read transaction BEGIN from the stream.
55 : */
56 : void
57 0 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
58 : {
59 : /* read fields */
60 0 : begin_data->final_lsn = pq_getmsgint64(in);
61 0 : if (begin_data->final_lsn == InvalidXLogRecPtr)
62 0 : elog(ERROR, "final_lsn not set in begin message");
63 0 : begin_data->committime = pq_getmsgint64(in);
64 0 : begin_data->xid = pq_getmsgint(in, 4);
65 0 : }
66 :
67 :
68 : /*
69 : * Write COMMIT to the output stream.
70 : */
71 : void
72 0 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
73 : XLogRecPtr commit_lsn)
74 : {
75 0 : uint8 flags = 0;
76 :
77 0 : pq_sendbyte(out, 'C'); /* sending COMMIT */
78 :
79 : /* send the flags field (unused for now) */
80 0 : pq_sendbyte(out, flags);
81 :
82 : /* send fields */
83 0 : pq_sendint64(out, commit_lsn);
84 0 : pq_sendint64(out, txn->end_lsn);
85 0 : pq_sendint64(out, txn->commit_time);
86 0 : }
87 :
88 : /*
89 : * Read transaction COMMIT from the stream.
90 : */
91 : void
92 0 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
93 : {
94 : /* read flags (unused for now) */
95 0 : uint8 flags = pq_getmsgbyte(in);
96 :
97 0 : if (flags != 0)
98 0 : elog(ERROR, "unrecognized flags %u in commit message", flags);
99 :
100 : /* read fields */
101 0 : commit_data->commit_lsn = pq_getmsgint64(in);
102 0 : commit_data->end_lsn = pq_getmsgint64(in);
103 0 : commit_data->committime = pq_getmsgint64(in);
104 0 : }
105 :
106 : /*
107 : * Write ORIGIN to the output stream.
108 : */
109 : void
110 0 : logicalrep_write_origin(StringInfo out, const char *origin,
111 : XLogRecPtr origin_lsn)
112 : {
113 0 : pq_sendbyte(out, 'O'); /* ORIGIN */
114 :
115 : /* fixed fields */
116 0 : pq_sendint64(out, origin_lsn);
117 :
118 : /* origin string */
119 0 : pq_sendstring(out, origin);
120 0 : }
121 :
122 : /*
123 : * Read ORIGIN from the output stream.
124 : */
125 : char *
126 0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
127 : {
128 : /* fixed fields */
129 0 : *origin_lsn = pq_getmsgint64(in);
130 :
131 : /* return origin */
132 0 : return pstrdup(pq_getmsgstring(in));
133 : }
134 :
135 : /*
136 : * Write INSERT to the output stream.
137 : */
138 : void
139 0 : logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
140 : {
141 0 : pq_sendbyte(out, 'I'); /* action INSERT */
142 :
143 0 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
144 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
145 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
146 :
147 : /* use Oid as relation identifier */
148 0 : pq_sendint(out, RelationGetRelid(rel), 4);
149 :
150 0 : pq_sendbyte(out, 'N'); /* new tuple follows */
151 0 : logicalrep_write_tuple(out, rel, newtuple);
152 0 : }
153 :
154 : /*
155 : * Read INSERT from stream.
156 : *
157 : * Fills the new tuple.
158 : */
159 : LogicalRepRelId
160 0 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
161 : {
162 : char action;
163 : LogicalRepRelId relid;
164 :
165 : /* read the relation id */
166 0 : relid = pq_getmsgint(in, 4);
167 :
168 0 : action = pq_getmsgbyte(in);
169 0 : if (action != 'N')
170 0 : elog(ERROR, "expected new tuple but got %d",
171 : action);
172 :
173 0 : logicalrep_read_tuple(in, newtup);
174 :
175 0 : return relid;
176 : }
177 :
178 : /*
179 : * Write UPDATE to the output stream.
180 : */
181 : void
182 0 : logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
183 : HeapTuple newtuple)
184 : {
185 0 : pq_sendbyte(out, 'U'); /* action UPDATE */
186 :
187 0 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
188 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
189 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
190 :
191 : /* use Oid as relation identifier */
192 0 : pq_sendint(out, RelationGetRelid(rel), 4);
193 :
194 0 : if (oldtuple != NULL)
195 : {
196 0 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
197 0 : pq_sendbyte(out, 'O'); /* old tuple follows */
198 : else
199 0 : pq_sendbyte(out, 'K'); /* old key follows */
200 0 : logicalrep_write_tuple(out, rel, oldtuple);
201 : }
202 :
203 0 : pq_sendbyte(out, 'N'); /* new tuple follows */
204 0 : logicalrep_write_tuple(out, rel, newtuple);
205 0 : }
206 :
207 : /*
208 : * Read UPDATE from stream.
209 : */
210 : LogicalRepRelId
211 0 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
212 : LogicalRepTupleData *oldtup,
213 : LogicalRepTupleData *newtup)
214 : {
215 : char action;
216 : LogicalRepRelId relid;
217 :
218 : /* read the relation id */
219 0 : relid = pq_getmsgint(in, 4);
220 :
221 : /* read and verify action */
222 0 : action = pq_getmsgbyte(in);
223 0 : if (action != 'K' && action != 'O' && action != 'N')
224 0 : elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
225 : action);
226 :
227 : /* check for old tuple */
228 0 : if (action == 'K' || action == 'O')
229 : {
230 0 : logicalrep_read_tuple(in, oldtup);
231 0 : *has_oldtuple = true;
232 :
233 0 : action = pq_getmsgbyte(in);
234 : }
235 : else
236 0 : *has_oldtuple = false;
237 :
238 : /* check for new tuple */
239 0 : if (action != 'N')
240 0 : elog(ERROR, "expected action 'N', got %c",
241 : action);
242 :
243 0 : logicalrep_read_tuple(in, newtup);
244 :
245 0 : return relid;
246 : }
247 :
248 : /*
249 : * Write DELETE to the output stream.
250 : */
251 : void
252 0 : logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
253 : {
254 0 : Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
255 : rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
256 : rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
257 :
258 0 : pq_sendbyte(out, 'D'); /* action DELETE */
259 :
260 : /* use Oid as relation identifier */
261 0 : pq_sendint(out, RelationGetRelid(rel), 4);
262 :
263 0 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
264 0 : pq_sendbyte(out, 'O'); /* old tuple follows */
265 : else
266 0 : pq_sendbyte(out, 'K'); /* old key follows */
267 :
268 0 : logicalrep_write_tuple(out, rel, oldtuple);
269 0 : }
270 :
271 : /*
272 : * Read DELETE from stream.
273 : *
274 : * Fills the old tuple.
275 : */
276 : LogicalRepRelId
277 0 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
278 : {
279 : char action;
280 : LogicalRepRelId relid;
281 :
282 : /* read the relation id */
283 0 : relid = pq_getmsgint(in, 4);
284 :
285 : /* read and verify action */
286 0 : action = pq_getmsgbyte(in);
287 0 : if (action != 'K' && action != 'O')
288 0 : elog(ERROR, "expected action 'O' or 'K', got %c", action);
289 :
290 0 : logicalrep_read_tuple(in, oldtup);
291 :
292 0 : return relid;
293 : }
294 :
295 : /*
296 : * Write relation description to the output stream.
297 : */
298 : void
299 0 : logicalrep_write_rel(StringInfo out, Relation rel)
300 : {
301 : char *relname;
302 :
303 0 : pq_sendbyte(out, 'R'); /* sending RELATION */
304 :
305 : /* use Oid as relation identifier */
306 0 : pq_sendint(out, RelationGetRelid(rel), 4);
307 :
308 : /* send qualified relation name */
309 0 : logicalrep_write_namespace(out, RelationGetNamespace(rel));
310 0 : relname = RelationGetRelationName(rel);
311 0 : pq_sendstring(out, relname);
312 :
313 : /* send replica identity */
314 0 : pq_sendbyte(out, rel->rd_rel->relreplident);
315 :
316 : /* send the attribute info */
317 0 : logicalrep_write_attrs(out, rel);
318 0 : }
319 :
320 : /*
321 : * Read the relation info from stream and return as LogicalRepRelation.
322 : */
323 : LogicalRepRelation *
324 0 : logicalrep_read_rel(StringInfo in)
325 : {
326 0 : LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
327 :
328 0 : rel->remoteid = pq_getmsgint(in, 4);
329 :
330 : /* Read relation name from stream */
331 0 : rel->nspname = pstrdup(logicalrep_read_namespace(in));
332 0 : rel->relname = pstrdup(pq_getmsgstring(in));
333 :
334 : /* Read the replica identity. */
335 0 : rel->replident = pq_getmsgbyte(in);
336 :
337 : /* Get attribute description */
338 0 : logicalrep_read_attrs(in, rel);
339 :
340 0 : return rel;
341 : }
342 :
343 : /*
344 : * Write type info to the output stream.
345 : *
346 : * This function will always write base type info.
347 : */
348 : void
349 0 : logicalrep_write_typ(StringInfo out, Oid typoid)
350 : {
351 0 : Oid basetypoid = getBaseType(typoid);
352 : HeapTuple tup;
353 : Form_pg_type typtup;
354 :
355 0 : pq_sendbyte(out, 'Y'); /* sending TYPE */
356 :
357 0 : tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
358 0 : if (!HeapTupleIsValid(tup))
359 0 : elog(ERROR, "cache lookup failed for type %u", basetypoid);
360 0 : typtup = (Form_pg_type) GETSTRUCT(tup);
361 :
362 : /* use Oid as relation identifier */
363 0 : pq_sendint(out, typoid, 4);
364 :
365 : /* send qualified type name */
366 0 : logicalrep_write_namespace(out, typtup->typnamespace);
367 0 : pq_sendstring(out, NameStr(typtup->typname));
368 :
369 0 : ReleaseSysCache(tup);
370 0 : }
371 :
372 : /*
373 : * Read type info from the output stream.
374 : */
375 : void
376 0 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
377 : {
378 0 : ltyp->remoteid = pq_getmsgint(in, 4);
379 :
380 : /* Read type name from stream */
381 0 : ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
382 0 : ltyp->typname = pstrdup(pq_getmsgstring(in));
383 0 : }
384 :
385 : /*
386 : * Write a tuple to the outputstream, in the most efficient format possible.
387 : */
388 : static void
389 0 : logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
390 : {
391 : TupleDesc desc;
392 : Datum values[MaxTupleAttributeNumber];
393 : bool isnull[MaxTupleAttributeNumber];
394 : int i;
395 0 : uint16 nliveatts = 0;
396 :
397 0 : desc = RelationGetDescr(rel);
398 :
399 0 : for (i = 0; i < desc->natts; i++)
400 : {
401 0 : if (TupleDescAttr(desc, i)->attisdropped)
402 0 : continue;
403 0 : nliveatts++;
404 : }
405 0 : pq_sendint(out, nliveatts, 2);
406 :
407 : /* try to allocate enough memory from the get-go */
408 0 : enlargeStringInfo(out, tuple->t_len +
409 0 : nliveatts * (1 + 4));
410 :
411 0 : heap_deform_tuple(tuple, desc, values, isnull);
412 :
413 : /* Write the values */
414 0 : for (i = 0; i < desc->natts; i++)
415 : {
416 : HeapTuple typtup;
417 : Form_pg_type typclass;
418 0 : Form_pg_attribute att = TupleDescAttr(desc, i);
419 : char *outputstr;
420 :
421 : /* skip dropped columns */
422 0 : if (att->attisdropped)
423 0 : continue;
424 :
425 0 : if (isnull[i])
426 : {
427 0 : pq_sendbyte(out, 'n'); /* null column */
428 0 : continue;
429 : }
430 0 : else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
431 : {
432 0 : pq_sendbyte(out, 'u'); /* unchanged toast column */
433 0 : continue;
434 : }
435 :
436 0 : typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
437 0 : if (!HeapTupleIsValid(typtup))
438 0 : elog(ERROR, "cache lookup failed for type %u", att->atttypid);
439 0 : typclass = (Form_pg_type) GETSTRUCT(typtup);
440 :
441 0 : pq_sendbyte(out, 't'); /* 'text' data follows */
442 :
443 0 : outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
444 0 : pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
445 0 : pfree(outputstr);
446 :
447 0 : ReleaseSysCache(typtup);
448 : }
449 0 : }
450 :
451 : /*
452 : * Read tuple in remote format from stream.
453 : *
454 : * The returned tuple points into the input stringinfo.
455 : */
456 : static void
457 0 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
458 : {
459 : int i;
460 : int natts;
461 :
462 : /* Get number of attributes */
463 0 : natts = pq_getmsgint(in, 2);
464 :
465 0 : memset(tuple->changed, 0, sizeof(tuple->changed));
466 :
467 : /* Read the data */
468 0 : for (i = 0; i < natts; i++)
469 : {
470 : char kind;
471 :
472 0 : kind = pq_getmsgbyte(in);
473 :
474 0 : switch (kind)
475 : {
476 : case 'n': /* null */
477 0 : tuple->values[i] = NULL;
478 0 : tuple->changed[i] = true;
479 0 : break;
480 : case 'u': /* unchanged column */
481 : /* we don't receive the value of an unchanged column */
482 0 : tuple->values[i] = NULL;
483 0 : break;
484 : case 't': /* text formatted value */
485 : {
486 : int len;
487 :
488 0 : tuple->changed[i] = true;
489 :
490 0 : len = pq_getmsgint(in, 4); /* read length */
491 :
492 : /* and data */
493 0 : tuple->values[i] = palloc(len + 1);
494 0 : pq_copymsgbytes(in, tuple->values[i], len);
495 0 : tuple->values[i][len] = '\0';
496 : }
497 0 : break;
498 : default:
499 0 : elog(ERROR, "unrecognized data representation type '%c'", kind);
500 : }
501 : }
502 0 : }
503 :
504 : /*
505 : * Write relation attributes to the stream.
506 : */
507 : static void
508 0 : logicalrep_write_attrs(StringInfo out, Relation rel)
509 : {
510 : TupleDesc desc;
511 : int i;
512 0 : uint16 nliveatts = 0;
513 0 : Bitmapset *idattrs = NULL;
514 : bool replidentfull;
515 :
516 0 : desc = RelationGetDescr(rel);
517 :
518 : /* send number of live attributes */
519 0 : for (i = 0; i < desc->natts; i++)
520 : {
521 0 : if (TupleDescAttr(desc, i)->attisdropped)
522 0 : continue;
523 0 : nliveatts++;
524 : }
525 0 : pq_sendint(out, nliveatts, 2);
526 :
527 : /* fetch bitmap of REPLICATION IDENTITY attributes */
528 0 : replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
529 0 : if (!replidentfull)
530 0 : idattrs = RelationGetIndexAttrBitmap(rel,
531 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
532 :
533 : /* send the attributes */
534 0 : for (i = 0; i < desc->natts; i++)
535 : {
536 0 : Form_pg_attribute att = TupleDescAttr(desc, i);
537 0 : uint8 flags = 0;
538 :
539 0 : if (att->attisdropped)
540 0 : continue;
541 :
542 : /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
543 0 : if (replidentfull ||
544 0 : bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
545 : idattrs))
546 0 : flags |= LOGICALREP_IS_REPLICA_IDENTITY;
547 :
548 0 : pq_sendbyte(out, flags);
549 :
550 : /* attribute name */
551 0 : pq_sendstring(out, NameStr(att->attname));
552 :
553 : /* attribute type id */
554 0 : pq_sendint(out, (int) att->atttypid, sizeof(att->atttypid));
555 :
556 : /* attribute mode */
557 0 : pq_sendint(out, att->atttypmod, sizeof(att->atttypmod));
558 : }
559 :
560 0 : bms_free(idattrs);
561 0 : }
562 :
563 : /*
564 : * Read relation attribute names from the stream.
565 : */
566 : static void
567 0 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
568 : {
569 : int i;
570 : int natts;
571 : char **attnames;
572 : Oid *atttyps;
573 0 : Bitmapset *attkeys = NULL;
574 :
575 0 : natts = pq_getmsgint(in, 2);
576 0 : attnames = palloc(natts * sizeof(char *));
577 0 : atttyps = palloc(natts * sizeof(Oid));
578 :
579 : /* read the attributes */
580 0 : for (i = 0; i < natts; i++)
581 : {
582 : uint8 flags;
583 :
584 : /* Check for replica identity column */
585 0 : flags = pq_getmsgbyte(in);
586 0 : if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
587 0 : attkeys = bms_add_member(attkeys, i);
588 :
589 : /* attribute name */
590 0 : attnames[i] = pstrdup(pq_getmsgstring(in));
591 :
592 : /* attribute type id */
593 0 : atttyps[i] = (Oid) pq_getmsgint(in, 4);
594 :
595 : /* we ignore attribute mode for now */
596 0 : (void) pq_getmsgint(in, 4);
597 : }
598 :
599 0 : rel->attnames = attnames;
600 0 : rel->atttyps = atttyps;
601 0 : rel->attkeys = attkeys;
602 0 : rel->natts = natts;
603 0 : }
604 :
605 : /*
606 : * Write the namespace name or empty string for pg_catalog (to save space).
607 : */
608 : static void
609 0 : logicalrep_write_namespace(StringInfo out, Oid nspid)
610 : {
611 0 : if (nspid == PG_CATALOG_NAMESPACE)
612 0 : pq_sendbyte(out, '\0');
613 : else
614 : {
615 0 : char *nspname = get_namespace_name(nspid);
616 :
617 0 : if (nspname == NULL)
618 0 : elog(ERROR, "cache lookup failed for namespace %u",
619 : nspid);
620 :
621 0 : pq_sendstring(out, nspname);
622 : }
623 0 : }
624 :
625 : /*
626 : * Read the namespace name while treating empty string as pg_catalog.
627 : */
628 : static const char *
629 0 : logicalrep_read_namespace(StringInfo in)
630 : {
631 0 : const char *nspname = pq_getmsgstring(in);
632 :
633 0 : if (nspname[0] == '\0')
634 0 : nspname = "pg_catalog";
635 :
636 0 : return nspname;
637 : }
|