Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpqwalreceiver.c
4 : *
5 : * This file contains the libpq-specific parts of walreceiver. It's
6 : * loaded as a dynamic module to avoid linking the main server binary with
7 : * libpq.
8 : *
9 : * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
10 : *
11 : *
12 : * IDENTIFICATION
13 : * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 : #include "postgres.h"
18 :
19 : #include <unistd.h>
20 : #include <sys/time.h>
21 :
22 : #include "libpq-fe.h"
23 : #include "pqexpbuffer.h"
24 : #include "access/xlog.h"
25 : #include "catalog/pg_type.h"
26 : #include "funcapi.h"
27 : #include "mb/pg_wchar.h"
28 : #include "miscadmin.h"
29 : #include "pgstat.h"
30 : #include "replication/walreceiver.h"
31 : #include "utils/builtins.h"
32 : #include "utils/memutils.h"
33 : #include "utils/pg_lsn.h"
34 : #include "utils/tuplestore.h"
35 :
/* Identify this shared object as a loadable server module. */
PG_MODULE_MAGIC;

/* Load-time hook; the definition follows the callback table below. */
void		_PG_init(void);
39 :
40 : struct WalReceiverConn
41 : {
42 : /* Current connection to the primary, if any */
43 : PGconn *streamConn;
44 : /* Used to remember if the connection is logical or physical */
45 : bool logical;
46 : /* Buffer for currently read records */
47 : char *recvBuf;
48 : };
49 :
50 : /* Prototypes for interface functions */
51 : static WalReceiverConn *libpqrcv_connect(const char *conninfo,
52 : bool logical, const char *appname,
53 : char **err);
54 : static void libpqrcv_check_conninfo(const char *conninfo);
55 : static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
56 : static char *libpqrcv_identify_system(WalReceiverConn *conn,
57 : TimeLineID *primary_tli,
58 : int *server_version);
59 : static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
60 : TimeLineID tli, char **filename,
61 : char **content, int *len);
62 : static bool libpqrcv_startstreaming(WalReceiverConn *conn,
63 : const WalRcvStreamOptions *options);
64 : static void libpqrcv_endstreaming(WalReceiverConn *conn,
65 : TimeLineID *next_tli);
66 : static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
67 : pgsocket *wait_fd);
68 : static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
69 : int nbytes);
70 : static char *libpqrcv_create_slot(WalReceiverConn *conn,
71 : const char *slotname,
72 : bool temporary,
73 : CRSSnapshotAction snapshot_action,
74 : XLogRecPtr *lsn);
75 : static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
76 : const char *query,
77 : const int nRetTypes,
78 : const Oid *retTypes);
79 : static void libpqrcv_disconnect(WalReceiverConn *conn);
80 :
81 : static WalReceiverFunctionsType PQWalReceiverFunctions = {
82 : libpqrcv_connect,
83 : libpqrcv_check_conninfo,
84 : libpqrcv_get_conninfo,
85 : libpqrcv_identify_system,
86 : libpqrcv_readtimelinehistoryfile,
87 : libpqrcv_startstreaming,
88 : libpqrcv_endstreaming,
89 : libpqrcv_receive,
90 : libpqrcv_send,
91 : libpqrcv_create_slot,
92 : libpqrcv_exec,
93 : libpqrcv_disconnect
94 : };
95 :
96 : /* Prototypes for private functions */
97 : static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
98 : static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
99 :
100 : /*
101 : * Module initialization function
102 : */
103 : void
104 4 : _PG_init(void)
105 : {
106 4 : if (WalReceiverFunctions != NULL)
107 0 : elog(ERROR, "libpqwalreceiver already loaded");
108 4 : WalReceiverFunctions = &PQWalReceiverFunctions;
109 4 : }
110 :
111 : /*
112 : * Establish the connection to the primary server for XLOG streaming
113 : *
114 : * Returns NULL on error and fills the err with palloc'ed error message.
115 : */
116 : static WalReceiverConn *
117 0 : libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
118 : char **err)
119 : {
120 : WalReceiverConn *conn;
121 : PostgresPollingStatusType status;
122 : const char *keys[5];
123 : const char *vals[5];
124 0 : int i = 0;
125 :
126 : /*
127 : * We use the expand_dbname parameter to process the connection string (or
128 : * URI), and pass some extra options. The deliberately undocumented
129 : * parameter "replication=true" makes it a replication connection. The
130 : * database name is ignored by the server in replication mode, but specify
131 : * "replication" for .pgpass lookup.
132 : */
133 0 : keys[i] = "dbname";
134 0 : vals[i] = conninfo;
135 0 : keys[++i] = "replication";
136 0 : vals[i] = logical ? "database" : "true";
137 0 : if (!logical)
138 : {
139 0 : keys[++i] = "dbname";
140 0 : vals[i] = "replication";
141 : }
142 0 : keys[++i] = "fallback_application_name";
143 0 : vals[i] = appname;
144 0 : if (logical)
145 : {
146 0 : keys[++i] = "client_encoding";
147 0 : vals[i] = GetDatabaseEncodingName();
148 : }
149 0 : keys[++i] = NULL;
150 0 : vals[i] = NULL;
151 :
152 0 : Assert(i < sizeof(keys));
153 :
154 0 : conn = palloc0(sizeof(WalReceiverConn));
155 0 : conn->streamConn = PQconnectStartParams(keys, vals,
156 : /* expand_dbname = */ true);
157 0 : if (PQstatus(conn->streamConn) == CONNECTION_BAD)
158 : {
159 0 : *err = pchomp(PQerrorMessage(conn->streamConn));
160 0 : return NULL;
161 : }
162 :
163 : /*
164 : * Poll connection until we have OK or FAILED status.
165 : *
166 : * Per spec for PQconnectPoll, first wait till socket is write-ready.
167 : */
168 0 : status = PGRES_POLLING_WRITING;
169 : do
170 : {
171 : int io_flag;
172 : int rc;
173 :
174 0 : if (status == PGRES_POLLING_READING)
175 0 : io_flag = WL_SOCKET_READABLE;
176 : #ifdef WIN32
177 : /* Windows needs a different test while waiting for connection-made */
178 : else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
179 : io_flag = WL_SOCKET_CONNECTED;
180 : #endif
181 : else
182 0 : io_flag = WL_SOCKET_WRITEABLE;
183 :
184 0 : rc = WaitLatchOrSocket(MyLatch,
185 : WL_POSTMASTER_DEATH |
186 : WL_LATCH_SET | io_flag,
187 0 : PQsocket(conn->streamConn),
188 : 0,
189 : WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
190 :
191 : /* Emergency bailout? */
192 0 : if (rc & WL_POSTMASTER_DEATH)
193 0 : exit(1);
194 :
195 : /* Interrupted? */
196 0 : if (rc & WL_LATCH_SET)
197 : {
198 0 : ResetLatch(MyLatch);
199 0 : CHECK_FOR_INTERRUPTS();
200 : }
201 :
202 : /* If socket is ready, advance the libpq state machine */
203 0 : if (rc & io_flag)
204 0 : status = PQconnectPoll(conn->streamConn);
205 0 : } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
206 :
207 0 : if (PQstatus(conn->streamConn) != CONNECTION_OK)
208 : {
209 0 : *err = pchomp(PQerrorMessage(conn->streamConn));
210 0 : return NULL;
211 : }
212 :
213 0 : conn->logical = logical;
214 :
215 0 : return conn;
216 : }
217 :
218 : /*
219 : * Validate connection info string (just try to parse it)
220 : */
221 : static void
222 14 : libpqrcv_check_conninfo(const char *conninfo)
223 : {
224 14 : PQconninfoOption *opts = NULL;
225 14 : char *err = NULL;
226 :
227 14 : opts = PQconninfoParse(conninfo, &err);
228 14 : if (opts == NULL)
229 4 : ereport(ERROR,
230 : (errcode(ERRCODE_SYNTAX_ERROR),
231 : errmsg("invalid connection string syntax: %s", err)));
232 :
233 10 : PQconninfoFree(opts);
234 10 : }
235 :
236 : /*
237 : * Return a user-displayable conninfo string. Any security-sensitive fields
238 : * are obfuscated.
239 : */
240 : static char *
241 0 : libpqrcv_get_conninfo(WalReceiverConn *conn)
242 : {
243 : PQconninfoOption *conn_opts;
244 : PQconninfoOption *conn_opt;
245 : PQExpBufferData buf;
246 : char *retval;
247 :
248 0 : Assert(conn->streamConn != NULL);
249 :
250 0 : initPQExpBuffer(&buf);
251 0 : conn_opts = PQconninfo(conn->streamConn);
252 :
253 0 : if (conn_opts == NULL)
254 0 : ereport(ERROR,
255 : (errmsg("could not parse connection string: %s",
256 : _("out of memory"))));
257 :
258 : /* build a clean connection string from pieces */
259 0 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
260 : {
261 : bool obfuscate;
262 :
263 : /* Skip debug and empty options */
264 0 : if (strchr(conn_opt->dispchar, 'D') ||
265 0 : conn_opt->val == NULL ||
266 0 : conn_opt->val[0] == '\0')
267 0 : continue;
268 :
269 : /* Obfuscate security-sensitive options */
270 0 : obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
271 :
272 0 : appendPQExpBuffer(&buf, "%s%s=%s",
273 0 : buf.len == 0 ? "" : " ",
274 : conn_opt->keyword,
275 : obfuscate ? "********" : conn_opt->val);
276 : }
277 :
278 0 : PQconninfoFree(conn_opts);
279 :
280 0 : retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
281 0 : termPQExpBuffer(&buf);
282 0 : return retval;
283 : }
284 :
285 : /*
286 : * Check that primary's system identifier matches ours, and fetch the current
287 : * timeline ID of the primary.
288 : */
289 : static char *
290 0 : libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli,
291 : int *server_version)
292 : {
293 : PGresult *res;
294 : char *primary_sysid;
295 :
296 : /*
297 : * Get the system identifier and timeline ID as a DataRow message from the
298 : * primary server.
299 : */
300 0 : res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
301 0 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
302 : {
303 0 : PQclear(res);
304 0 : ereport(ERROR,
305 : (errmsg("could not receive database system identifier and timeline ID from "
306 : "the primary server: %s",
307 : pchomp(PQerrorMessage(conn->streamConn)))));
308 : }
309 0 : if (PQnfields(res) < 3 || PQntuples(res) != 1)
310 : {
311 0 : int ntuples = PQntuples(res);
312 0 : int nfields = PQnfields(res);
313 :
314 0 : PQclear(res);
315 0 : ereport(ERROR,
316 : (errmsg("invalid response from primary server"),
317 : errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
318 : ntuples, nfields, 3, 1)));
319 : }
320 0 : primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
321 0 : *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
322 0 : PQclear(res);
323 :
324 0 : *server_version = PQserverVersion(conn->streamConn);
325 :
326 0 : return primary_sysid;
327 : }
328 :
329 : /*
330 : * Start streaming WAL data from given streaming options.
331 : *
332 : * Returns true if we switched successfully to copy-both mode. False
333 : * means the server received the command and executed it successfully, but
334 : * didn't switch to copy-mode. That means that there was no WAL on the
335 : * requested timeline and starting point, because the server switched to
336 : * another timeline at or before the requested starting point. On failure,
337 : * throws an ERROR.
338 : */
339 : static bool
340 0 : libpqrcv_startstreaming(WalReceiverConn *conn,
341 : const WalRcvStreamOptions *options)
342 : {
343 : StringInfoData cmd;
344 : PGresult *res;
345 :
346 0 : Assert(options->logical == conn->logical);
347 0 : Assert(options->slotname || !options->logical);
348 :
349 0 : initStringInfo(&cmd);
350 :
351 : /* Build the command. */
352 0 : appendStringInfoString(&cmd, "START_REPLICATION");
353 0 : if (options->slotname != NULL)
354 0 : appendStringInfo(&cmd, " SLOT \"%s\"",
355 : options->slotname);
356 :
357 0 : if (options->logical)
358 0 : appendStringInfoString(&cmd, " LOGICAL");
359 :
360 0 : appendStringInfo(&cmd, " %X/%X",
361 0 : (uint32) (options->startpoint >> 32),
362 0 : (uint32) options->startpoint);
363 :
364 : /*
365 : * Additional options are different depending on if we are doing logical
366 : * or physical replication.
367 : */
368 0 : if (options->logical)
369 : {
370 : char *pubnames_str;
371 : List *pubnames;
372 : char *pubnames_literal;
373 :
374 0 : appendStringInfoString(&cmd, " (");
375 :
376 0 : appendStringInfo(&cmd, "proto_version '%u'",
377 : options->proto.logical.proto_version);
378 :
379 0 : pubnames = options->proto.logical.publication_names;
380 0 : pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
381 0 : if (!pubnames_str)
382 0 : ereport(ERROR,
383 : (errmsg("could not start WAL streaming: %s",
384 : pchomp(PQerrorMessage(conn->streamConn)))));
385 0 : pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
386 : strlen(pubnames_str));
387 0 : if (!pubnames_literal)
388 0 : ereport(ERROR,
389 : (errmsg("could not start WAL streaming: %s",
390 : pchomp(PQerrorMessage(conn->streamConn)))));
391 0 : appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
392 0 : PQfreemem(pubnames_literal);
393 0 : pfree(pubnames_str);
394 :
395 0 : appendStringInfoChar(&cmd, ')');
396 : }
397 : else
398 0 : appendStringInfo(&cmd, " TIMELINE %u",
399 : options->proto.physical.startpointTLI);
400 :
401 : /* Start streaming. */
402 0 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
403 0 : pfree(cmd.data);
404 :
405 0 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
406 : {
407 0 : PQclear(res);
408 0 : return false;
409 : }
410 0 : else if (PQresultStatus(res) != PGRES_COPY_BOTH)
411 : {
412 0 : PQclear(res);
413 0 : ereport(ERROR,
414 : (errmsg("could not start WAL streaming: %s",
415 : pchomp(PQerrorMessage(conn->streamConn)))));
416 : }
417 0 : PQclear(res);
418 0 : return true;
419 : }
420 :
421 : /*
422 : * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
423 : * reported by the server, or 0 if it did not report it.
424 : */
425 : static void
426 0 : libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
427 : {
428 : PGresult *res;
429 :
430 0 : if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
431 0 : PQflush(conn->streamConn))
432 0 : ereport(ERROR,
433 : (errmsg("could not send end-of-streaming message to primary: %s",
434 : pchomp(PQerrorMessage(conn->streamConn)))));
435 :
436 0 : *next_tli = 0;
437 :
438 : /*
439 : * After COPY is finished, we should receive a result set indicating the
440 : * next timeline's ID, or just CommandComplete if the server was shut
441 : * down.
442 : *
443 : * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
444 : * also possible in case we aborted the copy in mid-stream.
445 : */
446 0 : res = PQgetResult(conn->streamConn);
447 0 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
448 : {
449 : /*
450 : * Read the next timeline's ID. The server also sends the timeline's
451 : * starting point, but it is ignored.
452 : */
453 0 : if (PQnfields(res) < 2 || PQntuples(res) != 1)
454 0 : ereport(ERROR,
455 : (errmsg("unexpected result set after end-of-streaming")));
456 0 : *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
457 0 : PQclear(res);
458 :
459 : /* the result set should be followed by CommandComplete */
460 0 : res = PQgetResult(conn->streamConn);
461 : }
462 0 : else if (PQresultStatus(res) == PGRES_COPY_OUT)
463 : {
464 0 : PQclear(res);
465 :
466 : /* End the copy */
467 0 : if (PQendcopy(conn->streamConn))
468 0 : ereport(ERROR,
469 : (errmsg("error while shutting down streaming COPY: %s",
470 : pchomp(PQerrorMessage(conn->streamConn)))));
471 :
472 : /* CommandComplete should follow */
473 0 : res = PQgetResult(conn->streamConn);
474 : }
475 :
476 0 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
477 0 : ereport(ERROR,
478 : (errmsg("error reading result of streaming command: %s",
479 : pchomp(PQerrorMessage(conn->streamConn)))));
480 0 : PQclear(res);
481 :
482 : /* Verify that there are no more results */
483 0 : res = PQgetResult(conn->streamConn);
484 0 : if (res != NULL)
485 0 : ereport(ERROR,
486 : (errmsg("unexpected result after CommandComplete: %s",
487 : pchomp(PQerrorMessage(conn->streamConn)))));
488 0 : }
489 :
490 : /*
491 : * Fetch the timeline history file for 'tli' from primary.
492 : */
493 : static void
494 0 : libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
495 : TimeLineID tli, char **filename,
496 : char **content, int *len)
497 : {
498 : PGresult *res;
499 : char cmd[64];
500 :
501 0 : Assert(!conn->logical);
502 :
503 : /*
504 : * Request the primary to send over the history file for given timeline.
505 : */
506 0 : snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
507 0 : res = libpqrcv_PQexec(conn->streamConn, cmd);
508 0 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
509 : {
510 0 : PQclear(res);
511 0 : ereport(ERROR,
512 : (errmsg("could not receive timeline history file from "
513 : "the primary server: %s",
514 : pchomp(PQerrorMessage(conn->streamConn)))));
515 : }
516 0 : if (PQnfields(res) != 2 || PQntuples(res) != 1)
517 : {
518 0 : int ntuples = PQntuples(res);
519 0 : int nfields = PQnfields(res);
520 :
521 0 : PQclear(res);
522 0 : ereport(ERROR,
523 : (errmsg("invalid response from primary server"),
524 : errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
525 : ntuples, nfields)));
526 : }
527 0 : *filename = pstrdup(PQgetvalue(res, 0, 0));
528 :
529 0 : *len = PQgetlength(res, 0, 1);
530 0 : *content = palloc(*len);
531 0 : memcpy(*content, PQgetvalue(res, 0, 1), *len);
532 0 : PQclear(res);
533 0 : }
534 :
535 : /*
536 : * Send a query and wait for the results by using the asynchronous libpq
537 : * functions and socket readiness events.
538 : *
539 : * We must not use the regular blocking libpq functions like PQexec()
540 : * since they are uninterruptible by signals on some platforms, such as
541 : * Windows.
542 : *
543 : * The function is modeled on PQexec() in libpq, but only implements
544 : * those parts that are in use in the walreceiver api.
545 : *
546 : * Queries are always executed on the connection in streamConn.
547 : */
548 : static PGresult *
549 0 : libpqrcv_PQexec(PGconn *streamConn, const char *query)
550 : {
551 0 : PGresult *result = NULL;
552 0 : PGresult *lastResult = NULL;
553 :
554 : /*
555 : * PQexec() silently discards any prior query results on the connection.
556 : * This is not required for this function as it's expected that the caller
557 : * (which is this library in all cases) will behave correctly and we don't
558 : * have to be backwards compatible with old libpq.
559 : */
560 :
561 : /*
562 : * Submit a query. Since we don't use non-blocking mode, this also can
563 : * block. But its risk is relatively small, so we ignore that for now.
564 : */
565 0 : if (!PQsendQuery(streamConn, query))
566 0 : return NULL;
567 :
568 : for (;;)
569 : {
570 : /*
571 : * Receive data until PQgetResult is ready to get the result without
572 : * blocking.
573 : */
574 0 : while (PQisBusy(streamConn))
575 : {
576 : int rc;
577 :
578 : /*
579 : * We don't need to break down the sleep into smaller increments,
580 : * since we'll get interrupted by signals and can either handle
581 : * interrupts here or elog(FATAL) within SIGTERM signal handler if
582 : * the signal arrives in the middle of establishment of
583 : * replication connection.
584 : */
585 0 : rc = WaitLatchOrSocket(MyLatch,
586 : WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
587 : WL_LATCH_SET,
588 : PQsocket(streamConn),
589 : 0,
590 : WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
591 :
592 : /* Emergency bailout? */
593 0 : if (rc & WL_POSTMASTER_DEATH)
594 0 : exit(1);
595 :
596 : /* Interrupted? */
597 0 : if (rc & WL_LATCH_SET)
598 : {
599 0 : ResetLatch(MyLatch);
600 0 : CHECK_FOR_INTERRUPTS();
601 : }
602 :
603 : /* Consume whatever data is available from the socket */
604 0 : if (PQconsumeInput(streamConn) == 0)
605 : {
606 : /* trouble; drop whatever we had and return NULL */
607 0 : PQclear(lastResult);
608 0 : return NULL;
609 : }
610 : }
611 :
612 : /*
613 : * Emulate PQexec()'s behavior of returning the last result when there
614 : * are many. We are fine with returning just last error message.
615 : */
616 0 : result = PQgetResult(streamConn);
617 0 : if (result == NULL)
618 0 : break; /* query is complete */
619 :
620 0 : PQclear(lastResult);
621 0 : lastResult = result;
622 :
623 0 : if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
624 0 : PQresultStatus(lastResult) == PGRES_COPY_OUT ||
625 0 : PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
626 0 : PQstatus(streamConn) == CONNECTION_BAD)
627 : break;
628 0 : }
629 :
630 0 : return lastResult;
631 : }
632 :
633 : /*
634 : * Disconnect connection to primary, if any.
635 : */
636 : static void
637 0 : libpqrcv_disconnect(WalReceiverConn *conn)
638 : {
639 0 : PQfinish(conn->streamConn);
640 0 : if (conn->recvBuf != NULL)
641 0 : PQfreemem(conn->recvBuf);
642 0 : pfree(conn);
643 0 : }
644 :
645 : /*
646 : * Receive a message available from XLOG stream.
647 : *
648 : * Returns:
649 : *
650 : * If data was received, returns the length of the data. *buffer is set to
651 : * point to a buffer holding the received message. The buffer is only valid
652 : * until the next libpqrcv_* call.
653 : *
654 : * If no data was available immediately, returns 0, and *wait_fd is set to a
655 : * socket descriptor which can be waited on before trying again.
656 : *
657 : * -1 if the server ended the COPY.
658 : *
659 : * ereports on error.
660 : */
661 : static int
662 0 : libpqrcv_receive(WalReceiverConn *conn, char **buffer,
663 : pgsocket *wait_fd)
664 : {
665 : int rawlen;
666 :
667 0 : if (conn->recvBuf != NULL)
668 0 : PQfreemem(conn->recvBuf);
669 0 : conn->recvBuf = NULL;
670 :
671 : /* Try to receive a CopyData message */
672 0 : rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
673 0 : if (rawlen == 0)
674 : {
675 : /* Try consuming some data. */
676 0 : if (PQconsumeInput(conn->streamConn) == 0)
677 0 : ereport(ERROR,
678 : (errmsg("could not receive data from WAL stream: %s",
679 : pchomp(PQerrorMessage(conn->streamConn)))));
680 :
681 : /* Now that we've consumed some input, try again */
682 0 : rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
683 0 : if (rawlen == 0)
684 : {
685 : /* Tell caller to try again when our socket is ready. */
686 0 : *wait_fd = PQsocket(conn->streamConn);
687 0 : return 0;
688 : }
689 : }
690 0 : if (rawlen == -1) /* end-of-streaming or error */
691 : {
692 : PGresult *res;
693 :
694 0 : res = PQgetResult(conn->streamConn);
695 0 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
696 : {
697 0 : PQclear(res);
698 :
699 : /* Verify that there are no more results. */
700 0 : res = PQgetResult(conn->streamConn);
701 0 : if (res != NULL)
702 : {
703 0 : PQclear(res);
704 :
705 : /*
706 : * If the other side closed the connection orderly (otherwise
707 : * we'd seen an error, or PGRES_COPY_IN) don't report an error
708 : * here, but let callers deal with it.
709 : */
710 0 : if (PQstatus(conn->streamConn) == CONNECTION_BAD)
711 0 : return -1;
712 :
713 0 : ereport(ERROR,
714 : (errmsg("unexpected result after CommandComplete: %s",
715 : PQerrorMessage(conn->streamConn))));
716 : }
717 :
718 0 : return -1;
719 : }
720 0 : else if (PQresultStatus(res) == PGRES_COPY_IN)
721 : {
722 0 : PQclear(res);
723 0 : return -1;
724 : }
725 : else
726 : {
727 0 : PQclear(res);
728 0 : ereport(ERROR,
729 : (errmsg("could not receive data from WAL stream: %s",
730 : pchomp(PQerrorMessage(conn->streamConn)))));
731 : }
732 : }
733 0 : if (rawlen < -1)
734 0 : ereport(ERROR,
735 : (errmsg("could not receive data from WAL stream: %s",
736 : pchomp(PQerrorMessage(conn->streamConn)))));
737 :
738 : /* Return received messages to caller */
739 0 : *buffer = conn->recvBuf;
740 0 : return rawlen;
741 : }
742 :
743 : /*
744 : * Send a message to XLOG stream.
745 : *
746 : * ereports on error.
747 : */
748 : static void
749 0 : libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
750 : {
751 0 : if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
752 0 : PQflush(conn->streamConn))
753 0 : ereport(ERROR,
754 : (errmsg("could not send data to WAL stream: %s",
755 : pchomp(PQerrorMessage(conn->streamConn)))));
756 0 : }
757 :
758 : /*
759 : * Create new replication slot.
760 : * Returns the name of the exported snapshot for logical slot or NULL for
761 : * physical slot.
762 : */
763 : static char *
764 0 : libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
765 : bool temporary, CRSSnapshotAction snapshot_action,
766 : XLogRecPtr *lsn)
767 : {
768 : PGresult *res;
769 : StringInfoData cmd;
770 : char *snapshot;
771 :
772 0 : initStringInfo(&cmd);
773 :
774 0 : appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
775 :
776 0 : if (temporary)
777 0 : appendStringInfoString(&cmd, " TEMPORARY");
778 :
779 0 : if (conn->logical)
780 : {
781 0 : appendStringInfoString(&cmd, " LOGICAL pgoutput");
782 0 : switch (snapshot_action)
783 : {
784 : case CRS_EXPORT_SNAPSHOT:
785 0 : appendStringInfoString(&cmd, " EXPORT_SNAPSHOT");
786 0 : break;
787 : case CRS_NOEXPORT_SNAPSHOT:
788 0 : appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT");
789 0 : break;
790 : case CRS_USE_SNAPSHOT:
791 0 : appendStringInfoString(&cmd, " USE_SNAPSHOT");
792 0 : break;
793 : }
794 : }
795 :
796 0 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
797 0 : pfree(cmd.data);
798 :
799 0 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
800 : {
801 0 : PQclear(res);
802 0 : ereport(ERROR,
803 : (errmsg("could not create replication slot \"%s\": %s",
804 : slotname, pchomp(PQerrorMessage(conn->streamConn)))));
805 : }
806 :
807 0 : *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
808 : CStringGetDatum(PQgetvalue(res, 0, 1))));
809 0 : if (!PQgetisnull(res, 0, 2))
810 0 : snapshot = pstrdup(PQgetvalue(res, 0, 2));
811 : else
812 0 : snapshot = NULL;
813 :
814 0 : PQclear(res);
815 :
816 0 : return snapshot;
817 : }
818 :
819 : /*
820 : * Convert tuple query result to tuplestore.
821 : */
822 : static void
823 0 : libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
824 : const int nRetTypes, const Oid *retTypes)
825 : {
826 : int tupn;
827 : int coln;
828 0 : int nfields = PQnfields(pgres);
829 : HeapTuple tuple;
830 : AttInMetadata *attinmeta;
831 : MemoryContext rowcontext;
832 : MemoryContext oldcontext;
833 :
834 : /* Make sure we got expected number of fields. */
835 0 : if (nfields != nRetTypes)
836 0 : ereport(ERROR,
837 : (errmsg("invalid query response"),
838 : errdetail("Expected %d fields, got %d fields.",
839 : nRetTypes, nfields)));
840 :
841 0 : walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
842 :
843 : /* Create tuple descriptor corresponding to expected result. */
844 0 : walres->tupledesc = CreateTemplateTupleDesc(nRetTypes, false);
845 0 : for (coln = 0; coln < nRetTypes; coln++)
846 0 : TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
847 0 : PQfname(pgres, coln), retTypes[coln], -1, 0);
848 0 : attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
849 :
850 : /* No point in doing more here if there were no tuples returned. */
851 0 : if (PQntuples(pgres) == 0)
852 0 : return;
853 :
854 : /* Create temporary context for local allocations. */
855 0 : rowcontext = AllocSetContextCreate(CurrentMemoryContext,
856 : "libpqrcv query result context",
857 : ALLOCSET_DEFAULT_SIZES);
858 :
859 : /* Process returned rows. */
860 0 : for (tupn = 0; tupn < PQntuples(pgres); tupn++)
861 : {
862 : char *cstrs[MaxTupleAttributeNumber];
863 :
864 0 : CHECK_FOR_INTERRUPTS();
865 :
866 : /* Do the allocations in temporary context. */
867 0 : oldcontext = MemoryContextSwitchTo(rowcontext);
868 :
869 : /*
870 : * Fill cstrs with null-terminated strings of column values.
871 : */
872 0 : for (coln = 0; coln < nfields; coln++)
873 : {
874 0 : if (PQgetisnull(pgres, tupn, coln))
875 0 : cstrs[coln] = NULL;
876 : else
877 0 : cstrs[coln] = PQgetvalue(pgres, tupn, coln);
878 : }
879 :
880 : /* Convert row to a tuple, and add it to the tuplestore */
881 0 : tuple = BuildTupleFromCStrings(attinmeta, cstrs);
882 0 : tuplestore_puttuple(walres->tuplestore, tuple);
883 :
884 : /* Clean up */
885 0 : MemoryContextSwitchTo(oldcontext);
886 0 : MemoryContextReset(rowcontext);
887 : }
888 :
889 0 : MemoryContextDelete(rowcontext);
890 : }
891 :
892 : /*
893 : * Public interface for sending generic queries (and commands).
894 : *
895 : * This can only be called from process connected to database.
896 : */
897 : static WalRcvExecResult *
898 0 : libpqrcv_exec(WalReceiverConn *conn, const char *query,
899 : const int nRetTypes, const Oid *retTypes)
900 : {
901 0 : PGresult *pgres = NULL;
902 0 : WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
903 :
904 0 : if (MyDatabaseId == InvalidOid)
905 0 : ereport(ERROR,
906 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
907 : errmsg("the query interface requires a database connection")));
908 :
909 0 : pgres = libpqrcv_PQexec(conn->streamConn, query);
910 :
911 0 : switch (PQresultStatus(pgres))
912 : {
913 : case PGRES_SINGLE_TUPLE:
914 : case PGRES_TUPLES_OK:
915 0 : walres->status = WALRCV_OK_TUPLES;
916 0 : libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
917 0 : break;
918 :
919 : case PGRES_COPY_IN:
920 0 : walres->status = WALRCV_OK_COPY_IN;
921 0 : break;
922 :
923 : case PGRES_COPY_OUT:
924 0 : walres->status = WALRCV_OK_COPY_OUT;
925 0 : break;
926 :
927 : case PGRES_COPY_BOTH:
928 0 : walres->status = WALRCV_OK_COPY_BOTH;
929 0 : break;
930 :
931 : case PGRES_COMMAND_OK:
932 0 : walres->status = WALRCV_OK_COMMAND;
933 0 : break;
934 :
935 : /* Empty query is considered error. */
936 : case PGRES_EMPTY_QUERY:
937 0 : walres->status = WALRCV_ERROR;
938 0 : walres->err = _("empty query");
939 0 : break;
940 :
941 : case PGRES_NONFATAL_ERROR:
942 : case PGRES_FATAL_ERROR:
943 : case PGRES_BAD_RESPONSE:
944 0 : walres->status = WALRCV_ERROR;
945 0 : walres->err = pchomp(PQerrorMessage(conn->streamConn));
946 0 : break;
947 : }
948 :
949 0 : PQclear(pgres);
950 :
951 0 : return walres;
952 : }
953 :
954 : /*
955 : * Given a List of strings, return it as single comma separated
956 : * string, quoting identifiers as needed.
957 : *
958 : * This is essentially the reverse of SplitIdentifierString.
959 : *
960 : * The caller should free the result.
961 : */
962 : static char *
963 0 : stringlist_to_identifierstr(PGconn *conn, List *strings)
964 : {
965 : ListCell *lc;
966 : StringInfoData res;
967 0 : bool first = true;
968 :
969 0 : initStringInfo(&res);
970 :
971 0 : foreach(lc, strings)
972 : {
973 0 : char *val = strVal(lfirst(lc));
974 : char *val_escaped;
975 :
976 0 : if (first)
977 0 : first = false;
978 : else
979 0 : appendStringInfoChar(&res, ',');
980 :
981 0 : val_escaped = PQescapeIdentifier(conn, val, strlen(val));
982 0 : if (!val_escaped)
983 : {
984 0 : free(res.data);
985 0 : return NULL;
986 : }
987 0 : appendStringInfoString(&res, val_escaped);
988 0 : PQfreemem(val_escaped);
989 : }
990 :
991 0 : return res.data;
992 : }
|