Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slotfuncs.c
4 : * Support functions for replication slots
5 : *
6 : * Copyright (c) 2012-2017, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/slotfuncs.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres.h"
15 :
16 : #include "funcapi.h"
17 : #include "miscadmin.h"
18 :
19 : #include "access/htup_details.h"
20 : #include "replication/slot.h"
21 : #include "replication/logical.h"
22 : #include "replication/logicalfuncs.h"
23 : #include "utils/builtins.h"
24 : #include "utils/pg_lsn.h"
25 :
26 : static void
27 0 : check_permissions(void)
28 : {
29 0 : if (!superuser() && !has_rolreplication(GetUserId()))
30 0 : ereport(ERROR,
31 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
32 : (errmsg("must be superuser or replication role to use replication slots"))));
33 0 : }
34 :
35 : /*
36 : * SQL function for creating a new physical (streaming replication)
37 : * replication slot.
38 : */
39 : Datum
40 0 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
41 : {
42 0 : Name name = PG_GETARG_NAME(0);
43 0 : bool immediately_reserve = PG_GETARG_BOOL(1);
44 0 : bool temporary = PG_GETARG_BOOL(2);
45 : Datum values[2];
46 : bool nulls[2];
47 : TupleDesc tupdesc;
48 : HeapTuple tuple;
49 : Datum result;
50 :
51 0 : Assert(!MyReplicationSlot);
52 :
53 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
54 0 : elog(ERROR, "return type must be a row type");
55 :
56 0 : check_permissions();
57 :
58 0 : CheckSlotRequirements();
59 :
60 : /* acquire replication slot, this will check for conflicting names */
61 0 : ReplicationSlotCreate(NameStr(*name), false,
62 : temporary ? RS_TEMPORARY : RS_PERSISTENT);
63 :
64 0 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
65 0 : nulls[0] = false;
66 :
67 0 : if (immediately_reserve)
68 : {
69 : /* Reserve WAL as the user asked for it */
70 0 : ReplicationSlotReserveWal();
71 :
72 : /* Write this slot to disk */
73 0 : ReplicationSlotMarkDirty();
74 0 : ReplicationSlotSave();
75 :
76 0 : values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
77 0 : nulls[1] = false;
78 : }
79 : else
80 : {
81 0 : nulls[1] = true;
82 : }
83 :
84 0 : tuple = heap_form_tuple(tupdesc, values, nulls);
85 0 : result = HeapTupleGetDatum(tuple);
86 :
87 0 : ReplicationSlotRelease();
88 :
89 0 : PG_RETURN_DATUM(result);
90 : }
91 :
92 :
93 : /*
94 : * SQL function for creating a new logical replication slot.
95 : */
96 : Datum
97 0 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
98 : {
99 0 : Name name = PG_GETARG_NAME(0);
100 0 : Name plugin = PG_GETARG_NAME(1);
101 0 : bool temporary = PG_GETARG_BOOL(2);
102 :
103 0 : LogicalDecodingContext *ctx = NULL;
104 :
105 : TupleDesc tupdesc;
106 : HeapTuple tuple;
107 : Datum result;
108 : Datum values[2];
109 : bool nulls[2];
110 :
111 0 : Assert(!MyReplicationSlot);
112 :
113 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
114 0 : elog(ERROR, "return type must be a row type");
115 :
116 0 : check_permissions();
117 :
118 0 : CheckLogicalDecodingRequirements();
119 :
120 : /*
121 : * Acquire a logical decoding slot, this will check for conflicting names.
122 : * Initially create persistent slot as ephemeral - that allows us to
123 : * nicely handle errors during initialization because it'll get dropped if
124 : * this transaction fails. We'll make it persistent at the end. Temporary
125 : * slots can be created as temporary from beginning as they get dropped on
126 : * error as well.
127 : */
128 0 : ReplicationSlotCreate(NameStr(*name), true,
129 : temporary ? RS_TEMPORARY : RS_EPHEMERAL);
130 :
131 : /*
132 : * Create logical decoding context, to build the initial snapshot.
133 : */
134 0 : ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
135 : false, /* do not build snapshot */
136 : logical_read_local_xlog_page, NULL, NULL,
137 : NULL);
138 :
139 : /* build initial snapshot, might take a while */
140 0 : DecodingContextFindStartpoint(ctx);
141 :
142 0 : values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name));
143 0 : values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
144 :
145 : /* don't need the decoding context anymore */
146 0 : FreeDecodingContext(ctx);
147 :
148 0 : memset(nulls, 0, sizeof(nulls));
149 :
150 0 : tuple = heap_form_tuple(tupdesc, values, nulls);
151 0 : result = HeapTupleGetDatum(tuple);
152 :
153 : /* ok, slot is now fully created, mark it as persistent if needed */
154 0 : if (!temporary)
155 0 : ReplicationSlotPersist();
156 0 : ReplicationSlotRelease();
157 :
158 0 : PG_RETURN_DATUM(result);
159 : }
160 :
161 :
162 : /*
163 : * SQL function for dropping a replication slot.
164 : */
165 : Datum
166 0 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
167 : {
168 0 : Name name = PG_GETARG_NAME(0);
169 :
170 0 : check_permissions();
171 :
172 0 : CheckSlotRequirements();
173 :
174 0 : ReplicationSlotDrop(NameStr(*name), true);
175 :
176 0 : PG_RETURN_VOID();
177 : }
178 :
179 : /*
180 : * pg_get_replication_slots - SQL SRF showing active replication slots.
181 : */
182 : Datum
183 0 : pg_get_replication_slots(PG_FUNCTION_ARGS)
184 : {
185 : #define PG_GET_REPLICATION_SLOTS_COLS 11
186 0 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
187 : TupleDesc tupdesc;
188 : Tuplestorestate *tupstore;
189 : MemoryContext per_query_ctx;
190 : MemoryContext oldcontext;
191 : int slotno;
192 :
193 : /* check to see if caller supports us returning a tuplestore */
194 0 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
195 0 : ereport(ERROR,
196 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
197 : errmsg("set-valued function called in context that cannot accept a set")));
198 0 : if (!(rsinfo->allowedModes & SFRM_Materialize))
199 0 : ereport(ERROR,
200 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
201 : errmsg("materialize mode required, but it is not " \
202 : "allowed in this context")));
203 :
204 : /* Build a tuple descriptor for our result type */
205 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
206 0 : elog(ERROR, "return type must be a row type");
207 :
208 : /*
209 : * We don't require any special permission to see this function's data
210 : * because nothing should be sensitive. The most critical being the slot
211 : * name, which shouldn't contain anything particularly sensitive.
212 : */
213 :
214 0 : per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
215 0 : oldcontext = MemoryContextSwitchTo(per_query_ctx);
216 :
217 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
218 0 : rsinfo->returnMode = SFRM_Materialize;
219 0 : rsinfo->setResult = tupstore;
220 0 : rsinfo->setDesc = tupdesc;
221 :
222 0 : MemoryContextSwitchTo(oldcontext);
223 :
224 0 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
225 0 : for (slotno = 0; slotno < max_replication_slots; slotno++)
226 : {
227 0 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
228 : Datum values[PG_GET_REPLICATION_SLOTS_COLS];
229 : bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
230 :
231 : ReplicationSlotPersistency persistency;
232 : TransactionId xmin;
233 : TransactionId catalog_xmin;
234 : XLogRecPtr restart_lsn;
235 : XLogRecPtr confirmed_flush_lsn;
236 : pid_t active_pid;
237 : Oid database;
238 : NameData slot_name;
239 : NameData plugin;
240 : int i;
241 :
242 0 : if (!slot->in_use)
243 0 : continue;
244 :
245 0 : SpinLockAcquire(&slot->mutex);
246 :
247 0 : xmin = slot->data.xmin;
248 0 : catalog_xmin = slot->data.catalog_xmin;
249 0 : database = slot->data.database;
250 0 : restart_lsn = slot->data.restart_lsn;
251 0 : confirmed_flush_lsn = slot->data.confirmed_flush;
252 0 : namecpy(&slot_name, &slot->data.name);
253 0 : namecpy(&plugin, &slot->data.plugin);
254 0 : active_pid = slot->active_pid;
255 0 : persistency = slot->data.persistency;
256 :
257 0 : SpinLockRelease(&slot->mutex);
258 :
259 0 : memset(nulls, 0, sizeof(nulls));
260 :
261 0 : i = 0;
262 0 : values[i++] = NameGetDatum(&slot_name);
263 :
264 0 : if (database == InvalidOid)
265 0 : nulls[i++] = true;
266 : else
267 0 : values[i++] = NameGetDatum(&plugin);
268 :
269 0 : if (database == InvalidOid)
270 0 : values[i++] = CStringGetTextDatum("physical");
271 : else
272 0 : values[i++] = CStringGetTextDatum("logical");
273 :
274 0 : if (database == InvalidOid)
275 0 : nulls[i++] = true;
276 : else
277 0 : values[i++] = database;
278 :
279 0 : values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
280 0 : values[i++] = BoolGetDatum(active_pid != 0);
281 :
282 0 : if (active_pid != 0)
283 0 : values[i++] = Int32GetDatum(active_pid);
284 : else
285 0 : nulls[i++] = true;
286 :
287 0 : if (xmin != InvalidTransactionId)
288 0 : values[i++] = TransactionIdGetDatum(xmin);
289 : else
290 0 : nulls[i++] = true;
291 :
292 0 : if (catalog_xmin != InvalidTransactionId)
293 0 : values[i++] = TransactionIdGetDatum(catalog_xmin);
294 : else
295 0 : nulls[i++] = true;
296 :
297 0 : if (restart_lsn != InvalidXLogRecPtr)
298 0 : values[i++] = LSNGetDatum(restart_lsn);
299 : else
300 0 : nulls[i++] = true;
301 :
302 0 : if (confirmed_flush_lsn != InvalidXLogRecPtr)
303 0 : values[i++] = LSNGetDatum(confirmed_flush_lsn);
304 : else
305 0 : nulls[i++] = true;
306 :
307 0 : tuplestore_putvalues(tupstore, tupdesc, values, nulls);
308 : }
309 0 : LWLockRelease(ReplicationSlotControlLock);
310 :
311 : tuplestore_donestoring(tupstore);
312 :
313 0 : return (Datum) 0;
314 : }
|