LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logicalfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 0 126 0.0 %
Date: 2017-09-29 13:40:31 Functions: 0 11 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * logicalfuncs.c
       4             :  *
       5             :  *     Support functions for using logical decoding and management of
       6             :  *     logical replication slots via SQL.
       7             :  *
       8             :  *
       9             :  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
      10             :  *
      11             :  * IDENTIFICATION
      12             :  *    src/backend/replication/logicalfuncs.c
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include <unistd.h>
      19             : 
      20             : #include "fmgr.h"
      21             : #include "funcapi.h"
      22             : #include "miscadmin.h"
      23             : 
      24             : #include "access/xlog_internal.h"
      25             : #include "access/xlogutils.h"
      26             : 
      27             : #include "access/xact.h"
      28             : 
      29             : #include "catalog/pg_type.h"
      30             : 
      31             : #include "nodes/makefuncs.h"
      32             : 
      33             : #include "mb/pg_wchar.h"
      34             : 
      35             : #include "utils/array.h"
      36             : #include "utils/builtins.h"
      37             : #include "utils/inval.h"
      38             : #include "utils/memutils.h"
      39             : #include "utils/pg_lsn.h"
      40             : #include "utils/regproc.h"
      41             : #include "utils/resowner.h"
      42             : #include "utils/lsyscache.h"
      43             : 
      44             : #include "replication/decode.h"
      45             : #include "replication/logical.h"
      46             : #include "replication/logicalfuncs.h"
      47             : #include "replication/message.h"
      48             : 
      49             : #include "storage/fd.h"
      50             : 
      51             : /* private date for writing out data */
      52             : typedef struct DecodingOutputState
      53             : {
      54             :     Tuplestorestate *tupstore;
      55             :     TupleDesc   tupdesc;
      56             :     bool        binary_output;
      57             :     int64       returned_rows;
      58             : } DecodingOutputState;
      59             : 
      60             : /*
      61             :  * Prepare for an output plugin write.
      62             :  */
      63             : static void
      64           0 : LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      65             :                           bool last_write)
      66             : {
      67           0 :     resetStringInfo(ctx->out);
      68           0 : }
      69             : 
      70             : /*
      71             :  * Perform output plugin write into tuplestore.
      72             :  */
      73             : static void
      74           0 : LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
      75             :                    bool last_write)
      76             : {
      77             :     Datum       values[3];
      78             :     bool        nulls[3];
      79             :     DecodingOutputState *p;
      80             : 
      81             :     /* SQL Datums can only be of a limited length... */
      82           0 :     if (ctx->out->len > MaxAllocSize - VARHDRSZ)
      83           0 :         elog(ERROR, "too much output for sql interface");
      84             : 
      85           0 :     p = (DecodingOutputState *) ctx->output_writer_private;
      86             : 
      87           0 :     memset(nulls, 0, sizeof(nulls));
      88           0 :     values[0] = LSNGetDatum(lsn);
      89           0 :     values[1] = TransactionIdGetDatum(xid);
      90             : 
      91             :     /*
      92             :      * Assert ctx->out is in database encoding when we're writing textual
      93             :      * output.
      94             :      */
      95           0 :     if (!p->binary_output)
      96           0 :         Assert(pg_verify_mbstr(GetDatabaseEncoding(),
      97             :                                ctx->out->data, ctx->out->len,
      98             :                                false));
      99             : 
     100             :     /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
     101           0 :     values[2] = PointerGetDatum(
     102             :                                 cstring_to_text_with_len(ctx->out->data, ctx->out->len));
     103             : 
     104           0 :     tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
     105           0 :     p->returned_rows++;
     106           0 : }
     107             : 
     108             : static void
     109           0 : check_permissions(void)
     110             : {
     111           0 :     if (!superuser() && !has_rolreplication(GetUserId()))
     112           0 :         ereport(ERROR,
     113             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     114             :                  (errmsg("must be superuser or replication role to use replication slots"))));
     115           0 : }
     116             : 
     117             : int
     118           0 : logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
     119             :                              int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
     120             : {
     121           0 :     return read_local_xlog_page(state, targetPagePtr, reqLen,
     122             :                                 targetRecPtr, cur_page, pageTLI);
     123             : }
     124             : 
     125             : /*
     126             :  * Helper function for the various SQL callable logical decoding functions.
     127             :  */
     128             : static Datum
     129           0 : pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
     130             : {
     131             :     Name        name;
     132             :     XLogRecPtr  upto_lsn;
     133             :     int32       upto_nchanges;
     134           0 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     135             :     MemoryContext per_query_ctx;
     136             :     MemoryContext oldcontext;
     137             :     XLogRecPtr  end_of_wal;
     138             :     XLogRecPtr  startptr;
     139             :     LogicalDecodingContext *ctx;
     140           0 :     ResourceOwner old_resowner = CurrentResourceOwner;
     141             :     ArrayType  *arr;
     142             :     Size        ndim;
     143           0 :     List       *options = NIL;
     144             :     DecodingOutputState *p;
     145             : 
     146           0 :     check_permissions();
     147             : 
     148           0 :     CheckLogicalDecodingRequirements();
     149             : 
     150           0 :     if (PG_ARGISNULL(0))
     151           0 :         ereport(ERROR,
     152             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     153             :                  errmsg("slot name must not be null")));
     154           0 :     name = PG_GETARG_NAME(0);
     155             : 
     156           0 :     if (PG_ARGISNULL(1))
     157           0 :         upto_lsn = InvalidXLogRecPtr;
     158             :     else
     159           0 :         upto_lsn = PG_GETARG_LSN(1);
     160             : 
     161           0 :     if (PG_ARGISNULL(2))
     162           0 :         upto_nchanges = InvalidXLogRecPtr;
     163             :     else
     164           0 :         upto_nchanges = PG_GETARG_INT32(2);
     165             : 
     166           0 :     if (PG_ARGISNULL(3))
     167           0 :         ereport(ERROR,
     168             :                 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
     169             :                  errmsg("options array must not be null")));
     170           0 :     arr = PG_GETARG_ARRAYTYPE_P(3);
     171             : 
     172             :     /* check to see if caller supports us returning a tuplestore */
     173           0 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     174           0 :         ereport(ERROR,
     175             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     176             :                  errmsg("set-valued function called in context that cannot accept a set")));
     177           0 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     178           0 :         ereport(ERROR,
     179             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     180             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     181             : 
     182             :     /* state to write output to */
     183           0 :     p = palloc0(sizeof(DecodingOutputState));
     184             : 
     185           0 :     p->binary_output = binary;
     186             : 
     187             :     /* Build a tuple descriptor for our result type */
     188           0 :     if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
     189           0 :         elog(ERROR, "return type must be a row type");
     190             : 
     191           0 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     192           0 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
     193             : 
     194             :     /* Deconstruct options array */
     195           0 :     ndim = ARR_NDIM(arr);
     196           0 :     if (ndim > 1)
     197             :     {
     198           0 :         ereport(ERROR,
     199             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     200             :                  errmsg("array must be one-dimensional")));
     201             :     }
     202           0 :     else if (array_contains_nulls(arr))
     203             :     {
     204           0 :         ereport(ERROR,
     205             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     206             :                  errmsg("array must not contain nulls")));
     207             :     }
     208           0 :     else if (ndim == 1)
     209             :     {
     210             :         int         nelems;
     211             :         Datum      *datum_opts;
     212             :         int         i;
     213             : 
     214           0 :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     215             : 
     216           0 :         deconstruct_array(arr, TEXTOID, -1, false, 'i',
     217             :                           &datum_opts, NULL, &nelems);
     218             : 
     219           0 :         if (nelems % 2 != 0)
     220           0 :             ereport(ERROR,
     221             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     222             :                      errmsg("array must have even number of elements")));
     223             : 
     224           0 :         for (i = 0; i < nelems; i += 2)
     225             :         {
     226           0 :             char       *name = TextDatumGetCString(datum_opts[i]);
     227           0 :             char       *opt = TextDatumGetCString(datum_opts[i + 1]);
     228             : 
     229           0 :             options = lappend(options, makeDefElem(name, (Node *) makeString(opt), -1));
     230             :         }
     231             :     }
     232             : 
     233           0 :     p->tupstore = tuplestore_begin_heap(true, false, work_mem);
     234           0 :     rsinfo->returnMode = SFRM_Materialize;
     235           0 :     rsinfo->setResult = p->tupstore;
     236           0 :     rsinfo->setDesc = p->tupdesc;
     237             : 
     238             :     /*
     239             :      * Compute the current end-of-wal and maintain ThisTimeLineID.
     240             :      * RecoveryInProgress() will update ThisTimeLineID on promotion.
     241             :      */
     242           0 :     if (!RecoveryInProgress())
     243           0 :         end_of_wal = GetFlushRecPtr();
     244             :     else
     245           0 :         end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
     246             : 
     247           0 :     ReplicationSlotAcquire(NameStr(*name), true);
     248             : 
     249           0 :     PG_TRY();
     250             :     {
     251             :         /* restart at slot's confirmed_flush */
     252           0 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     253             :                                     options,
     254             :                                     logical_read_local_xlog_page,
     255             :                                     LogicalOutputPrepareWrite,
     256             :                                     LogicalOutputWrite, NULL);
     257             : 
     258           0 :         MemoryContextSwitchTo(oldcontext);
     259             : 
     260             :         /*
     261             :          * Check whether the output plugin writes textual output if that's
     262             :          * what we need.
     263             :          */
     264           0 :         if (!binary &&
     265           0 :             ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
     266           0 :             ereport(ERROR,
     267             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     268             :                      errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
     269             :                             NameStr(MyReplicationSlot->data.plugin),
     270             :                             format_procedure(fcinfo->flinfo->fn_oid))));
     271             : 
     272           0 :         ctx->output_writer_private = p;
     273             : 
     274             :         /*
     275             :          * Decoding of WAL must start at restart_lsn so that the entirety of
     276             :          * xacts that committed after the slot's confirmed_flush can be
     277             :          * accumulated into reorder buffers.
     278             :          */
     279           0 :         startptr = MyReplicationSlot->data.restart_lsn;
     280             : 
     281           0 :         CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, "logical decoding");
     282             : 
     283             :         /* invalidate non-timetravel entries */
     284           0 :         InvalidateSystemCaches();
     285             : 
     286             :         /* Decode until we run out of records */
     287           0 :         while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
     288           0 :                (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
     289             :         {
     290             :             XLogRecord *record;
     291           0 :             char       *errm = NULL;
     292             : 
     293           0 :             record = XLogReadRecord(ctx->reader, startptr, &errm);
     294           0 :             if (errm)
     295           0 :                 elog(ERROR, "%s", errm);
     296             : 
     297             :             /*
     298             :              * Now that we've set up the xlog reader state, subsequent calls
     299             :              * pass InvalidXLogRecPtr to say "continue from last record"
     300             :              */
     301           0 :             startptr = InvalidXLogRecPtr;
     302             : 
     303             :             /*
     304             :              * The {begin_txn,change,commit_txn}_wrapper callbacks above will
     305             :              * store the description into our tuplestore.
     306             :              */
     307           0 :             if (record != NULL)
     308           0 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     309             : 
     310             :             /* check limits */
     311           0 :             if (upto_lsn != InvalidXLogRecPtr &&
     312           0 :                 upto_lsn <= ctx->reader->EndRecPtr)
     313           0 :                 break;
     314           0 :             if (upto_nchanges != 0 &&
     315           0 :                 upto_nchanges <= p->returned_rows)
     316           0 :                 break;
     317           0 :             CHECK_FOR_INTERRUPTS();
     318             :         }
     319             : 
     320             :         tuplestore_donestoring(tupstore);
     321             : 
     322           0 :         CurrentResourceOwner = old_resowner;
     323             : 
     324             :         /*
     325             :          * Next time, start where we left off. (Hunting things, the family
     326             :          * business..)
     327             :          */
     328           0 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
     329             :         {
     330           0 :             LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
     331             : 
     332             :             /*
     333             :              * If only the confirmed_flush_lsn has changed the slot won't get
     334             :              * marked as dirty by the above. Callers on the walsender
     335             :              * interface are expected to keep track of their own progress and
     336             :              * don't need it written out. But SQL-interface users cannot
     337             :              * specify their own start positions and it's harder for them to
     338             :              * keep track of their progress, so we should make more of an
     339             :              * effort to save it for them.
     340             :              *
     341             :              * Dirty the slot so it's written out at the next checkpoint.
     342             :              * We'll still lose its position on crash, as documented, but it's
     343             :              * better than always losing the position even on clean restart.
     344             :              */
     345           0 :             ReplicationSlotMarkDirty();
     346             :         }
     347             : 
     348             :         /* free context, call shutdown callback */
     349           0 :         FreeDecodingContext(ctx);
     350             : 
     351           0 :         ReplicationSlotRelease();
     352           0 :         InvalidateSystemCaches();
     353             :     }
     354           0 :     PG_CATCH();
     355             :     {
     356             :         /* clear all timetravel entries */
     357           0 :         InvalidateSystemCaches();
     358             : 
     359           0 :         PG_RE_THROW();
     360             :     }
     361           0 :     PG_END_TRY();
     362             : 
     363           0 :     return (Datum) 0;
     364             : }
     365             : 
     366             : /*
     367             :  * SQL function returning the changestream as text, consuming the data.
     368             :  */
     369             : Datum
     370           0 : pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
     371             : {
     372           0 :     return pg_logical_slot_get_changes_guts(fcinfo, true, false);
     373             : }
     374             : 
     375             : /*
     376             :  * SQL function returning the changestream as text, only peeking ahead.
     377             :  */
     378             : Datum
     379           0 : pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
     380             : {
     381           0 :     return pg_logical_slot_get_changes_guts(fcinfo, false, false);
     382             : }
     383             : 
     384             : /*
     385             :  * SQL function returning the changestream in binary, consuming the data.
     386             :  */
     387             : Datum
     388           0 : pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
     389             : {
     390           0 :     return pg_logical_slot_get_changes_guts(fcinfo, true, true);
     391             : }
     392             : 
     393             : /*
     394             :  * SQL function returning the changestream in binary, only peeking ahead.
     395             :  */
     396             : Datum
     397           0 : pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
     398             : {
     399           0 :     return pg_logical_slot_get_changes_guts(fcinfo, false, true);
     400             : }
     401             : 
     402             : 
     403             : /*
     404             :  * SQL function for writing logical decoding message into WAL.
     405             :  */
     406             : Datum
     407           0 : pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
     408             : {
     409           0 :     bool        transactional = PG_GETARG_BOOL(0);
     410           0 :     char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
     411           0 :     bytea      *data = PG_GETARG_BYTEA_PP(2);
     412             :     XLogRecPtr  lsn;
     413             : 
     414           0 :     lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
     415             :                             transactional);
     416           0 :     PG_RETURN_LSN(lsn);
     417             : }
     418             : 
     419             : Datum
     420           0 : pg_logical_emit_message_text(PG_FUNCTION_ARGS)
     421             : {
     422             :     /* bytea and text are compatible */
     423           0 :     return pg_logical_emit_message_bytea(fcinfo);
     424             : }

Generated by: LCOV version 1.11