LCOV - code coverage report
Current view: top level - src/backend/replication - slotfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 0 119 0.0 %
Date: 2017-09-29 15:12:54 Functions: 0 5 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slotfuncs.c
       4             :  *     Support functions for replication slots
       5             :  *
       6             :  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/slotfuncs.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : 
      14             : #include "postgres.h"
      15             : 
      16             : #include "funcapi.h"
      17             : #include "miscadmin.h"
      18             : 
      19             : #include "access/htup_details.h"
      20             : #include "replication/slot.h"
      21             : #include "replication/logical.h"
      22             : #include "replication/logicalfuncs.h"
      23             : #include "utils/builtins.h"
      24             : #include "utils/pg_lsn.h"
      25             : 
      26             : static void
      27           0 : check_permissions(void)
      28             : {
      29           0 :     if (!superuser() && !has_rolreplication(GetUserId()))
      30           0 :         ereport(ERROR,
      31             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
      32             :                  (errmsg("must be superuser or replication role to use replication slots"))));
      33           0 : }
      34             : 
      35             : /*
      36             :  * SQL function for creating a new physical (streaming replication)
      37             :  * replication slot.
      38             :  */
      39             : Datum
      40           0 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
      41             : {
      42           0 :     Name        name = PG_GETARG_NAME(0);
      43           0 :     bool        immediately_reserve = PG_GETARG_BOOL(1);
      44           0 :     bool        temporary = PG_GETARG_BOOL(2);
      45             :     Datum       values[2];
      46             :     bool        nulls[2];
      47             :     TupleDesc   tupdesc;
      48             :     HeapTuple   tuple;
      49             :     Datum       result;
      50             : 
      51           0 :     Assert(!MyReplicationSlot);
      52             : 
      53           0 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
      54           0 :         elog(ERROR, "return type must be a row type");
      55             : 
      56           0 :     check_permissions();
      57             : 
      58           0 :     CheckSlotRequirements();
      59             : 
      60             :     /* acquire replication slot, this will check for conflicting names */
      61           0 :     ReplicationSlotCreate(NameStr(*name), false,
      62             :                           temporary ? RS_TEMPORARY : RS_PERSISTENT);
      63             : 
      64           0 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
      65           0 :     nulls[0] = false;
      66             : 
      67           0 :     if (immediately_reserve)
      68             :     {
      69             :         /* Reserve WAL as the user asked for it */
      70           0 :         ReplicationSlotReserveWal();
      71             : 
      72             :         /* Write this slot to disk */
      73           0 :         ReplicationSlotMarkDirty();
      74           0 :         ReplicationSlotSave();
      75             : 
      76           0 :         values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
      77           0 :         nulls[1] = false;
      78             :     }
      79             :     else
      80             :     {
      81           0 :         nulls[1] = true;
      82             :     }
      83             : 
      84           0 :     tuple = heap_form_tuple(tupdesc, values, nulls);
      85           0 :     result = HeapTupleGetDatum(tuple);
      86             : 
      87           0 :     ReplicationSlotRelease();
      88             : 
      89           0 :     PG_RETURN_DATUM(result);
      90             : }
      91             : 
      92             : 
      93             : /*
      94             :  * SQL function for creating a new logical replication slot.
      95             :  */
      96             : Datum
      97           0 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
      98             : {
      99           0 :     Name        name = PG_GETARG_NAME(0);
     100           0 :     Name        plugin = PG_GETARG_NAME(1);
     101           0 :     bool        temporary = PG_GETARG_BOOL(2);
     102             : 
     103           0 :     LogicalDecodingContext *ctx = NULL;
     104             : 
     105             :     TupleDesc   tupdesc;
     106             :     HeapTuple   tuple;
     107             :     Datum       result;
     108             :     Datum       values[2];
     109             :     bool        nulls[2];
     110             : 
     111           0 :     Assert(!MyReplicationSlot);
     112             : 
     113           0 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     114           0 :         elog(ERROR, "return type must be a row type");
     115             : 
     116           0 :     check_permissions();
     117             : 
     118           0 :     CheckLogicalDecodingRequirements();
     119             : 
     120             :     /*
     121             :      * Acquire a logical decoding slot, this will check for conflicting names.
     122             :      * Initially create persistent slot as ephemeral - that allows us to
     123             :      * nicely handle errors during initialization because it'll get dropped if
     124             :      * this transaction fails. We'll make it persistent at the end. Temporary
     125             :      * slots can be created as temporary from beginning as they get dropped on
     126             :      * error as well.
     127             :      */
     128           0 :     ReplicationSlotCreate(NameStr(*name), true,
     129             :                           temporary ? RS_TEMPORARY : RS_EPHEMERAL);
     130             : 
     131             :     /*
     132             :      * Create logical decoding context, to build the initial snapshot.
     133             :      */
     134           0 :     ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
     135             :                                     false,  /* do not build snapshot */
     136             :                                     logical_read_local_xlog_page, NULL, NULL,
     137             :                                     NULL);
     138             : 
     139             :     /* build initial snapshot, might take a while */
     140           0 :     DecodingContextFindStartpoint(ctx);
     141             : 
     142           0 :     values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
     143           0 :     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     144             : 
     145             :     /* don't need the decoding context anymore */
     146           0 :     FreeDecodingContext(ctx);
     147             : 
     148           0 :     memset(nulls, 0, sizeof(nulls));
     149             : 
     150           0 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     151           0 :     result = HeapTupleGetDatum(tuple);
     152             : 
     153             :     /* ok, slot is now fully created, mark it as persistent if needed */
     154           0 :     if (!temporary)
     155           0 :         ReplicationSlotPersist();
     156           0 :     ReplicationSlotRelease();
     157             : 
     158           0 :     PG_RETURN_DATUM(result);
     159             : }
     160             : 
     161             : 
     162             : /*
     163             :  * SQL function for dropping a replication slot.
     164             :  */
     165             : Datum
     166           0 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
     167             : {
     168           0 :     Name        name = PG_GETARG_NAME(0);
     169             : 
     170           0 :     check_permissions();
     171             : 
     172           0 :     CheckSlotRequirements();
     173             : 
     174           0 :     ReplicationSlotDrop(NameStr(*name), true);
     175             : 
     176           0 :     PG_RETURN_VOID();
     177             : }
     178             : 
     179             : /*
     180             :  * pg_get_replication_slots - SQL SRF showing active replication slots.
     181             :  */
     182             : Datum
     183           0 : pg_get_replication_slots(PG_FUNCTION_ARGS)
     184             : {
     185             : #define PG_GET_REPLICATION_SLOTS_COLS 11
     186           0 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     187             :     TupleDesc   tupdesc;
     188             :     Tuplestorestate *tupstore;
     189             :     MemoryContext per_query_ctx;
     190             :     MemoryContext oldcontext;
     191             :     int         slotno;
     192             : 
     193             :     /* check to see if caller supports us returning a tuplestore */
     194           0 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     195           0 :         ereport(ERROR,
     196             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     197             :                  errmsg("set-valued function called in context that cannot accept a set")));
     198           0 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     199           0 :         ereport(ERROR,
     200             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     201             :                  errmsg("materialize mode required, but it is not " \
     202             :                         "allowed in this context")));
     203             : 
     204             :     /* Build a tuple descriptor for our result type */
     205           0 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     206           0 :         elog(ERROR, "return type must be a row type");
     207             : 
     208             :     /*
     209             :      * We don't require any special permission to see this function's data
     210             :      * because nothing should be sensitive. The most critical being the slot
     211             :      * name, which shouldn't contain anything particularly sensitive.
     212             :      */
     213             : 
     214           0 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     215           0 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
     216             : 
     217           0 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
     218           0 :     rsinfo->returnMode = SFRM_Materialize;
     219           0 :     rsinfo->setResult = tupstore;
     220           0 :     rsinfo->setDesc = tupdesc;
     221             : 
     222           0 :     MemoryContextSwitchTo(oldcontext);
     223             : 
     224           0 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     225           0 :     for (slotno = 0; slotno < max_replication_slots; slotno++)
     226             :     {
     227           0 :         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
     228             :         Datum       values[PG_GET_REPLICATION_SLOTS_COLS];
     229             :         bool        nulls[PG_GET_REPLICATION_SLOTS_COLS];
     230             : 
     231             :         ReplicationSlotPersistency persistency;
     232             :         TransactionId xmin;
     233             :         TransactionId catalog_xmin;
     234             :         XLogRecPtr  restart_lsn;
     235             :         XLogRecPtr  confirmed_flush_lsn;
     236             :         pid_t       active_pid;
     237             :         Oid         database;
     238             :         NameData    slot_name;
     239             :         NameData    plugin;
     240             :         int         i;
     241             : 
     242           0 :         if (!slot->in_use)
     243           0 :             continue;
     244             : 
     245           0 :         SpinLockAcquire(&slot->mutex);
     246             : 
     247           0 :         xmin = slot->data.xmin;
     248           0 :         catalog_xmin = slot->data.catalog_xmin;
     249           0 :         database = slot->data.database;
     250           0 :         restart_lsn = slot->data.restart_lsn;
     251           0 :         confirmed_flush_lsn = slot->data.confirmed_flush;
     252           0 :         namecpy(&slot_name, &slot->data.name);
     253           0 :         namecpy(&plugin, &slot->data.plugin);
     254           0 :         active_pid = slot->active_pid;
     255           0 :         persistency = slot->data.persistency;
     256             : 
     257           0 :         SpinLockRelease(&slot->mutex);
     258             : 
     259           0 :         memset(nulls, 0, sizeof(nulls));
     260             : 
     261           0 :         i = 0;
     262           0 :         values[i++] = NameGetDatum(&slot_name);
     263             : 
     264           0 :         if (database == InvalidOid)
     265           0 :             nulls[i++] = true;
     266             :         else
     267           0 :             values[i++] = NameGetDatum(&plugin);
     268             : 
     269           0 :         if (database == InvalidOid)
     270           0 :             values[i++] = CStringGetTextDatum("physical");
     271             :         else
     272           0 :             values[i++] = CStringGetTextDatum("logical");
     273             : 
     274           0 :         if (database == InvalidOid)
     275           0 :             nulls[i++] = true;
     276             :         else
     277           0 :             values[i++] = database;
     278             : 
     279           0 :         values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
     280           0 :         values[i++] = BoolGetDatum(active_pid != 0);
     281             : 
     282           0 :         if (active_pid != 0)
     283           0 :             values[i++] = Int32GetDatum(active_pid);
     284             :         else
     285           0 :             nulls[i++] = true;
     286             : 
     287           0 :         if (xmin != InvalidTransactionId)
     288           0 :             values[i++] = TransactionIdGetDatum(xmin);
     289             :         else
     290           0 :             nulls[i++] = true;
     291             : 
     292           0 :         if (catalog_xmin != InvalidTransactionId)
     293           0 :             values[i++] = TransactionIdGetDatum(catalog_xmin);
     294             :         else
     295           0 :             nulls[i++] = true;
     296             : 
     297           0 :         if (restart_lsn != InvalidXLogRecPtr)
     298           0 :             values[i++] = LSNGetDatum(restart_lsn);
     299             :         else
     300           0 :             nulls[i++] = true;
     301             : 
     302           0 :         if (confirmed_flush_lsn != InvalidXLogRecPtr)
     303           0 :             values[i++] = LSNGetDatum(confirmed_flush_lsn);
     304             :         else
     305           0 :             nulls[i++] = true;
     306             : 
     307           0 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
     308             :     }
     309           0 :     LWLockRelease(ReplicationSlotControlLock);
     310             : 
     311             :     tuplestore_donestoring(tupstore);
     312             : 
     313           0 :     return (Datum) 0;
     314             : }

Generated by: LCOV version 1.11