LCOV - code coverage report
Current view: top level - src/backend/replication/logical - proto.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 0 233 0.0 %
Date: 2017-09-29 13:40:31 Functions: 0 22 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * proto.c
       4             :  *      logical replication protocol functions
       5             :  *
       6             :  * Copyright (c) 2015-2017, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *      src/backend/replication/logical/proto.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/sysattr.h"
      16             : #include "catalog/pg_namespace.h"
      17             : #include "catalog/pg_type.h"
      18             : #include "libpq/pqformat.h"
      19             : #include "replication/logicalproto.h"
      20             : #include "utils/builtins.h"
      21             : #include "utils/lsyscache.h"
      22             : #include "utils/syscache.h"
      23             : 
      24             : /*
      25             :  * Protocol message flags.
      26             :  */
      27             : #define LOGICALREP_IS_REPLICA_IDENTITY 1
      28             : 
      29             : static void logicalrep_write_attrs(StringInfo out, Relation rel);
      30             : static void logicalrep_write_tuple(StringInfo out, Relation rel,
      31             :                        HeapTuple tuple);
      32             : 
      33             : static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
      34             : static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
      35             : 
      36             : static void logicalrep_write_namespace(StringInfo out, Oid nspid);
      37             : static const char *logicalrep_read_namespace(StringInfo in);
      38             : 
      39             : /*
      40             :  * Write BEGIN to the output stream.
      41             :  */
      42             : void
      43           0 : logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
      44             : {
      45           0 :     pq_sendbyte(out, 'B');      /* BEGIN */
      46             : 
      47             :     /* fixed fields */
      48           0 :     pq_sendint64(out, txn->final_lsn);
      49           0 :     pq_sendint64(out, txn->commit_time);
      50           0 :     pq_sendint(out, txn->xid, 4);
      51           0 : }
      52             : 
      53             : /*
      54             :  * Read transaction BEGIN from the stream.
      55             :  */
      56             : void
      57           0 : logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
      58             : {
      59             :     /* read fields */
      60           0 :     begin_data->final_lsn = pq_getmsgint64(in);
      61           0 :     if (begin_data->final_lsn == InvalidXLogRecPtr)
      62           0 :         elog(ERROR, "final_lsn not set in begin message");
      63           0 :     begin_data->committime = pq_getmsgint64(in);
      64           0 :     begin_data->xid = pq_getmsgint(in, 4);
      65           0 : }
      66             : 
      67             : 
      68             : /*
      69             :  * Write COMMIT to the output stream.
      70             :  */
      71             : void
      72           0 : logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
      73             :                         XLogRecPtr commit_lsn)
      74             : {
      75           0 :     uint8       flags = 0;
      76             : 
      77           0 :     pq_sendbyte(out, 'C');      /* sending COMMIT */
      78             : 
      79             :     /* send the flags field (unused for now) */
      80           0 :     pq_sendbyte(out, flags);
      81             : 
      82             :     /* send fields */
      83           0 :     pq_sendint64(out, commit_lsn);
      84           0 :     pq_sendint64(out, txn->end_lsn);
      85           0 :     pq_sendint64(out, txn->commit_time);
      86           0 : }
      87             : 
      88             : /*
      89             :  * Read transaction COMMIT from the stream.
      90             :  */
      91             : void
      92           0 : logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
      93             : {
      94             :     /* read flags (unused for now) */
      95           0 :     uint8       flags = pq_getmsgbyte(in);
      96             : 
      97           0 :     if (flags != 0)
      98           0 :         elog(ERROR, "unrecognized flags %u in commit message", flags);
      99             : 
     100             :     /* read fields */
     101           0 :     commit_data->commit_lsn = pq_getmsgint64(in);
     102           0 :     commit_data->end_lsn = pq_getmsgint64(in);
     103           0 :     commit_data->committime = pq_getmsgint64(in);
     104           0 : }
     105             : 
     106             : /*
     107             :  * Write ORIGIN to the output stream.
     108             :  */
     109             : void
     110           0 : logicalrep_write_origin(StringInfo out, const char *origin,
     111             :                         XLogRecPtr origin_lsn)
     112             : {
     113           0 :     pq_sendbyte(out, 'O');      /* ORIGIN */
     114             : 
     115             :     /* fixed fields */
     116           0 :     pq_sendint64(out, origin_lsn);
     117             : 
     118             :     /* origin string */
     119           0 :     pq_sendstring(out, origin);
     120           0 : }
     121             : 
     122             : /*
     123             :  * Read ORIGIN from the output stream.
     124             :  */
     125             : char *
     126           0 : logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
     127             : {
     128             :     /* fixed fields */
     129           0 :     *origin_lsn = pq_getmsgint64(in);
     130             : 
     131             :     /* return origin */
     132           0 :     return pstrdup(pq_getmsgstring(in));
     133             : }
     134             : 
     135             : /*
     136             :  * Write INSERT to the output stream.
     137             :  */
     138             : void
     139           0 : logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
     140             : {
     141           0 :     pq_sendbyte(out, 'I');      /* action INSERT */
     142             : 
     143           0 :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     144             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     145             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     146             : 
     147             :     /* use Oid as relation identifier */
     148           0 :     pq_sendint(out, RelationGetRelid(rel), 4);
     149             : 
     150           0 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     151           0 :     logicalrep_write_tuple(out, rel, newtuple);
     152           0 : }
     153             : 
     154             : /*
     155             :  * Read INSERT from stream.
     156             :  *
     157             :  * Fills the new tuple.
     158             :  */
     159             : LogicalRepRelId
     160           0 : logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
     161             : {
     162             :     char        action;
     163             :     LogicalRepRelId relid;
     164             : 
     165             :     /* read the relation id */
     166           0 :     relid = pq_getmsgint(in, 4);
     167             : 
     168           0 :     action = pq_getmsgbyte(in);
     169           0 :     if (action != 'N')
     170           0 :         elog(ERROR, "expected new tuple but got %d",
     171             :              action);
     172             : 
     173           0 :     logicalrep_read_tuple(in, newtup);
     174             : 
     175           0 :     return relid;
     176             : }
     177             : 
     178             : /*
     179             :  * Write UPDATE to the output stream.
     180             :  */
     181             : void
     182           0 : logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
     183             :                         HeapTuple newtuple)
     184             : {
     185           0 :     pq_sendbyte(out, 'U');      /* action UPDATE */
     186             : 
     187           0 :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     188             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     189             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     190             : 
     191             :     /* use Oid as relation identifier */
     192           0 :     pq_sendint(out, RelationGetRelid(rel), 4);
     193             : 
     194           0 :     if (oldtuple != NULL)
     195             :     {
     196           0 :         if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     197           0 :             pq_sendbyte(out, 'O');  /* old tuple follows */
     198             :         else
     199           0 :             pq_sendbyte(out, 'K');  /* old key follows */
     200           0 :         logicalrep_write_tuple(out, rel, oldtuple);
     201             :     }
     202             : 
     203           0 :     pq_sendbyte(out, 'N');      /* new tuple follows */
     204           0 :     logicalrep_write_tuple(out, rel, newtuple);
     205           0 : }
     206             : 
     207             : /*
     208             :  * Read UPDATE from stream.
     209             :  */
     210             : LogicalRepRelId
     211           0 : logicalrep_read_update(StringInfo in, bool *has_oldtuple,
     212             :                        LogicalRepTupleData *oldtup,
     213             :                        LogicalRepTupleData *newtup)
     214             : {
     215             :     char        action;
     216             :     LogicalRepRelId relid;
     217             : 
     218             :     /* read the relation id */
     219           0 :     relid = pq_getmsgint(in, 4);
     220             : 
     221             :     /* read and verify action */
     222           0 :     action = pq_getmsgbyte(in);
     223           0 :     if (action != 'K' && action != 'O' && action != 'N')
     224           0 :         elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
     225             :              action);
     226             : 
     227             :     /* check for old tuple */
     228           0 :     if (action == 'K' || action == 'O')
     229             :     {
     230           0 :         logicalrep_read_tuple(in, oldtup);
     231           0 :         *has_oldtuple = true;
     232             : 
     233           0 :         action = pq_getmsgbyte(in);
     234             :     }
     235             :     else
     236           0 :         *has_oldtuple = false;
     237             : 
     238             :     /* check for new  tuple */
     239           0 :     if (action != 'N')
     240           0 :         elog(ERROR, "expected action 'N', got %c",
     241             :              action);
     242             : 
     243           0 :     logicalrep_read_tuple(in, newtup);
     244             : 
     245           0 :     return relid;
     246             : }
     247             : 
     248             : /*
     249             :  * Write DELETE to the output stream.
     250             :  */
     251             : void
     252           0 : logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
     253             : {
     254           0 :     Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
     255             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     256             :            rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
     257             : 
     258           0 :     pq_sendbyte(out, 'D');      /* action DELETE */
     259             : 
     260             :     /* use Oid as relation identifier */
     261           0 :     pq_sendint(out, RelationGetRelid(rel), 4);
     262             : 
     263           0 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     264           0 :         pq_sendbyte(out, 'O');  /* old tuple follows */
     265             :     else
     266           0 :         pq_sendbyte(out, 'K');  /* old key follows */
     267             : 
     268           0 :     logicalrep_write_tuple(out, rel, oldtuple);
     269           0 : }
     270             : 
     271             : /*
     272             :  * Read DELETE from stream.
     273             :  *
     274             :  * Fills the old tuple.
     275             :  */
     276             : LogicalRepRelId
     277           0 : logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
     278             : {
     279             :     char        action;
     280             :     LogicalRepRelId relid;
     281             : 
     282             :     /* read the relation id */
     283           0 :     relid = pq_getmsgint(in, 4);
     284             : 
     285             :     /* read and verify action */
     286           0 :     action = pq_getmsgbyte(in);
     287           0 :     if (action != 'K' && action != 'O')
     288           0 :         elog(ERROR, "expected action 'O' or 'K', got %c", action);
     289             : 
     290           0 :     logicalrep_read_tuple(in, oldtup);
     291             : 
     292           0 :     return relid;
     293             : }
     294             : 
     295             : /*
     296             :  * Write relation description to the output stream.
     297             :  */
     298             : void
     299           0 : logicalrep_write_rel(StringInfo out, Relation rel)
     300             : {
     301             :     char       *relname;
     302             : 
     303           0 :     pq_sendbyte(out, 'R');      /* sending RELATION */
     304             : 
     305             :     /* use Oid as relation identifier */
     306           0 :     pq_sendint(out, RelationGetRelid(rel), 4);
     307             : 
     308             :     /* send qualified relation name */
     309           0 :     logicalrep_write_namespace(out, RelationGetNamespace(rel));
     310           0 :     relname = RelationGetRelationName(rel);
     311           0 :     pq_sendstring(out, relname);
     312             : 
     313             :     /* send replica identity */
     314           0 :     pq_sendbyte(out, rel->rd_rel->relreplident);
     315             : 
     316             :     /* send the attribute info */
     317           0 :     logicalrep_write_attrs(out, rel);
     318           0 : }
     319             : 
     320             : /*
     321             :  * Read the relation info from stream and return as LogicalRepRelation.
     322             :  */
     323             : LogicalRepRelation *
     324           0 : logicalrep_read_rel(StringInfo in)
     325             : {
     326           0 :     LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
     327             : 
     328           0 :     rel->remoteid = pq_getmsgint(in, 4);
     329             : 
     330             :     /* Read relation name from stream */
     331           0 :     rel->nspname = pstrdup(logicalrep_read_namespace(in));
     332           0 :     rel->relname = pstrdup(pq_getmsgstring(in));
     333             : 
     334             :     /* Read the replica identity. */
     335           0 :     rel->replident = pq_getmsgbyte(in);
     336             : 
     337             :     /* Get attribute description */
     338           0 :     logicalrep_read_attrs(in, rel);
     339             : 
     340           0 :     return rel;
     341             : }
     342             : 
     343             : /*
     344             :  * Write type info to the output stream.
     345             :  *
     346             :  * This function will always write base type info.
     347             :  */
     348             : void
     349           0 : logicalrep_write_typ(StringInfo out, Oid typoid)
     350             : {
     351           0 :     Oid         basetypoid = getBaseType(typoid);
     352             :     HeapTuple   tup;
     353             :     Form_pg_type typtup;
     354             : 
     355           0 :     pq_sendbyte(out, 'Y');      /* sending TYPE */
     356             : 
     357           0 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
     358           0 :     if (!HeapTupleIsValid(tup))
     359           0 :         elog(ERROR, "cache lookup failed for type %u", basetypoid);
     360           0 :     typtup = (Form_pg_type) GETSTRUCT(tup);
     361             : 
     362             :     /* use Oid as relation identifier */
     363           0 :     pq_sendint(out, typoid, 4);
     364             : 
     365             :     /* send qualified type name */
     366           0 :     logicalrep_write_namespace(out, typtup->typnamespace);
     367           0 :     pq_sendstring(out, NameStr(typtup->typname));
     368             : 
     369           0 :     ReleaseSysCache(tup);
     370           0 : }
     371             : 
     372             : /*
     373             :  * Read type info from the output stream.
     374             :  */
     375             : void
     376           0 : logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
     377             : {
     378           0 :     ltyp->remoteid = pq_getmsgint(in, 4);
     379             : 
     380             :     /* Read type name from stream */
     381           0 :     ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
     382           0 :     ltyp->typname = pstrdup(pq_getmsgstring(in));
     383           0 : }
     384             : 
     385             : /*
     386             :  * Write a tuple to the outputstream, in the most efficient format possible.
     387             :  */
     388             : static void
     389           0 : logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
     390             : {
     391             :     TupleDesc   desc;
     392             :     Datum       values[MaxTupleAttributeNumber];
     393             :     bool        isnull[MaxTupleAttributeNumber];
     394             :     int         i;
     395           0 :     uint16      nliveatts = 0;
     396             : 
     397           0 :     desc = RelationGetDescr(rel);
     398             : 
     399           0 :     for (i = 0; i < desc->natts; i++)
     400             :     {
     401           0 :         if (TupleDescAttr(desc, i)->attisdropped)
     402           0 :             continue;
     403           0 :         nliveatts++;
     404             :     }
     405           0 :     pq_sendint(out, nliveatts, 2);
     406             : 
     407             :     /* try to allocate enough memory from the get-go */
     408           0 :     enlargeStringInfo(out, tuple->t_len +
     409           0 :                       nliveatts * (1 + 4));
     410             : 
     411           0 :     heap_deform_tuple(tuple, desc, values, isnull);
     412             : 
     413             :     /* Write the values */
     414           0 :     for (i = 0; i < desc->natts; i++)
     415             :     {
     416             :         HeapTuple   typtup;
     417             :         Form_pg_type typclass;
     418           0 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     419             :         char       *outputstr;
     420             : 
     421             :         /* skip dropped columns */
     422           0 :         if (att->attisdropped)
     423           0 :             continue;
     424             : 
     425           0 :         if (isnull[i])
     426             :         {
     427           0 :             pq_sendbyte(out, 'n');  /* null column */
     428           0 :             continue;
     429             :         }
     430           0 :         else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
     431             :         {
     432           0 :             pq_sendbyte(out, 'u');  /* unchanged toast column */
     433           0 :             continue;
     434             :         }
     435             : 
     436           0 :         typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
     437           0 :         if (!HeapTupleIsValid(typtup))
     438           0 :             elog(ERROR, "cache lookup failed for type %u", att->atttypid);
     439           0 :         typclass = (Form_pg_type) GETSTRUCT(typtup);
     440             : 
     441           0 :         pq_sendbyte(out, 't');  /* 'text' data follows */
     442             : 
     443           0 :         outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
     444           0 :         pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
     445           0 :         pfree(outputstr);
     446             : 
     447           0 :         ReleaseSysCache(typtup);
     448             :     }
     449           0 : }
     450             : 
     451             : /*
     452             :  * Read tuple in remote format from stream.
     453             :  *
     454             :  * The returned tuple points into the input stringinfo.
     455             :  */
     456             : static void
     457           0 : logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
     458             : {
     459             :     int         i;
     460             :     int         natts;
     461             : 
     462             :     /* Get number of attributes */
     463           0 :     natts = pq_getmsgint(in, 2);
     464             : 
     465           0 :     memset(tuple->changed, 0, sizeof(tuple->changed));
     466             : 
     467             :     /* Read the data */
     468           0 :     for (i = 0; i < natts; i++)
     469             :     {
     470             :         char        kind;
     471             : 
     472           0 :         kind = pq_getmsgbyte(in);
     473             : 
     474           0 :         switch (kind)
     475             :         {
     476             :             case 'n':           /* null */
     477           0 :                 tuple->values[i] = NULL;
     478           0 :                 tuple->changed[i] = true;
     479           0 :                 break;
     480             :             case 'u':           /* unchanged column */
     481             :                 /* we don't receive the value of an unchanged column */
     482           0 :                 tuple->values[i] = NULL;
     483           0 :                 break;
     484             :             case 't':           /* text formatted value */
     485             :                 {
     486             :                     int         len;
     487             : 
     488           0 :                     tuple->changed[i] = true;
     489             : 
     490           0 :                     len = pq_getmsgint(in, 4);  /* read length */
     491             : 
     492             :                     /* and data */
     493           0 :                     tuple->values[i] = palloc(len + 1);
     494           0 :                     pq_copymsgbytes(in, tuple->values[i], len);
     495           0 :                     tuple->values[i][len] = '\0';
     496             :                 }
     497           0 :                 break;
     498             :             default:
     499           0 :                 elog(ERROR, "unrecognized data representation type '%c'", kind);
     500             :         }
     501             :     }
     502           0 : }
     503             : 
     504             : /*
     505             :  * Write relation attributes to the stream.
     506             :  */
     507             : static void
     508           0 : logicalrep_write_attrs(StringInfo out, Relation rel)
     509             : {
     510             :     TupleDesc   desc;
     511             :     int         i;
     512           0 :     uint16      nliveatts = 0;
     513           0 :     Bitmapset  *idattrs = NULL;
     514             :     bool        replidentfull;
     515             : 
     516           0 :     desc = RelationGetDescr(rel);
     517             : 
     518             :     /* send number of live attributes */
     519           0 :     for (i = 0; i < desc->natts; i++)
     520             :     {
     521           0 :         if (TupleDescAttr(desc, i)->attisdropped)
     522           0 :             continue;
     523           0 :         nliveatts++;
     524             :     }
     525           0 :     pq_sendint(out, nliveatts, 2);
     526             : 
     527             :     /* fetch bitmap of REPLICATION IDENTITY attributes */
     528           0 :     replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
     529           0 :     if (!replidentfull)
     530           0 :         idattrs = RelationGetIndexAttrBitmap(rel,
     531             :                                              INDEX_ATTR_BITMAP_IDENTITY_KEY);
     532             : 
     533             :     /* send the attributes */
     534           0 :     for (i = 0; i < desc->natts; i++)
     535             :     {
     536           0 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     537           0 :         uint8       flags = 0;
     538             : 
     539           0 :         if (att->attisdropped)
     540           0 :             continue;
     541             : 
     542             :         /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
     543           0 :         if (replidentfull ||
     544           0 :             bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     545             :                           idattrs))
     546           0 :             flags |= LOGICALREP_IS_REPLICA_IDENTITY;
     547             : 
     548           0 :         pq_sendbyte(out, flags);
     549             : 
     550             :         /* attribute name */
     551           0 :         pq_sendstring(out, NameStr(att->attname));
     552             : 
     553             :         /* attribute type id */
     554           0 :         pq_sendint(out, (int) att->atttypid, sizeof(att->atttypid));
     555             : 
     556             :         /* attribute mode */
     557           0 :         pq_sendint(out, att->atttypmod, sizeof(att->atttypmod));
     558             :     }
     559             : 
     560           0 :     bms_free(idattrs);
     561           0 : }
     562             : 
     563             : /*
     564             :  * Read relation attribute names from the stream.
     565             :  */
     566             : static void
     567           0 : logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
     568             : {
     569             :     int         i;
     570             :     int         natts;
     571             :     char      **attnames;
     572             :     Oid        *atttyps;
     573           0 :     Bitmapset  *attkeys = NULL;
     574             : 
     575           0 :     natts = pq_getmsgint(in, 2);
     576           0 :     attnames = palloc(natts * sizeof(char *));
     577           0 :     atttyps = palloc(natts * sizeof(Oid));
     578             : 
     579             :     /* read the attributes */
     580           0 :     for (i = 0; i < natts; i++)
     581             :     {
     582             :         uint8       flags;
     583             : 
     584             :         /* Check for replica identity column */
     585           0 :         flags = pq_getmsgbyte(in);
     586           0 :         if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
     587           0 :             attkeys = bms_add_member(attkeys, i);
     588             : 
     589             :         /* attribute name */
     590           0 :         attnames[i] = pstrdup(pq_getmsgstring(in));
     591             : 
     592             :         /* attribute type id */
     593           0 :         atttyps[i] = (Oid) pq_getmsgint(in, 4);
     594             : 
     595             :         /* we ignore attribute mode for now */
     596           0 :         (void) pq_getmsgint(in, 4);
     597             :     }
     598             : 
     599           0 :     rel->attnames = attnames;
     600           0 :     rel->atttyps = atttyps;
     601           0 :     rel->attkeys = attkeys;
     602           0 :     rel->natts = natts;
     603           0 : }
     604             : 
     605             : /*
     606             :  * Write the namespace name or empty string for pg_catalog (to save space).
     607             :  */
     608             : static void
     609           0 : logicalrep_write_namespace(StringInfo out, Oid nspid)
     610             : {
     611           0 :     if (nspid == PG_CATALOG_NAMESPACE)
     612           0 :         pq_sendbyte(out, '\0');
     613             :     else
     614             :     {
     615           0 :         char       *nspname = get_namespace_name(nspid);
     616             : 
     617           0 :         if (nspname == NULL)
     618           0 :             elog(ERROR, "cache lookup failed for namespace %u",
     619             :                  nspid);
     620             : 
     621           0 :         pq_sendstring(out, nspname);
     622             :     }
     623           0 : }
     624             : 
     625             : /*
     626             :  * Read the namespace name while treating empty string as pg_catalog.
     627             :  */
     628             : static const char *
     629           0 : logicalrep_read_namespace(StringInfo in)
     630             : {
     631           0 :     const char *nspname = pq_getmsgstring(in);
     632             : 
     633           0 :     if (nspname[0] == '\0')
     634           0 :         nspname = "pg_catalog";
     635             : 
     636           0 :     return nspname;
     637             : }

Generated by: LCOV version 1.11