LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 56 164 34.1 %
Date: 2017-09-29 13:40:31 Functions: 5 11 45.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_subscription.c
       4             :  *      replication subscriptions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_subscription.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "miscadmin.h"
      18             : 
      19             : #include "access/genam.h"
      20             : #include "access/heapam.h"
      21             : #include "access/htup_details.h"
      22             : #include "access/xact.h"
      23             : 
      24             : #include "catalog/indexing.h"
      25             : #include "catalog/pg_type.h"
      26             : #include "catalog/pg_subscription.h"
      27             : #include "catalog/pg_subscription_rel.h"
      28             : 
      29             : #include "nodes/makefuncs.h"
      30             : 
      31             : #include "storage/lmgr.h"
      32             : 
      33             : #include "utils/array.h"
      34             : #include "utils/builtins.h"
      35             : #include "utils/fmgroids.h"
      36             : #include "utils/pg_lsn.h"
      37             : #include "utils/rel.h"
      38             : #include "utils/syscache.h"
      39             : 
      40             : 
      41             : static List *textarray_to_stringlist(ArrayType *textarray);
      42             : 
      43             : /*
      44             :  * Fetch the subscription from the syscache.
      45             :  */
      46             : Subscription *
      47          12 : GetSubscription(Oid subid, bool missing_ok)
      48             : {
      49             :     HeapTuple   tup;
      50             :     Subscription *sub;
      51             :     Form_pg_subscription subform;
      52             :     Datum       datum;
      53             :     bool        isnull;
      54             : 
      55          12 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      56             : 
      57          12 :     if (!HeapTupleIsValid(tup))
      58             :     {
      59           0 :         if (missing_ok)
      60           0 :             return NULL;
      61             : 
      62           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      63             :     }
      64             : 
      65          12 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      66             : 
      67          12 :     sub = (Subscription *) palloc(sizeof(Subscription));
      68          12 :     sub->oid = subid;
      69          12 :     sub->dbid = subform->subdbid;
      70          12 :     sub->name = pstrdup(NameStr(subform->subname));
      71          12 :     sub->owner = subform->subowner;
      72          12 :     sub->enabled = subform->subenabled;
      73             : 
      74             :     /* Get conninfo */
      75          12 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      76             :                             tup,
      77             :                             Anum_pg_subscription_subconninfo,
      78             :                             &isnull);
      79          12 :     Assert(!isnull);
      80          12 :     sub->conninfo = TextDatumGetCString(datum);
      81             : 
      82             :     /* Get slotname */
      83          12 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      84             :                             tup,
      85             :                             Anum_pg_subscription_subslotname,
      86             :                             &isnull);
      87          12 :     if (!isnull)
      88          10 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
      89             :     else
      90           2 :         sub->slotname = NULL;
      91             : 
      92             :     /* Get synccommit */
      93          12 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      94             :                             tup,
      95             :                             Anum_pg_subscription_subsynccommit,
      96             :                             &isnull);
      97          12 :     Assert(!isnull);
      98          12 :     sub->synccommit = TextDatumGetCString(datum);
      99             : 
     100             :     /* Get publications */
     101          12 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     102             :                             tup,
     103             :                             Anum_pg_subscription_subpublications,
     104             :                             &isnull);
     105          12 :     Assert(!isnull);
     106          12 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     107             : 
     108          12 :     ReleaseSysCache(tup);
     109             : 
     110          12 :     return sub;
     111             : }
     112             : 
     113             : /*
     114             :  * Return number of subscriptions defined in given database.
     115             :  * Used by dropdb() to check if database can indeed be dropped.
     116             :  */
     117             : int
     118           0 : CountDBSubscriptions(Oid dbid)
     119             : {
     120           0 :     int         nsubs = 0;
     121             :     Relation    rel;
     122             :     ScanKeyData scankey;
     123             :     SysScanDesc scan;
     124             :     HeapTuple   tup;
     125             : 
     126           0 :     rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
     127             : 
     128           0 :     ScanKeyInit(&scankey,
     129             :                 Anum_pg_subscription_subdbid,
     130             :                 BTEqualStrategyNumber, F_OIDEQ,
     131             :                 ObjectIdGetDatum(dbid));
     132             : 
     133           0 :     scan = systable_beginscan(rel, InvalidOid, false,
     134             :                               NULL, 1, &scankey);
     135             : 
     136           0 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     137           0 :         nsubs++;
     138             : 
     139           0 :     systable_endscan(scan);
     140             : 
     141           0 :     heap_close(rel, NoLock);
     142             : 
     143           0 :     return nsubs;
     144             : }
     145             : 
     146             : /*
     147             :  * Free memory allocated by subscription struct.
     148             :  */
     149             : void
     150           0 : FreeSubscription(Subscription *sub)
     151             : {
     152           0 :     pfree(sub->name);
     153           0 :     pfree(sub->conninfo);
     154           0 :     if (sub->slotname)
     155           0 :         pfree(sub->slotname);
     156           0 :     list_free_deep(sub->publications);
     157           0 :     pfree(sub);
     158           0 : }
     159             : 
     160             : /*
     161             :  * get_subscription_oid - given a subscription name, look up the OID
     162             :  *
     163             :  * If missing_ok is false, throw an error if name not found.  If true, just
     164             :  * return InvalidOid.
     165             :  */
     166             : Oid
     167           7 : get_subscription_oid(const char *subname, bool missing_ok)
     168             : {
     169             :     Oid         oid;
     170             : 
     171           7 :     oid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
     172             :                           CStringGetDatum(subname));
     173           7 :     if (!OidIsValid(oid) && !missing_ok)
     174           1 :         ereport(ERROR,
     175             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     176             :                  errmsg("subscription \"%s\" does not exist", subname)));
     177           6 :     return oid;
     178             : }
     179             : 
     180             : /*
     181             :  * get_subscription_name - given a subscription OID, look up the name
     182             :  */
     183             : char *
     184           7 : get_subscription_name(Oid subid)
     185             : {
     186             :     HeapTuple   tup;
     187             :     char       *subname;
     188             :     Form_pg_subscription subform;
     189             : 
     190           7 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     191             : 
     192           7 :     if (!HeapTupleIsValid(tup))
     193           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     194             : 
     195           7 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
     196           7 :     subname = pstrdup(NameStr(subform->subname));
     197             : 
     198           7 :     ReleaseSysCache(tup);
     199             : 
     200           7 :     return subname;
     201             : }
     202             : 
     203             : /*
     204             :  * Convert text array to list of strings.
     205             :  *
     206             :  * Note: the resulting list of strings is pallocated here.
     207             :  */
     208             : static List *
     209          12 : textarray_to_stringlist(ArrayType *textarray)
     210             : {
     211             :     Datum      *elems;
     212             :     int         nelems,
     213             :                 i;
     214          12 :     List       *res = NIL;
     215             : 
     216          12 :     deconstruct_array(textarray,
     217             :                       TEXTOID, -1, false, 'i',
     218             :                       &elems, NULL, &nelems);
     219             : 
     220          12 :     if (nelems == 0)
     221           0 :         return NIL;
     222             : 
     223          32 :     for (i = 0; i < nelems; i++)
     224          20 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     225             : 
     226          12 :     return res;
     227             : }
     228             : 
     229             : /*
     230             :  * Set the state of a subscription table.
     231             :  *
     232             :  * If update_only is true and the record for given table doesn't exist, do
     233             :  * nothing.  This can be used to avoid inserting a new record that was deleted
     234             :  * by someone else.  Generally, subscription DDL commands should use false,
     235             :  * workers should use true.
     236             :  *
     237             :  * The insert-or-update logic in this function is not concurrency safe so it
     238             :  * might raise an error in rare circumstances.  But if we took a stronger lock
     239             :  * such as ShareRowExclusiveLock, we would risk more deadlocks.
     240             :  */
     241             : Oid
     242           0 : SetSubscriptionRelState(Oid subid, Oid relid, char state,
     243             :                         XLogRecPtr sublsn, bool update_only)
     244             : {
     245             :     Relation    rel;
     246             :     HeapTuple   tup;
     247           0 :     Oid         subrelid = InvalidOid;
     248             :     bool        nulls[Natts_pg_subscription_rel];
     249             :     Datum       values[Natts_pg_subscription_rel];
     250             : 
     251           0 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     252             : 
     253           0 :     rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
     254             : 
     255             :     /* Try finding existing mapping. */
     256           0 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     257             :                               ObjectIdGetDatum(relid),
     258             :                               ObjectIdGetDatum(subid));
     259             : 
     260             :     /*
     261             :      * If the record for given table does not exist yet create new record,
     262             :      * otherwise update the existing one.
     263             :      */
     264           0 :     if (!HeapTupleIsValid(tup) && !update_only)
     265             :     {
     266             :         /* Form the tuple. */
     267           0 :         memset(values, 0, sizeof(values));
     268           0 :         memset(nulls, false, sizeof(nulls));
     269           0 :         values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     270           0 :         values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     271           0 :         values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     272           0 :         if (sublsn != InvalidXLogRecPtr)
     273           0 :             values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     274             :         else
     275           0 :             nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     276             : 
     277           0 :         tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     278             : 
     279             :         /* Insert tuple into catalog. */
     280           0 :         subrelid = CatalogTupleInsert(rel, tup);
     281             : 
     282           0 :         heap_freetuple(tup);
     283             :     }
     284           0 :     else if (HeapTupleIsValid(tup))
     285             :     {
     286             :         bool        replaces[Natts_pg_subscription_rel];
     287             : 
     288             :         /* Update the tuple. */
     289           0 :         memset(values, 0, sizeof(values));
     290           0 :         memset(nulls, false, sizeof(nulls));
     291           0 :         memset(replaces, false, sizeof(replaces));
     292             : 
     293           0 :         replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     294           0 :         values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     295             : 
     296           0 :         replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     297           0 :         if (sublsn != InvalidXLogRecPtr)
     298           0 :             values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     299             :         else
     300           0 :             nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     301             : 
     302           0 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     303             :                                 replaces);
     304             : 
     305             :         /* Update the catalog. */
     306           0 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
     307             : 
     308           0 :         subrelid = HeapTupleGetOid(tup);
     309             :     }
     310             : 
     311             :     /* Cleanup. */
     312           0 :     heap_close(rel, NoLock);
     313             : 
     314           0 :     return subrelid;
     315             : }
     316             : 
     317             : /*
     318             :  * Get state of subscription table.
     319             :  *
     320             :  * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
     321             :  */
     322             : char
     323           0 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
     324             :                         bool missing_ok)
     325             : {
     326             :     Relation    rel;
     327             :     HeapTuple   tup;
     328             :     char        substate;
     329             :     bool        isnull;
     330             :     Datum       d;
     331             : 
     332           0 :     rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
     333             : 
     334             :     /* Try finding the mapping. */
     335           0 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     336             :                           ObjectIdGetDatum(relid),
     337             :                           ObjectIdGetDatum(subid));
     338             : 
     339           0 :     if (!HeapTupleIsValid(tup))
     340             :     {
     341           0 :         if (missing_ok)
     342             :         {
     343           0 :             heap_close(rel, AccessShareLock);
     344           0 :             *sublsn = InvalidXLogRecPtr;
     345           0 :             return SUBREL_STATE_UNKNOWN;
     346             :         }
     347             : 
     348           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     349             :              relid, subid);
     350             :     }
     351             : 
     352             :     /* Get the state. */
     353           0 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     354             :                         Anum_pg_subscription_rel_srsubstate, &isnull);
     355           0 :     Assert(!isnull);
     356           0 :     substate = DatumGetChar(d);
     357           0 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     358             :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     359           0 :     if (isnull)
     360           0 :         *sublsn = InvalidXLogRecPtr;
     361             :     else
     362           0 :         *sublsn = DatumGetLSN(d);
     363             : 
     364             :     /* Cleanup */
     365           0 :     ReleaseSysCache(tup);
     366           0 :     heap_close(rel, AccessShareLock);
     367             : 
     368           0 :     return substate;
     369             : }
     370             : 
     371             : /*
     372             :  * Drop subscription relation mapping. These can be for a particular
     373             :  * subscription, or for a particular relation, or both.
     374             :  */
     375             : void
     376        2144 : RemoveSubscriptionRel(Oid subid, Oid relid)
     377             : {
     378             :     Relation    rel;
     379             :     HeapScanDesc scan;
     380             :     ScanKeyData skey[2];
     381             :     HeapTuple   tup;
     382        2144 :     int         nkeys = 0;
     383             : 
     384        2144 :     rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
     385             : 
     386        2144 :     if (OidIsValid(subid))
     387             :     {
     388           3 :         ScanKeyInit(&skey[nkeys++],
     389             :                     Anum_pg_subscription_rel_srsubid,
     390             :                     BTEqualStrategyNumber,
     391             :                     F_OIDEQ,
     392             :                     ObjectIdGetDatum(subid));
     393             :     }
     394             : 
     395        2144 :     if (OidIsValid(relid))
     396             :     {
     397        2141 :         ScanKeyInit(&skey[nkeys++],
     398             :                     Anum_pg_subscription_rel_srrelid,
     399             :                     BTEqualStrategyNumber,
     400             :                     F_OIDEQ,
     401             :                     ObjectIdGetDatum(relid));
     402             :     }
     403             : 
     404             :     /* Do the search and delete what we found. */
     405        2144 :     scan = heap_beginscan_catalog(rel, nkeys, skey);
     406        4288 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     407             :     {
     408           0 :         CatalogTupleDelete(rel, &tup->t_self);
     409             :     }
     410        2144 :     heap_endscan(scan);
     411             : 
     412        2144 :     heap_close(rel, RowExclusiveLock);
     413        2144 : }
     414             : 
     415             : 
     416             : /*
     417             :  * Get all relations for subscription.
     418             :  *
     419             :  * Returned list is palloc'ed in current memory context.
     420             :  */
     421             : List *
     422           0 : GetSubscriptionRelations(Oid subid)
     423             : {
     424           0 :     List       *res = NIL;
     425             :     Relation    rel;
     426             :     HeapTuple   tup;
     427           0 :     int         nkeys = 0;
     428             :     ScanKeyData skey[2];
     429             :     SysScanDesc scan;
     430             : 
     431           0 :     rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
     432             : 
     433           0 :     ScanKeyInit(&skey[nkeys++],
     434             :                 Anum_pg_subscription_rel_srsubid,
     435             :                 BTEqualStrategyNumber, F_OIDEQ,
     436             :                 ObjectIdGetDatum(subid));
     437             : 
     438           0 :     scan = systable_beginscan(rel, InvalidOid, false,
     439             :                               NULL, nkeys, skey);
     440             : 
     441           0 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     442             :     {
     443             :         Form_pg_subscription_rel subrel;
     444             :         SubscriptionRelState *relstate;
     445             : 
     446           0 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     447             : 
     448           0 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     449           0 :         relstate->relid = subrel->srrelid;
     450           0 :         relstate->state = subrel->srsubstate;
     451           0 :         relstate->lsn = subrel->srsublsn;
     452             : 
     453           0 :         res = lappend(res, relstate);
     454             :     }
     455             : 
     456             :     /* Cleanup */
     457           0 :     systable_endscan(scan);
     458           0 :     heap_close(rel, AccessShareLock);
     459             : 
     460           0 :     return res;
     461             : }
     462             : 
     463             : /*
     464             :  * Get all relations for subscription that are not in a ready state.
     465             :  *
     466             :  * Returned list is palloc'ed in current memory context.
     467             :  */
     468             : List *
     469           0 : GetSubscriptionNotReadyRelations(Oid subid)
     470             : {
     471           0 :     List       *res = NIL;
     472             :     Relation    rel;
     473             :     HeapTuple   tup;
     474           0 :     int         nkeys = 0;
     475             :     ScanKeyData skey[2];
     476             :     SysScanDesc scan;
     477             : 
     478           0 :     rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
     479             : 
     480           0 :     ScanKeyInit(&skey[nkeys++],
     481             :                 Anum_pg_subscription_rel_srsubid,
     482             :                 BTEqualStrategyNumber, F_OIDEQ,
     483             :                 ObjectIdGetDatum(subid));
     484             : 
     485           0 :     ScanKeyInit(&skey[nkeys++],
     486             :                 Anum_pg_subscription_rel_srsubstate,
     487             :                 BTEqualStrategyNumber, F_CHARNE,
     488             :                 CharGetDatum(SUBREL_STATE_READY));
     489             : 
     490           0 :     scan = systable_beginscan(rel, InvalidOid, false,
     491             :                               NULL, nkeys, skey);
     492             : 
     493           0 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     494             :     {
     495             :         Form_pg_subscription_rel subrel;
     496             :         SubscriptionRelState *relstate;
     497             : 
     498           0 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     499             : 
     500           0 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     501           0 :         relstate->relid = subrel->srrelid;
     502           0 :         relstate->state = subrel->srsubstate;
     503           0 :         relstate->lsn = subrel->srsublsn;
     504             : 
     505           0 :         res = lappend(res, relstate);
     506             :     }
     507             : 
     508             :     /* Cleanup */
     509           0 :     systable_endscan(scan);
     510           0 :     heap_close(rel, AccessShareLock);
     511             : 
     512           0 :     return res;
     513             : }

Generated by: LCOV version 1.11