Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pqcomm.c
4 : * Communication functions between the Frontend and the Backend
5 : *
6 : * These routines handle the low-level details of communication between
7 : * frontend and backend. They just shove data across the communication
8 : * channel, and are ignorant of the semantics of the data --- or would be,
9 : * except for major brain damage in the design of the old COPY OUT protocol.
10 : * Unfortunately, COPY OUT was designed to commandeer the communication
11 : * channel (it just transfers data without wrapping it into messages).
12 : * No other messages can be sent while COPY OUT is in progress; and if the
13 : * copy is aborted by an ereport(ERROR), we need to close out the copy so that
14 : * the frontend gets back into sync. Therefore, these routines have to be
15 : * aware of COPY OUT state. (New COPY-OUT is message-based and does *not*
16 : * set the DoingCopyOut flag.)
17 : *
18 : * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19 : * pq_putbytes(), especially if the message would require multiple calls
20 : * to send. Instead, use the routines in pqformat.c to construct the message
21 : * in a buffer and then emit it in one call to pq_putmessage. This ensures
22 : * that the channel will not be clogged by an incomplete message if execution
23 : * is aborted by ereport(ERROR) partway through the message. The only
24 : * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
25 : *
26 : * At one time, libpq was shared between frontend and backend, but now
27 : * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28 : * All that remains is similarities of names to trap the unwary...
29 : *
30 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
31 : * Portions Copyright (c) 1994, Regents of the University of California
32 : *
33 : * src/backend/libpq/pqcomm.c
34 : *
35 : *-------------------------------------------------------------------------
36 : */
37 :
38 : /*------------------------
39 : * INTERFACE ROUTINES
40 : *
41 : * setup/teardown:
42 : * StreamServerPort - Open postmaster's server port
43 : * StreamConnection - Create new connection with client
44 : * StreamClose - Close a client/backend connection
45 : * TouchSocketFiles - Protect socket files against /tmp cleaners
46 : * pq_init - initialize libpq at backend startup
47 : * pq_comm_reset - reset libpq during error recovery
48 : * pq_close - shutdown libpq at backend exit
49 : *
50 : * low-level I/O:
51 : * pq_getbytes - get a known number of bytes from connection
52 : * pq_getstring - get a null terminated string from connection
53 : * pq_getmessage - get a message with length word from connection
54 : * pq_getbyte - get next byte from connection
55 : * pq_peekbyte - peek at next byte from connection
56 : * pq_putbytes - send bytes to connection (not flushed until pq_flush)
57 : * pq_flush - flush pending output
58 : * pq_flush_if_writable - flush pending output if writable without blocking
59 : * pq_getbyte_if_available - get a byte if available without blocking
60 : *
61 : * message-level I/O (and old-style-COPY-OUT cruft):
62 : * pq_putmessage - send a normal message (suppressed in COPY OUT mode)
63 : * pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
64 : * pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
65 : * pq_endcopyout - end a COPY OUT transfer
66 : *
67 : *------------------------
68 : */
69 : #include "postgres.h"
70 :
71 : #include <signal.h>
72 : #include <fcntl.h>
73 : #include <grp.h>
74 : #include <unistd.h>
75 : #include <sys/file.h>
76 : #include <sys/socket.h>
77 : #include <sys/stat.h>
78 : #include <sys/time.h>
79 : #include <netdb.h>
80 : #include <netinet/in.h>
81 : #ifdef HAVE_NETINET_TCP_H
82 : #include <netinet/tcp.h>
83 : #endif
84 : #include <arpa/inet.h>
85 : #ifdef HAVE_UTIME_H
86 : #include <utime.h>
87 : #endif
88 : #ifdef _MSC_VER /* mstcpip.h is missing on mingw */
89 : #include <mstcpip.h>
90 : #endif
91 :
92 : #include "common/ip.h"
93 : #include "libpq/libpq.h"
94 : #include "miscadmin.h"
95 : #include "storage/ipc.h"
96 : #include "utils/guc.h"
97 : #include "utils/memutils.h"
98 :
/*
 * Cope with the various platform-specific ways to spell TCP keepalive socket
 * options.  This doesn't cover Windows, which as usual does its own thing.
 *
 * The alternatives are tested in order and the first match wins; if none
 * matches, PG_TCP_KEEPALIVE_IDLE and PG_TCP_KEEPALIVE_IDLE_STR stay undefined.
 */
#if defined(TCP_KEEPIDLE)
/* TCP_KEEPIDLE is the name of this option on Linux and *BSD */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPIDLE
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPIDLE"
#elif defined(TCP_KEEPALIVE_THRESHOLD)
/* TCP_KEEPALIVE_THRESHOLD is the name of this option on Solaris >= 11 */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPALIVE_THRESHOLD
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPALIVE_THRESHOLD"
#elif defined(TCP_KEEPALIVE) && defined(__darwin__)
/* TCP_KEEPALIVE is the name of this option on macOS */
/* Caution: Solaris has this symbol but it means something different */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPALIVE
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPALIVE"
#endif

/*
 * Configuration options
 *
 * Unix_socket_permissions is the mode passed to chmod() on each Unix socket
 * file; Unix_socket_group, when non-empty, is a group name or numeric gid
 * applied with chown().  Both are consumed by Setup_AF_UNIX().
 */
int			Unix_socket_permissions;
char	   *Unix_socket_group;

/*
 * Where the Unix socket files are (list of palloc'd strings).  Appended to by
 * Lock_AF_UNIX(); walked by TouchSocketFiles() and RemoveSocketFiles().
 */
static List *sock_paths = NIL;

/*
 * Buffers for low-level I/O.
 *
 * The receive buffer is fixed size. Send buffer is usually 8k, but can be
 * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
 * The send buffer is allocated in TopMemoryContext by pq_init().
 */

#define PQ_SEND_BUFFER_SIZE 8192
#define PQ_RECV_BUFFER_SIZE 8192

static char *PqSendBuffer;
static int	PqSendBufferSize;	/* Size of send buffer */
static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */

/*
 * Bytes [PqRecvPointer, PqRecvLength) of PqRecvBuffer are received but not
 * yet consumed; pq_recvbuf() compacts them to the front before reading more.
 */
static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
static int	PqRecvLength;		/* End of data available in PqRecvBuffer */

/*
 * Message status
 */
static bool PqCommBusy;			/* busy sending data to the client */
static bool PqCommReadingMsg;	/* in the middle of reading a message */
static bool DoingCopyOut;		/* in old-protocol COPY OUT processing */


/* Internal functions */
static void socket_comm_reset(void);
static void socket_close(int code, Datum arg);
static void socket_set_nonblocking(bool nonblocking);
static int	socket_flush(void);
static int	socket_flush_if_writable(void);
static bool socket_is_send_pending(void);
static int	socket_putmessage(char msgtype, const char *s, size_t len);
static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
static void socket_startcopyout(void);
static void socket_endcopyout(bool errorAbort);
static int	internal_putbytes(const char *s, size_t len);
static int	internal_flush(void);

#ifdef HAVE_UNIX_SOCKETS
static int	Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
static int	Setup_AF_UNIX(char *sock_path);
#endif							/* HAVE_UNIX_SOCKETS */

/*
 * Socket-based implementation of the PQcommMethods hooks.  This is a
 * positional initializer, so the entries must stay in the same order as the
 * function-pointer fields of PQcommMethods (declared outside this file,
 * presumably in libpq/libpq.h -- confirm before reordering either side).
 */
static PQcommMethods PqCommSocketMethods = {
	socket_comm_reset,
	socket_flush,
	socket_flush_if_writable,
	socket_is_send_pending,
	socket_putmessage,
	socket_putmessage_noblock,
	socket_startcopyout,
	socket_endcopyout
};

/* Active method table; initialized to the socket implementation above. */
PQcommMethods *PqCommMethods = &PqCommSocketMethods;

/*
 * Wait-event set used when waiting on the client connection.  pq_init()
 * builds it with three events: client socket writeable, this process's
 * latch, and postmaster death.
 */
WaitEventSet *FeBeWaitSet;
187 :
188 :
189 : /* --------------------------------
190 : * pq_init - initialize libpq at backend startup
191 : * --------------------------------
192 : */
193 : void
194 216 : pq_init(void)
195 : {
196 : /* initialize state variables */
197 216 : PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
198 216 : PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
199 216 : PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
200 216 : PqCommBusy = false;
201 216 : PqCommReadingMsg = false;
202 216 : DoingCopyOut = false;
203 :
204 : /* set up process-exit hook to close the socket */
205 216 : on_proc_exit(socket_close, 0);
206 :
207 : /*
208 : * In backends (as soon as forked) we operate the underlying socket in
209 : * nonblocking mode and use latches to implement blocking semantics if
210 : * needed. That allows us to provide safely interruptible reads and
211 : * writes.
212 : *
213 : * Use COMMERROR on failure, because ERROR would try to send the error to
214 : * the client, which might require changing the mode again, leading to
215 : * infinite recursion.
216 : */
217 : #ifndef WIN32
218 216 : if (!pg_set_noblock(MyProcPort->sock))
219 0 : ereport(COMMERROR,
220 : (errmsg("could not set socket to nonblocking mode: %m")));
221 : #endif
222 :
223 216 : FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
224 216 : AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
225 : NULL, NULL);
226 216 : AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
227 216 : AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
228 216 : }
229 :
230 : /* --------------------------------
231 : * socket_comm_reset - reset libpq during error recovery
232 : *
233 : * This is called from error recovery at the outer idle loop. It's
234 : * just to get us out of trouble if we somehow manage to elog() from
235 : * inside a pqcomm.c routine (which ideally will never happen, but...)
236 : * --------------------------------
237 : */
238 : static void
239 3249 : socket_comm_reset(void)
240 : {
241 : /* Do not throw away pending data, but do reset the busy flag */
242 3249 : PqCommBusy = false;
243 : /* We can abort any old-style COPY OUT, too */
244 3249 : pq_endcopyout(true);
245 3249 : }
246 :
247 : /* --------------------------------
248 : * socket_close - shutdown libpq at backend exit
249 : *
250 : * This is the one pg_on_exit_callback in place during BackendInitialize().
251 : * That function's unusual signal handling constrains that this callback be
252 : * safe to run at any instant.
253 : * --------------------------------
254 : */
255 : static void
256 216 : socket_close(int code, Datum arg)
257 : {
258 : /* Nothing to do in a standalone backend, where MyProcPort is NULL. */
259 216 : if (MyProcPort != NULL)
260 : {
261 : #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
262 : #ifdef ENABLE_GSS
263 : OM_uint32 min_s;
264 :
265 : /*
266 : * Shutdown GSSAPI layer. This section does nothing when interrupting
267 : * BackendInitialize(), because pg_GSS_recvauth() makes first use of
268 : * "ctx" and "cred".
269 : */
270 : if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
271 : gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
272 :
273 : if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
274 : gss_release_cred(&min_s, &MyProcPort->gss->cred);
275 : #endif /* ENABLE_GSS */
276 :
277 : /*
278 : * GSS and SSPI share the port->gss struct. Since nowhere else does a
279 : * postmaster child free this, doing so is safe when interrupting
280 : * BackendInitialize().
281 : */
282 : free(MyProcPort->gss);
283 : #endif /* ENABLE_GSS || ENABLE_SSPI */
284 :
285 : /*
286 : * Cleanly shut down SSL layer. Nowhere else does a postmaster child
287 : * call this, so this is safe when interrupting BackendInitialize().
288 : */
289 216 : secure_close(MyProcPort);
290 :
291 : /*
292 : * Formerly we did an explicit close() here, but it seems better to
293 : * leave the socket open until the process dies. This allows clients
294 : * to perform a "synchronous close" if they care --- wait till the
295 : * transport layer reports connection closure, and you can be sure the
296 : * backend has exited.
297 : *
298 : * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
299 : * though.
300 : */
301 216 : MyProcPort->sock = PGINVALID_SOCKET;
302 : }
303 216 : }
304 :
305 :
306 :
307 : /*
308 : * Streams -- wrapper around Unix socket system calls
309 : *
310 : *
311 : * Stream functions are used for vanilla TCP connection protocol.
312 : */
313 :
314 :
/*
 * StreamServerPort -- open a "listening" port to accept connections.
 *
 * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
 * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
 * specified. For TCP ports, hostName is either NULL for all interfaces or
 * the interface to listen on, and unixSocketDir is ignored (can be NULL).
 *
 * Successfully opened sockets are added to the ListenSocket[] array (of
 * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
 *
 * Failure handling:
 *	- An over-long Unix socket path, a Lock_AF_UNIX() failure, or a failed
 *	  address lookup returns STATUS_ERROR before any socket is opened.
 *	- For each resolved address, a failing socket(), setsockopt(), bind() or
 *	  listen() is logged at LOG and that address is skipped.
 *	- A Setup_AF_UNIX() failure, or running out of free ListenSocket[] slots,
 *	  stops the loop.  Sockets already added stay in ListenSocket[].
 *
 * RETURNS: STATUS_OK if at least one socket was added, else STATUS_ERROR
 */

int
StreamServerPort(int family, char *hostName, unsigned short portNumber,
				 char *unixSocketDir,
				 pgsocket ListenSocket[], int MaxListen)
{
	pgsocket	fd;
	int			err;
	int			maxconn;
	int			ret;
	char		portNumberStr[32];
	const char *familyDesc;
	char		familyDescBuf[64];
	const char *addrDesc;
	char		addrBuf[NI_MAXHOST];
	char	   *service;
	struct addrinfo *addrs = NULL,
			   *addr;
	struct addrinfo hint;
	int			listen_index = 0;
	int			added = 0;

#ifdef HAVE_UNIX_SOCKETS
	char		unixSocketPath[MAXPGPATH];
#endif
#if !defined(WIN32) || defined(IPV6_V6ONLY)
	int			one = 1;
#endif

	/* Initialize hint structure */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_family = family;
	hint.ai_flags = AI_PASSIVE;
	hint.ai_socktype = SOCK_STREAM;

#ifdef HAVE_UNIX_SOCKETS
	if (family == AF_UNIX)
	{
		/*
		 * Create unixSocketPath from portNumber and unixSocketDir and lock
		 * that file path.  For Unix sockets the "service" handed to
		 * pg_getaddrinfo_all() is the socket path itself.
		 */
		UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
		if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
		{
			ereport(LOG,
					(errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
							unixSocketPath,
							(int) (UNIXSOCK_PATH_BUFLEN - 1))));
			return STATUS_ERROR;
		}
		if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
			return STATUS_ERROR;
		service = unixSocketPath;
	}
	else
#endif							/* HAVE_UNIX_SOCKETS */
	{
		/* TCP: the service is the decimal port number. */
		snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
		service = portNumberStr;
	}

	ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
	if (ret || !addrs)
	{
		/*
		 * NOTE(review): if ret == 0 but no addresses came back, the message
		 * below reports gai_strerror(0).
		 */
		if (hostName)
			ereport(LOG,
					(errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
							hostName, service, gai_strerror(ret))));
		else
			ereport(LOG,
					(errmsg("could not translate service \"%s\" to address: %s",
							service, gai_strerror(ret))));
		if (addrs)
			pg_freeaddrinfo_all(hint.ai_family, addrs);
		return STATUS_ERROR;
	}

	for (addr = addrs; addr; addr = addr->ai_next)
	{
		if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
		{
			/*
			 * Only set up a unix domain socket when they really asked for it.
			 * The service/port is different in that case.
			 */
			continue;
		}

		/*
		 * See if there is still room to add 1 more socket.  listen_index is
		 * never reset, so each search resumes where the previous one stopped.
		 */
		for (; listen_index < MaxListen; listen_index++)
		{
			if (ListenSocket[listen_index] == PGINVALID_SOCKET)
				break;
		}
		if (listen_index >= MaxListen)
		{
			ereport(LOG,
					(errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
							MaxListen)));
			break;
		}

		/* set up address family name for log messages */
		switch (addr->ai_family)
		{
			case AF_INET:
				familyDesc = _("IPv4");
				break;
#ifdef HAVE_IPV6
			case AF_INET6:
				familyDesc = _("IPv6");
				break;
#endif
#ifdef HAVE_UNIX_SOCKETS
			case AF_UNIX:
				familyDesc = _("Unix");
				break;
#endif
			default:
				snprintf(familyDescBuf, sizeof(familyDescBuf),
						 _("unrecognized address family %d"),
						 addr->ai_family);
				familyDesc = familyDescBuf;
				break;
		}

		/* set up text form of address for log messages */
#ifdef HAVE_UNIX_SOCKETS
		if (addr->ai_family == AF_UNIX)
			addrDesc = unixSocketPath;
		else
#endif
		{
			pg_getnameinfo_all((const struct sockaddr_storage *) addr->ai_addr,
							   addr->ai_addrlen,
							   addrBuf, sizeof(addrBuf),
							   NULL, 0,
							   NI_NUMERICHOST);
			addrDesc = addrBuf;
		}

		if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: first %s is IPv4, IPv6, or Unix */
					 errmsg("could not create %s socket for address \"%s\": %m",
							familyDesc, addrDesc)));
			continue;
		}

#ifndef WIN32

		/*
		 * Without the SO_REUSEADDR flag, a new postmaster can't be started
		 * right away after a stop or crash, giving "address already in use"
		 * error on TCP ports.
		 *
		 * On win32, however, this behavior only happens if the
		 * SO_EXCLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows multiple
		 * servers to listen on the same address, resulting in unpredictable
		 * behavior. With no flags at all, win32 behaves as Unix with
		 * SO_REUSEADDR.
		 */
		if (!IS_AF_UNIX(addr->ai_family))
		{
			if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
							(char *) &one, sizeof(one))) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
				/* translator: first %s is IPv4, IPv6, or Unix */
						 errmsg("setsockopt(SO_REUSEADDR) failed for %s address \"%s\": %m",
								familyDesc, addrDesc)));
				closesocket(fd);
				continue;
			}
		}
#endif

#ifdef IPV6_V6ONLY
		/*
		 * Make IPv6 sockets accept only IPv6 traffic, so a separate IPv4
		 * socket for the same port does not collide with it.
		 */
		if (addr->ai_family == AF_INET6)
		{
			if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
						   (char *) &one, sizeof(one)) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
				/* translator: first %s is IPv4, IPv6, or Unix */
						 errmsg("setsockopt(IPV6_V6ONLY) failed for %s address \"%s\": %m",
								familyDesc, addrDesc)));
				closesocket(fd);
				continue;
			}
		}
#endif

		/*
		 * Note: This might fail on some OS's, like Linux older than
		 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
		 * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
		 * connections.
		 */
		err = bind(fd, addr->ai_addr, addr->ai_addrlen);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: first %s is IPv4, IPv6, or Unix */
					 errmsg("could not bind %s address \"%s\": %m",
							familyDesc, addrDesc),
					 (IS_AF_UNIX(addr->ai_family)) ?
					 errhint("Is another postmaster already running on port %d?"
							 " If not, remove socket file \"%s\" and retry.",
							 (int) portNumber, service) :
					 errhint("Is another postmaster already running on port %d?"
							 " If not, wait a few seconds and retry.",
							 (int) portNumber)));
			closesocket(fd);
			continue;
		}

#ifdef HAVE_UNIX_SOCKETS
		/*
		 * Apply socket-file group/permissions before listen().  Unlike the
		 * per-address failures above, a failure here stops the whole loop.
		 */
		if (addr->ai_family == AF_UNIX)
		{
			if (Setup_AF_UNIX(service) != STATUS_OK)
			{
				closesocket(fd);
				break;
			}
		}
#endif

		/*
		 * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
		 * intended to provide a clamp on the request on platforms where an
		 * overly large request provokes a kernel error (are there any?).
		 */
		maxconn = MaxBackends * 2;
		if (maxconn > PG_SOMAXCONN)
			maxconn = PG_SOMAXCONN;

		err = listen(fd, maxconn);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: first %s is IPv4, IPv6, or Unix */
					 errmsg("could not listen on %s address \"%s\": %m",
							familyDesc, addrDesc)));
			closesocket(fd);
			continue;
		}

#ifdef HAVE_UNIX_SOCKETS
		if (addr->ai_family == AF_UNIX)
			ereport(LOG,
					(errmsg("listening on Unix socket \"%s\"",
							addrDesc)));
		else
#endif
			ereport(LOG,
			/* translator: first %s is IPv4 or IPv6 */
					(errmsg("listening on %s address \"%s\", port %d",
							familyDesc, addrDesc, (int) portNumber)));

		/* Success: publish the socket in the free slot found above. */
		ListenSocket[listen_index] = fd;
		added++;
	}

	pg_freeaddrinfo_all(hint.ai_family, addrs);

	if (!added)
		return STATUS_ERROR;

	return STATUS_OK;
}
606 :
607 :
608 : #ifdef HAVE_UNIX_SOCKETS
609 :
610 : /*
611 : * Lock_AF_UNIX -- configure unix socket file path
612 : */
613 : static int
614 1 : Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
615 : {
616 : /*
617 : * Grab an interlock file associated with the socket file.
618 : *
619 : * Note: there are two reasons for using a socket lock file, rather than
620 : * trying to interlock directly on the socket itself. First, it's a lot
621 : * more portable, and second, it lets us remove any pre-existing socket
622 : * file without race conditions.
623 : */
624 1 : CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
625 :
626 : /*
627 : * Once we have the interlock, we can safely delete any pre-existing
628 : * socket file to avoid failure at bind() time.
629 : */
630 1 : (void) unlink(unixSocketPath);
631 :
632 : /*
633 : * Remember socket file pathnames for later maintenance.
634 : */
635 1 : sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
636 :
637 1 : return STATUS_OK;
638 : }
639 :
640 :
641 : /*
642 : * Setup_AF_UNIX -- configure unix socket permissions
643 : */
644 : static int
645 1 : Setup_AF_UNIX(char *sock_path)
646 : {
647 : /*
648 : * Fix socket ownership/permission if requested. Note we must do this
649 : * before we listen() to avoid a window where unwanted connections could
650 : * get accepted.
651 : */
652 1 : Assert(Unix_socket_group);
653 1 : if (Unix_socket_group[0] != '\0')
654 : {
655 : #ifdef WIN32
656 : elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
657 : #else
658 : char *endptr;
659 : unsigned long val;
660 : gid_t gid;
661 :
662 0 : val = strtoul(Unix_socket_group, &endptr, 10);
663 0 : if (*endptr == '\0')
664 : { /* numeric group id */
665 0 : gid = val;
666 : }
667 : else
668 : { /* convert group name to id */
669 : struct group *gr;
670 :
671 0 : gr = getgrnam(Unix_socket_group);
672 0 : if (!gr)
673 : {
674 0 : ereport(LOG,
675 : (errmsg("group \"%s\" does not exist",
676 : Unix_socket_group)));
677 0 : return STATUS_ERROR;
678 : }
679 0 : gid = gr->gr_gid;
680 : }
681 0 : if (chown(sock_path, -1, gid) == -1)
682 : {
683 0 : ereport(LOG,
684 : (errcode_for_file_access(),
685 : errmsg("could not set group of file \"%s\": %m",
686 : sock_path)));
687 0 : return STATUS_ERROR;
688 : }
689 : #endif
690 : }
691 :
692 1 : if (chmod(sock_path, Unix_socket_permissions) == -1)
693 : {
694 0 : ereport(LOG,
695 : (errcode_for_file_access(),
696 : errmsg("could not set permissions of file \"%s\": %m",
697 : sock_path)));
698 0 : return STATUS_ERROR;
699 : }
700 1 : return STATUS_OK;
701 : }
702 : #endif /* HAVE_UNIX_SOCKETS */
703 :
704 :
705 : /*
706 : * StreamConnection -- create a new connection with client using
707 : * server port. Set port->sock to the FD of the new connection.
708 : *
709 : * ASSUME: that this doesn't need to be non-blocking because
710 : * the Postmaster uses select() to tell when the server master
711 : * socket is ready for accept().
712 : *
713 : * RETURNS: STATUS_OK or STATUS_ERROR
714 : */
715 : int
716 216 : StreamConnection(pgsocket server_fd, Port *port)
717 : {
718 : /* accept connection and fill in the client (remote) address */
719 216 : port->raddr.salen = sizeof(port->raddr.addr);
720 216 : if ((port->sock = accept(server_fd,
721 216 : (struct sockaddr *) &port->raddr.addr,
722 216 : &port->raddr.salen)) == PGINVALID_SOCKET)
723 : {
724 0 : ereport(LOG,
725 : (errcode_for_socket_access(),
726 : errmsg("could not accept new connection: %m")));
727 :
728 : /*
729 : * If accept() fails then postmaster.c will still see the server
730 : * socket as read-ready, and will immediately try again. To avoid
731 : * uselessly sucking lots of CPU, delay a bit before trying again.
732 : * (The most likely reason for failure is being out of kernel file
733 : * table slots; we can do little except hope some will get freed up.)
734 : */
735 0 : pg_usleep(100000L); /* wait 0.1 sec */
736 0 : return STATUS_ERROR;
737 : }
738 :
739 : /* fill in the server (local) address */
740 216 : port->laddr.salen = sizeof(port->laddr.addr);
741 216 : if (getsockname(port->sock,
742 216 : (struct sockaddr *) &port->laddr.addr,
743 216 : &port->laddr.salen) < 0)
744 : {
745 0 : elog(LOG, "getsockname() failed: %m");
746 0 : return STATUS_ERROR;
747 : }
748 :
749 : /* select NODELAY and KEEPALIVE options if it's a TCP connection */
750 216 : if (!IS_AF_UNIX(port->laddr.addr.ss_family))
751 : {
752 : int on;
753 : #ifdef WIN32
754 : int oldopt;
755 : int optlen;
756 : int newopt;
757 : #endif
758 :
759 : #ifdef TCP_NODELAY
760 0 : on = 1;
761 0 : if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
762 : (char *) &on, sizeof(on)) < 0)
763 : {
764 0 : elog(LOG, "setsockopt(%s) failed: %m", "TCP_NODELAY");
765 0 : return STATUS_ERROR;
766 : }
767 : #endif
768 0 : on = 1;
769 0 : if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
770 : (char *) &on, sizeof(on)) < 0)
771 : {
772 0 : elog(LOG, "setsockopt(%s) failed: %m", "SO_KEEPALIVE");
773 0 : return STATUS_ERROR;
774 : }
775 :
776 : #ifdef WIN32
777 :
778 : /*
779 : * This is a Win32 socket optimization. The OS send buffer should be
780 : * large enough to send the whole Postgres send buffer in one go, or
781 : * performance suffers. The Postgres send buffer can be enlarged if a
782 : * very large message needs to be sent, but we won't attempt to
783 : * enlarge the OS buffer if that happens, so somewhat arbitrarily
784 : * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
785 : * (That's 32kB with the current default).
786 : *
787 : * The default OS buffer size used to be 8kB in earlier Windows
788 : * versions, but was raised to 64kB in Windows 2012. So it shouldn't
789 : * be necessary to change it in later versions anymore. Changing it
790 : * unnecessarily can even reduce performance, because setting
791 : * SO_SNDBUF in the application disables the "dynamic send buffering"
792 : * feature that was introduced in Windows 7. So before fiddling with
793 : * SO_SNDBUF, check if the current buffer size is already large enough
794 : * and only increase it if necessary.
795 : *
796 : * See https://support.microsoft.com/kb/823764/EN-US/ and
797 : * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
798 : */
799 : optlen = sizeof(oldopt);
800 : if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
801 : &optlen) < 0)
802 : {
803 : elog(LOG, "getsockopt(%s) failed: %m", "SO_SNDBUF");
804 : return STATUS_ERROR;
805 : }
806 : newopt = PQ_SEND_BUFFER_SIZE * 4;
807 : if (oldopt < newopt)
808 : {
809 : if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
810 : sizeof(newopt)) < 0)
811 : {
812 : elog(LOG, "setsockopt(%s) failed: %m", "SO_SNDBUF");
813 : return STATUS_ERROR;
814 : }
815 : }
816 : #endif
817 :
818 : /*
819 : * Also apply the current keepalive parameters. If we fail to set a
820 : * parameter, don't error out, because these aren't universally
821 : * supported. (Note: you might think we need to reset the GUC
822 : * variables to 0 in such a case, but it's not necessary because the
823 : * show hooks for these variables report the truth anyway.)
824 : */
825 0 : (void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
826 0 : (void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
827 0 : (void) pq_setkeepalivescount(tcp_keepalives_count, port);
828 : }
829 :
830 216 : return STATUS_OK;
831 : }
832 :
833 : /*
834 : * StreamClose -- close a client/backend connection
835 : *
836 : * NOTE: this is NOT used to terminate a session; it is just used to release
837 : * the file descriptor in a process that should no longer have the socket
838 : * open. (For example, the postmaster calls this after passing ownership
839 : * of the connection to a child process.) It is expected that someone else
840 : * still has the socket open. So, we only want to close the descriptor,
841 : * we do NOT want to send anything to the far end.
842 : */
843 : void
844 558 : StreamClose(pgsocket sock)
845 : {
846 558 : closesocket(sock);
847 558 : }
848 :
849 : /*
850 : * TouchSocketFiles -- mark socket files as recently accessed
851 : *
852 : * This routine should be called every so often to ensure that the socket
853 : * files have a recent mod date (ordinary operations on sockets usually won't
854 : * change the mod date). That saves them from being removed by
855 : * overenthusiastic /tmp-directory-cleaner daemons. (Another reason we should
856 : * never have put the socket file in /tmp...)
857 : */
858 : void
859 0 : TouchSocketFiles(void)
860 : {
861 : ListCell *l;
862 :
863 : /* Loop through all created sockets... */
864 0 : foreach(l, sock_paths)
865 : {
866 0 : char *sock_path = (char *) lfirst(l);
867 :
868 : /*
869 : * utime() is POSIX standard, utimes() is a common alternative. If we
870 : * have neither, there's no way to affect the mod or access time of
871 : * the socket :-(
872 : *
873 : * In either path, we ignore errors; there's no point in complaining.
874 : */
875 : #ifdef HAVE_UTIME
876 0 : utime(sock_path, NULL);
877 : #else /* !HAVE_UTIME */
878 : #ifdef HAVE_UTIMES
879 : utimes(sock_path, NULL);
880 : #endif /* HAVE_UTIMES */
881 : #endif /* HAVE_UTIME */
882 : }
883 0 : }
884 :
885 : /*
886 : * RemoveSocketFiles -- unlink socket files at postmaster shutdown
887 : */
888 : void
889 1 : RemoveSocketFiles(void)
890 : {
891 : ListCell *l;
892 :
893 : /* Loop through all created sockets... */
894 2 : foreach(l, sock_paths)
895 : {
896 1 : char *sock_path = (char *) lfirst(l);
897 :
898 : /* Ignore any error. */
899 1 : (void) unlink(sock_path);
900 : }
901 : /* Since we're about to exit, no need to reclaim storage */
902 1 : sock_paths = NIL;
903 1 : }
904 :
905 :
906 : /* --------------------------------
907 : * Low-level I/O routines begin here.
908 : *
909 : * These routines communicate with a frontend client across a connection
910 : * already established by the preceding routines.
911 : * --------------------------------
912 : */
913 :
914 : /* --------------------------------
915 : * socket_set_nonblocking - set socket blocking/non-blocking
916 : *
917 : * Sets the socket non-blocking if nonblocking is TRUE, or sets it
918 : * blocking otherwise.
919 : * --------------------------------
920 : */
921 : static void
922 59691 : socket_set_nonblocking(bool nonblocking)
923 : {
924 59691 : if (MyProcPort == NULL)
925 0 : ereport(ERROR,
926 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
927 : errmsg("there is no client connection")));
928 :
929 59691 : MyProcPort->noblock = nonblocking;
930 59691 : }
931 :
932 : /* --------------------------------
933 : * pq_recvbuf - load some bytes into the input buffer
934 : *
935 : * returns 0 if OK, EOF if trouble
936 : * --------------------------------
937 : */
938 : static int
939 27472 : pq_recvbuf(void)
940 : {
941 27472 : if (PqRecvPointer > 0)
942 : {
943 27256 : if (PqRecvLength > PqRecvPointer)
944 : {
945 : /* still some unread data, left-justify it in the buffer */
946 0 : memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
947 0 : PqRecvLength - PqRecvPointer);
948 0 : PqRecvLength -= PqRecvPointer;
949 0 : PqRecvPointer = 0;
950 : }
951 : else
952 27256 : PqRecvLength = PqRecvPointer = 0;
953 : }
954 :
955 : /* Ensure that we're in blocking mode */
956 27472 : socket_set_nonblocking(false);
957 :
958 : /* Can fill buffer from PqRecvLength and upwards */
959 : for (;;)
960 : {
961 : int r;
962 :
963 27472 : r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
964 27472 : PQ_RECV_BUFFER_SIZE - PqRecvLength);
965 :
966 27472 : if (r < 0)
967 : {
968 0 : if (errno == EINTR)
969 0 : continue; /* Ok if interrupted */
970 :
971 : /*
972 : * Careful: an ereport() that tries to write to the client would
973 : * cause recursion to here, leading to stack overflow and core
974 : * dump! This message must go *only* to the postmaster log.
975 : */
976 0 : ereport(COMMERROR,
977 : (errcode_for_socket_access(),
978 : errmsg("could not receive data from client: %m")));
979 0 : return EOF;
980 : }
981 27472 : if (r == 0)
982 : {
983 : /*
984 : * EOF detected. We used to write a log message here, but it's
985 : * better to expect the ultimate caller to do that.
986 : */
987 0 : return EOF;
988 : }
989 : /* r contains number of bytes read, so just incr length */
990 27472 : PqRecvLength += r;
991 27472 : return 0;
992 0 : }
993 : }
994 :
995 : /* --------------------------------
996 : * pq_getbyte - get a single byte from connection, or return EOF
997 : * --------------------------------
998 : */
999 : int
1000 27317 : pq_getbyte(void)
1001 : {
1002 27317 : Assert(PqCommReadingMsg);
1003 :
1004 81728 : while (PqRecvPointer >= PqRecvLength)
1005 : {
1006 27094 : if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1007 0 : return EOF; /* Failed to recv data */
1008 : }
1009 27317 : return (unsigned char) PqRecvBuffer[PqRecvPointer++];
1010 : }
1011 :
1012 : /* --------------------------------
1013 : * pq_peekbyte - peek at next byte from connection
1014 : *
1015 : * Same as pq_getbyte() except we don't advance the pointer.
1016 : * --------------------------------
1017 : */
1018 : int
1019 0 : pq_peekbyte(void)
1020 : {
1021 0 : Assert(PqCommReadingMsg);
1022 :
1023 0 : while (PqRecvPointer >= PqRecvLength)
1024 : {
1025 0 : if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1026 0 : return EOF; /* Failed to recv data */
1027 : }
1028 0 : return (unsigned char) PqRecvBuffer[PqRecvPointer];
1029 : }
1030 :
1031 : /* --------------------------------
1032 : * pq_getbyte_if_available - get a single byte from connection,
1033 : * if available
1034 : *
1035 : * The received byte is stored in *c. Returns 1 if a byte was read,
1036 : * 0 if no data was available, or EOF if trouble.
1037 : * --------------------------------
1038 : */
1039 : int
1040 0 : pq_getbyte_if_available(unsigned char *c)
1041 : {
1042 : int r;
1043 :
1044 0 : Assert(PqCommReadingMsg);
1045 :
1046 0 : if (PqRecvPointer < PqRecvLength)
1047 : {
1048 0 : *c = PqRecvBuffer[PqRecvPointer++];
1049 0 : return 1;
1050 : }
1051 :
1052 : /* Put the socket into non-blocking mode */
1053 0 : socket_set_nonblocking(true);
1054 :
1055 0 : r = secure_read(MyProcPort, c, 1);
1056 0 : if (r < 0)
1057 : {
1058 : /*
1059 : * Ok if no data available without blocking or interrupted (though
1060 : * EINTR really shouldn't happen with a non-blocking socket). Report
1061 : * other errors.
1062 : */
1063 0 : if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
1064 0 : r = 0;
1065 : else
1066 : {
1067 : /*
1068 : * Careful: an ereport() that tries to write to the client would
1069 : * cause recursion to here, leading to stack overflow and core
1070 : * dump! This message must go *only* to the postmaster log.
1071 : */
1072 0 : ereport(COMMERROR,
1073 : (errcode_for_socket_access(),
1074 : errmsg("could not receive data from client: %m")));
1075 0 : r = EOF;
1076 : }
1077 : }
1078 0 : else if (r == 0)
1079 : {
1080 : /* EOF detected */
1081 0 : r = EOF;
1082 : }
1083 :
1084 0 : return r;
1085 : }
1086 :
1087 : /* --------------------------------
1088 : * pq_getbytes - get a known number of bytes from connection
1089 : *
1090 : * returns 0 if OK, EOF if trouble
1091 : * --------------------------------
1092 : */
1093 : int
1094 54773 : pq_getbytes(char *s, size_t len)
1095 : {
1096 : size_t amount;
1097 :
1098 54773 : Assert(PqCommReadingMsg);
1099 :
1100 164481 : while (len > 0)
1101 : {
1102 110248 : while (PqRecvPointer >= PqRecvLength)
1103 : {
1104 378 : if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1105 0 : return EOF; /* Failed to recv data */
1106 : }
1107 54935 : amount = PqRecvLength - PqRecvPointer;
1108 54935 : if (amount > len)
1109 27463 : amount = len;
1110 54935 : memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
1111 54935 : PqRecvPointer += amount;
1112 54935 : s += amount;
1113 54935 : len -= amount;
1114 : }
1115 54773 : return 0;
1116 : }
1117 :
1118 : /* --------------------------------
1119 : * pq_discardbytes - throw away a known number of bytes
1120 : *
1121 : * same as pq_getbytes except we do not copy the data to anyplace.
1122 : * this is used for resynchronizing after read errors.
1123 : *
1124 : * returns 0 if OK, EOF if trouble
1125 : * --------------------------------
1126 : */
1127 : static int
1128 0 : pq_discardbytes(size_t len)
1129 : {
1130 : size_t amount;
1131 :
1132 0 : Assert(PqCommReadingMsg);
1133 :
1134 0 : while (len > 0)
1135 : {
1136 0 : while (PqRecvPointer >= PqRecvLength)
1137 : {
1138 0 : if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1139 0 : return EOF; /* Failed to recv data */
1140 : }
1141 0 : amount = PqRecvLength - PqRecvPointer;
1142 0 : if (amount > len)
1143 0 : amount = len;
1144 0 : PqRecvPointer += amount;
1145 0 : len -= amount;
1146 : }
1147 0 : return 0;
1148 : }
1149 :
1150 : /* --------------------------------
1151 : * pq_getstring - get a null terminated string from connection
1152 : *
1153 : * The return value is placed in an expansible StringInfo, which has
1154 : * already been initialized by the caller.
1155 : *
1156 : * This is used only for dealing with old-protocol clients. The idea
1157 : * is to produce a StringInfo that looks the same as we would get from
1158 : * pq_getmessage() with a newer client; we will then process it with
1159 : * pq_getmsgstring. Therefore, no character set conversion is done here,
1160 : * even though this is presumably useful only for text.
1161 : *
1162 : * returns 0 if OK, EOF if trouble
1163 : * --------------------------------
1164 : */
1165 : int
1166 0 : pq_getstring(StringInfo s)
1167 : {
1168 : int i;
1169 :
1170 0 : Assert(PqCommReadingMsg);
1171 :
1172 0 : resetStringInfo(s);
1173 :
1174 : /* Read until we get the terminating '\0' */
1175 : for (;;)
1176 : {
1177 0 : while (PqRecvPointer >= PqRecvLength)
1178 : {
1179 0 : if (pq_recvbuf()) /* If nothing in buffer, then recv some */
1180 0 : return EOF; /* Failed to recv data */
1181 : }
1182 :
1183 0 : for (i = PqRecvPointer; i < PqRecvLength; i++)
1184 : {
1185 0 : if (PqRecvBuffer[i] == '\0')
1186 : {
1187 : /* include the '\0' in the copy */
1188 0 : appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1189 0 : i - PqRecvPointer + 1);
1190 0 : PqRecvPointer = i + 1; /* advance past \0 */
1191 0 : return 0;
1192 : }
1193 : }
1194 :
1195 : /* If we're here we haven't got the \0 in the buffer yet. */
1196 0 : appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1197 : PqRecvLength - PqRecvPointer);
1198 0 : PqRecvPointer = PqRecvLength;
1199 0 : }
1200 : }
1201 :
1202 :
1203 : /* --------------------------------
1204 : * pq_startmsgread - begin reading a message from the client.
1205 : *
1206 : * This must be called before any of the pq_get* functions.
1207 : * --------------------------------
1208 : */
1209 : void
1210 27533 : pq_startmsgread(void)
1211 : {
1212 : /*
1213 : * There shouldn't be a read active already, but let's check just to be
1214 : * sure.
1215 : */
1216 27533 : if (PqCommReadingMsg)
1217 0 : ereport(FATAL,
1218 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1219 : errmsg("terminating connection because protocol synchronization was lost")));
1220 :
1221 27533 : PqCommReadingMsg = true;
1222 27533 : }
1223 :
1224 :
1225 : /* --------------------------------
1226 : * pq_endmsgread - finish reading message.
1227 : *
1228 : * This must be called after reading a V2 protocol message with
1229 : * pq_getstring() and friends, to indicate that we have read the whole
1230 : * message. In V3 protocol, pq_getmessage() does this implicitly.
1231 : * --------------------------------
1232 : */
1233 : void
1234 216 : pq_endmsgread(void)
1235 : {
1236 216 : Assert(PqCommReadingMsg);
1237 :
1238 216 : PqCommReadingMsg = false;
1239 216 : }
1240 :
1241 : /* --------------------------------
1242 : * pq_is_reading_msg - are we currently reading a message?
1243 : *
1244 : * This is used in error recovery at the outer idle loop to detect if we have
1245 : * lost protocol sync, and need to terminate the connection. pq_startmsgread()
1246 : * will check for that too, but it's nicer to detect it earlier.
1247 : * --------------------------------
1248 : */
1249 : bool
1250 3249 : pq_is_reading_msg(void)
1251 : {
1252 3249 : return PqCommReadingMsg;
1253 : }
1254 :
/* --------------------------------
 *		pq_getmessage	- get a message with length word from connection
 *
 *		The return value is placed in an expansible StringInfo, which has
 *		already been initialized by the caller.
 *		Only the message body is placed in the StringInfo; the length word
 *		is removed.  Also, s->cursor is initialized to zero for convenience
 *		in scanning the message contents.
 *
 *		If maxlen is not zero, it is an upper limit on the length of the
 *		message we are willing to accept.  We abort the connection (by
 *		returning EOF) if client tries to send more than that.
 *
 *		Must be called inside a pq_startmsgread() bracket (asserted).  On
 *		success PqCommReadingMsg is cleared.  On the EOF returns it is left
 *		set; NOTE(review): callers presumably treat EOF as fatal for the
 *		connection -- confirm.
 *
 *		returns 0 if OK, EOF if trouble
 * --------------------------------
 */
int
pq_getmessage(StringInfo s, int maxlen)
{
	int32		len;

	Assert(PqCommReadingMsg);

	resetStringInfo(s);

	/* Read message length word (transmitted in network byte order) */
	if (pq_getbytes((char *) &len, 4) == EOF)
	{
		ereport(COMMERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("unexpected EOF within message length word")));
		return EOF;
	}

	len = ntohl(len);

	/*
	 * The length word counts itself, so anything below 4 is malformed; also
	 * enforce the caller's limit when one was given.
	 */
	if (len < 4 ||
		(maxlen > 0 && len > maxlen))
	{
		ereport(COMMERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("invalid message length")));
		return EOF;
	}

	len -= 4;					/* discount length itself */

	if (len > 0)
	{
		/*
		 * Allocate space for message.  If we run out of room (ridiculously
		 * large message), we will elog(ERROR), but we want to discard the
		 * message body so as not to lose communication sync.
		 */
		PG_TRY();
		{
			enlargeStringInfo(s, len);
		}
		PG_CATCH();
		{
			/*
			 * Skip the unread body before re-throwing.  If even that fails,
			 * log it; the read-in-progress flag is cleared regardless.
			 */
			if (pq_discardbytes(len) == EOF)
				ereport(COMMERROR,
						(errcode(ERRCODE_PROTOCOL_VIOLATION),
						 errmsg("incomplete message from client")));

			/* we discarded the rest of the message so we're back in sync. */
			PqCommReadingMsg = false;
			PG_RE_THROW();
		}
		PG_END_TRY();

		/* And grab the message */
		if (pq_getbytes(s->data, len) == EOF)
		{
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("incomplete message from client")));
			return EOF;
		}
		s->len = len;
		/* Place a trailing null per StringInfo convention */
		s->data[len] = '\0';
	}

	/* finished reading the message. */
	PqCommReadingMsg = false;

	return 0;
}
1344 :
1345 :
1346 : /* --------------------------------
1347 : * pq_putbytes - send bytes to connection (not flushed until pq_flush)
1348 : *
1349 : * returns 0 if OK, EOF if trouble
1350 : * --------------------------------
1351 : */
1352 : int
1353 0 : pq_putbytes(const char *s, size_t len)
1354 : {
1355 : int res;
1356 :
1357 : /* Should only be called by old-style COPY OUT */
1358 0 : Assert(DoingCopyOut);
1359 : /* No-op if reentrant call */
1360 0 : if (PqCommBusy)
1361 0 : return 0;
1362 0 : PqCommBusy = true;
1363 0 : res = internal_putbytes(s, len);
1364 0 : PqCommBusy = false;
1365 0 : return res;
1366 : }
1367 :
1368 : static int
1369 327900 : internal_putbytes(const char *s, size_t len)
1370 : {
1371 : size_t amount;
1372 :
1373 983763 : while (len > 0)
1374 : {
1375 : /* If buffer is full, then flush it out */
1376 327963 : if (PqSendPointer >= PqSendBufferSize)
1377 : {
1378 134 : socket_set_nonblocking(false);
1379 134 : if (internal_flush())
1380 0 : return EOF;
1381 : }
1382 327963 : amount = PqSendBufferSize - PqSendPointer;
1383 327963 : if (amount > len)
1384 327829 : amount = len;
1385 327963 : memcpy(PqSendBuffer + PqSendPointer, s, amount);
1386 327963 : PqSendPointer += amount;
1387 327963 : s += amount;
1388 327963 : len -= amount;
1389 : }
1390 327900 : return 0;
1391 : }
1392 :
1393 : /* --------------------------------
1394 : * socket_flush - flush pending output
1395 : *
1396 : * returns 0 if OK, EOF if trouble
1397 : * --------------------------------
1398 : */
1399 : static int
1400 32085 : socket_flush(void)
1401 : {
1402 : int res;
1403 :
1404 : /* No-op if reentrant call */
1405 32085 : if (PqCommBusy)
1406 0 : return 0;
1407 32085 : PqCommBusy = true;
1408 32085 : socket_set_nonblocking(false);
1409 32085 : res = internal_flush();
1410 32085 : PqCommBusy = false;
1411 32085 : return res;
1412 : }
1413 :
/* --------------------------------
 *		internal_flush - flush pending output
 *
 * Sends the buffered bytes PqSendBuffer[PqSendStart .. PqSendPointer) with
 * secure_write(), retrying on EINTR.
 *
 * Returns 0 if OK (meaning everything was sent, or operation would block
 * and the socket is in non-blocking mode), or EOF if trouble.  In the
 * would-block case the unsent tail stays buffered and PqSendStart records
 * how far we got.  On a hard error the buffered data is dropped and
 * ClientConnectionLost/InterruptPending are set.
 *
 * Callers in this file select the socket's blocking mode (via
 * socket_set_nonblocking) before calling this.
 * --------------------------------
 */
static int
internal_flush(void)
{
	/* errno of the last send failure we logged; cleared by any successful send */
	static int	last_reported_send_errno = 0;

	char	   *bufptr = PqSendBuffer + PqSendStart;
	char	   *bufend = PqSendBuffer + PqSendPointer;

	while (bufptr < bufend)
	{
		int			r;

		r = secure_write(MyProcPort, bufptr, bufend - bufptr);

		/*
		 * A zero result is handled like a failure, and errno is inspected.
		 * NOTE(review): this assumes secure_write() leaves a meaningful errno
		 * even when it returns 0 -- confirm.
		 */
		if (r <= 0)
		{
			if (errno == EINTR)
				continue;		/* Ok if we were interrupted */

			/*
			 * Ok if no data writable without blocking, and the socket is in
			 * non-blocking mode.  The unsent bytes remain between
			 * PqSendStart and PqSendPointer for a later flush.
			 */
			if (errno == EAGAIN ||
				errno == EWOULDBLOCK)
			{
				return 0;
			}

			/*
			 * Careful: an ereport() that tries to write to the client would
			 * cause recursion to here, leading to stack overflow and core
			 * dump!  This message must go *only* to the postmaster log.
			 *
			 * If a client disconnects while we're in the midst of output, we
			 * might write quite a bit of data before we get to a safe query
			 * abort point.  So, suppress duplicate log messages.
			 */
			if (errno != last_reported_send_errno)
			{
				last_reported_send_errno = errno;
				ereport(COMMERROR,
						(errcode_for_socket_access(),
						 errmsg("could not send data to client: %m")));
			}

			/*
			 * We drop the buffered data anyway so that processing can
			 * continue, even though we'll probably quit soon. We also set a
			 * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
			 * the connection.
			 */
			PqSendStart = PqSendPointer = 0;
			ClientConnectionLost = 1;
			InterruptPending = 1;
			return EOF;
		}

		last_reported_send_errno = 0;	/* reset after any successful send */
		bufptr += r;
		PqSendStart += r;		/* remember progress in case we return early */
	}

	/* Everything was sent: the buffer is empty again */
	PqSendStart = PqSendPointer = 0;
	return 0;
}
1487 :
1488 : /* --------------------------------
1489 : * pq_flush_if_writable - flush pending output if writable without blocking
1490 : *
1491 : * Returns 0 if OK, or EOF if trouble.
1492 : * --------------------------------
1493 : */
1494 : static int
1495 0 : socket_flush_if_writable(void)
1496 : {
1497 : int res;
1498 :
1499 : /* Quick exit if nothing to do */
1500 0 : if (PqSendPointer == PqSendStart)
1501 0 : return 0;
1502 :
1503 : /* No-op if reentrant call */
1504 0 : if (PqCommBusy)
1505 0 : return 0;
1506 :
1507 : /* Temporarily put the socket into non-blocking mode */
1508 0 : socket_set_nonblocking(true);
1509 :
1510 0 : PqCommBusy = true;
1511 0 : res = internal_flush();
1512 0 : PqCommBusy = false;
1513 0 : return res;
1514 : }
1515 :
1516 : /* --------------------------------
1517 : * socket_is_send_pending - is there any pending data in the output buffer?
1518 : * --------------------------------
1519 : */
1520 : static bool
1521 0 : socket_is_send_pending(void)
1522 : {
1523 0 : return (PqSendStart < PqSendPointer);
1524 : }
1525 :
1526 : /* --------------------------------
1527 : * Message-level I/O routines begin here.
1528 : *
1529 : * These routines understand about the old-style COPY OUT protocol.
1530 : * --------------------------------
1531 : */
1532 :
1533 :
1534 : /* --------------------------------
1535 : * socket_putmessage - send a normal message (suppressed in COPY OUT mode)
1536 : *
1537 : * If msgtype is not '\0', it is a message type code to place before
1538 : * the message body. If msgtype is '\0', then the message has no type
1539 : * code (this is only valid in pre-3.0 protocols).
1540 : *
1541 : * len is the length of the message body data at *s. In protocol 3.0
1542 : * and later, a message length word (equal to len+4 because it counts
1543 : * itself too) is inserted by this routine.
1544 : *
1545 : * All normal messages are suppressed while old-style COPY OUT is in
1546 : * progress. (In practice only a few notice messages might get emitted
1547 : * then; dropping them is annoying, but at least they will still appear
1548 : * in the postmaster log.)
1549 : *
1550 : * We also suppress messages generated while pqcomm.c is busy. This
1551 : * avoids any possibility of messages being inserted within other
1552 : * messages. The only known trouble case arises if SIGQUIT occurs
1553 : * during a pqcomm.c routine --- quickdie() will try to send a warning
1554 : * message, and the most reasonable approach seems to be to drop it.
1555 : *
1556 : * returns 0 if OK, EOF if trouble
1557 : * --------------------------------
1558 : */
1559 : static int
1560 109300 : socket_putmessage(char msgtype, const char *s, size_t len)
1561 : {
1562 109300 : if (DoingCopyOut || PqCommBusy)
1563 0 : return 0;
1564 109300 : PqCommBusy = true;
1565 109300 : if (msgtype)
1566 109300 : if (internal_putbytes(&msgtype, 1))
1567 0 : goto fail;
1568 109300 : if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1569 : {
1570 : uint32 n32;
1571 :
1572 109300 : n32 = htonl((uint32) (len + 4));
1573 109300 : if (internal_putbytes((char *) &n32, 4))
1574 0 : goto fail;
1575 : }
1576 109300 : if (internal_putbytes(s, len))
1577 0 : goto fail;
1578 109300 : PqCommBusy = false;
1579 109300 : return 0;
1580 :
1581 : fail:
1582 0 : PqCommBusy = false;
1583 0 : return EOF;
1584 : }
1585 :
1586 : /* --------------------------------
1587 : * pq_putmessage_noblock - like pq_putmessage, but never blocks
1588 : *
1589 : * If the output buffer is too small to hold the message, the buffer
1590 : * is enlarged.
1591 : */
1592 : static void
1593 0 : socket_putmessage_noblock(char msgtype, const char *s, size_t len)
1594 : {
1595 : int res PG_USED_FOR_ASSERTS_ONLY;
1596 : int required;
1597 :
1598 : /*
1599 : * Ensure we have enough space in the output buffer for the message header
1600 : * as well as the message itself.
1601 : */
1602 0 : required = PqSendPointer + 1 + 4 + len;
1603 0 : if (required > PqSendBufferSize)
1604 : {
1605 0 : PqSendBuffer = repalloc(PqSendBuffer, required);
1606 0 : PqSendBufferSize = required;
1607 : }
1608 0 : res = pq_putmessage(msgtype, s, len);
1609 0 : Assert(res == 0); /* should not fail when the message fits in
1610 : * buffer */
1611 0 : }
1612 :
1613 :
1614 : /* --------------------------------
1615 : * socket_startcopyout - inform libpq that an old-style COPY OUT transfer
1616 : * is beginning
1617 : * --------------------------------
1618 : */
1619 : static void
1620 0 : socket_startcopyout(void)
1621 : {
1622 0 : DoingCopyOut = true;
1623 0 : }
1624 :
1625 : /* --------------------------------
1626 : * socket_endcopyout - end an old-style COPY OUT transfer
1627 : *
1628 : * If errorAbort is indicated, we are aborting a COPY OUT due to an error,
1629 : * and must send a terminator line. Since a partial data line might have
1630 : * been emitted, send a couple of newlines first (the first one could
1631 : * get absorbed by a backslash...) Note that old-style COPY OUT does
1632 : * not allow binary transfers, so a textual terminator is always correct.
1633 : * --------------------------------
1634 : */
1635 : static void
1636 3249 : socket_endcopyout(bool errorAbort)
1637 : {
1638 3249 : if (!DoingCopyOut)
1639 6498 : return;
1640 0 : if (errorAbort)
1641 0 : pq_putbytes("\n\n\\.\n", 5);
1642 : /* in non-error case, copy.c will have emitted the terminator line */
1643 0 : DoingCopyOut = false;
1644 : }
1645 :
1646 : /*
1647 : * Support for TCP Keepalive parameters
1648 : */
1649 :
/*
 * On Windows, we need to set both idle and interval at the same time.
 * We also cannot reset them to the default (setting to zero will
 * actually set them to zero, not default), therefore we fall back to
 * the out-of-the-box default instead.
 *
 * idle and interval are in seconds.  A value <= 0 selects the built-in
 * fallback (2 hours idle, 1 second interval).  On success both values are
 * recorded in port and STATUS_OK is returned; if WSAIoctl() fails the
 * error code is logged and STATUS_ERROR is returned.
 */
#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
static int
pq_setkeepaliveswin32(Port *port, int idle, int interval)
{
	struct tcp_keepalive ka;
	DWORD		retsize;

	if (idle <= 0)
		idle = 2 * 60 * 60;		/* default = 2 hours */
	if (interval <= 0)
		interval = 1;			/* default = 1 second */

	/* tcp_keepalive takes milliseconds */
	ka.onoff = 1;
	ka.keepalivetime = idle * 1000;
	ka.keepaliveinterval = interval * 1000;

	if (WSAIoctl(port->sock,
				 SIO_KEEPALIVE_VALS,
				 (LPVOID) &ka,
				 sizeof(ka),
				 NULL,
				 0,
				 &retsize,
				 NULL,
				 NULL)
		!= 0)
	{
		/* WSAGetLastError() returns int; the old "%ui" also printed a stray 'i' */
		elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: %d",
			 WSAGetLastError());
		return STATUS_ERROR;
	}
	port->keepalives_idle = idle;
	port->keepalives_interval = interval;
	return STATUS_OK;
}
#endif
1694 :
/*
 * pq_getkeepalivesidle - report the TCP keepalive idle time of a connection
 *
 * Returns 0 for a NULL port, for a Unix-domain socket, or when the platform
 * provides no way to control the idle time.  Otherwise returns the
 * explicitly configured port->keepalives_idle if nonzero.  Failing that it
 * returns the socket default, read via getsockopt() and cached in
 * port->default_keepalives_idle, or -1 ("don't know") if the lookup fails.
 * The result is always -1 on Windows.
 */
int
pq_getkeepalivesidle(Port *port)
{
#if defined(PG_TCP_KEEPALIVE_IDLE) || defined(SIO_KEEPALIVE_VALS)
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return 0;

	/* An explicitly configured value takes precedence */
	if (port->keepalives_idle != 0)
		return port->keepalives_idle;

	/* A zero cached default triggers a lookup */
	if (port->default_keepalives_idle == 0)
	{
#ifndef WIN32
		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);

		if (getsockopt(port->sock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
					   (char *) &port->default_keepalives_idle,
					   &size) < 0)
		{
			elog(LOG, "getsockopt(%s) failed: %m", PG_TCP_KEEPALIVE_IDLE_STR);
			port->default_keepalives_idle = -1; /* don't know */
		}
#else							/* WIN32 */
		/* We can't get the defaults on Windows, so return "don't know" */
		port->default_keepalives_idle = -1;
#endif							/* WIN32 */
	}

	return port->default_keepalives_idle;
#else
	return 0;
#endif
}
1728 :
/*
 * pq_setkeepalivesidle - set the TCP keepalive idle time of a connection
 *
 * idle == 0 requests the socket's default.  Returns STATUS_OK without doing
 * anything for a NULL port, a Unix-domain socket, or when idle already
 * equals port->keepalives_idle.
 *
 * On non-Windows platforms with PG_TCP_KEEPALIVE_IDLE, the default is looked
 * up first.  If it cannot be determined, a request for 0 is treated as
 * success and any other value fails.  Otherwise idle == 0 is replaced by
 * the default, the value is applied with setsockopt(), and it is recorded
 * in port->keepalives_idle.  On Windows the work is delegated to
 * pq_setkeepaliveswin32().  Where the option is unsupported, a nonzero idle
 * is logged and STATUS_ERROR is returned.
 */
int
pq_setkeepalivesidle(int idle, Port *port)
{
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return STATUS_OK;

/* check SIO_KEEPALIVE_VALS here, not just WIN32, as some toolchains lack it */
#if defined(PG_TCP_KEEPALIVE_IDLE) || defined(SIO_KEEPALIVE_VALS)
	if (idle == port->keepalives_idle)
		return STATUS_OK;

#ifndef WIN32
	/* Make sure the default is known before we might need it for idle == 0 */
	if (port->default_keepalives_idle <= 0)
	{
		if (pq_getkeepalivesidle(port) < 0)
		{
			if (idle == 0)
				return STATUS_OK;	/* default is set but unknown */
			else
				return STATUS_ERROR;
		}
	}

	if (idle == 0)
		idle = port->default_keepalives_idle;

	if (setsockopt(port->sock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
				   (char *) &idle, sizeof(idle)) < 0)
	{
		elog(LOG, "setsockopt(%s) failed: %m", PG_TCP_KEEPALIVE_IDLE_STR);
		return STATUS_ERROR;
	}

	port->keepalives_idle = idle;
#else							/* WIN32 */
	return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
#endif
#else
	if (idle != 0)
	{
		elog(LOG, "setting the keepalive idle time is not supported");
		return STATUS_ERROR;
	}
#endif

	return STATUS_OK;
}
1776 :
/*
 * pq_getkeepalivesinterval - report the TCP keepalive interval of a
 * connection
 *
 * Returns 0 for a NULL port, for a Unix-domain socket, or when the platform
 * provides no way to control the interval.  Otherwise returns the
 * explicitly configured port->keepalives_interval if nonzero.  Failing that
 * it returns the socket default, read via getsockopt(TCP_KEEPINTVL) and
 * cached in port->default_keepalives_interval, or -1 ("don't know") if the
 * lookup fails.  The result is always -1 on Windows.
 */
int
pq_getkeepalivesinterval(Port *port)
{
#if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return 0;

	/* An explicitly configured value takes precedence */
	if (port->keepalives_interval != 0)
		return port->keepalives_interval;

	/* A zero cached default triggers a lookup */
	if (port->default_keepalives_interval == 0)
	{
#ifndef WIN32
		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);

		if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
					   (char *) &port->default_keepalives_interval,
					   &size) < 0)
		{
			elog(LOG, "getsockopt(%s) failed: %m", "TCP_KEEPINTVL");
			port->default_keepalives_interval = -1; /* don't know */
		}
#else
		/* We can't get the defaults on Windows, so return "don't know" */
		port->default_keepalives_interval = -1;
#endif							/* WIN32 */
	}

	return port->default_keepalives_interval;
#else
	return 0;
#endif
}
1810 :
/*
 * pq_setkeepalivesinterval - set the TCP keepalive interval of a connection
 *
 * interval == 0 requests the socket's default.  Returns STATUS_OK without
 * doing anything for a NULL port, a Unix-domain socket, or when interval
 * already equals port->keepalives_interval.
 *
 * On non-Windows platforms with TCP_KEEPINTVL, the default is looked up
 * first.  If it cannot be determined, a request for 0 is treated as success
 * and any other value fails.  Otherwise interval == 0 is replaced by the
 * default, the value is applied with setsockopt(), and it is recorded in
 * port->keepalives_interval.  On Windows the work is delegated to
 * pq_setkeepaliveswin32().  Where the option is unsupported, a nonzero
 * interval is logged and STATUS_ERROR is returned.
 */
int
pq_setkeepalivesinterval(int interval, Port *port)
{
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return STATUS_OK;

#if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
	if (interval == port->keepalives_interval)
		return STATUS_OK;

#ifndef WIN32
	/* Make sure the default is known before we might need it for interval == 0 */
	if (port->default_keepalives_interval <= 0)
	{
		if (pq_getkeepalivesinterval(port) < 0)
		{
			if (interval == 0)
				return STATUS_OK;	/* default is set but unknown */
			else
				return STATUS_ERROR;
		}
	}

	if (interval == 0)
		interval = port->default_keepalives_interval;

	if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
				   (char *) &interval, sizeof(interval)) < 0)
	{
		elog(LOG, "setsockopt(%s) failed: %m", "TCP_KEEPINTVL");
		return STATUS_ERROR;
	}

	port->keepalives_interval = interval;
#else							/* WIN32 */
	return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
#endif
#else
	if (interval != 0)
	{
		elog(LOG, "setsockopt(%s) not supported", "TCP_KEEPINTVL");
		return STATUS_ERROR;
	}
#endif

	return STATUS_OK;
}
1857 :
1858 : int
1859 1 : pq_getkeepalivescount(Port *port)
1860 : {
1861 : #ifdef TCP_KEEPCNT
1862 1 : if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1863 1 : return 0;
1864 :
1865 0 : if (port->keepalives_count != 0)
1866 0 : return port->keepalives_count;
1867 :
1868 0 : if (port->default_keepalives_count == 0)
1869 : {
1870 0 : ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
1871 :
1872 0 : if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1873 0 : (char *) &port->default_keepalives_count,
1874 : &size) < 0)
1875 : {
1876 0 : elog(LOG, "getsockopt(%s) failed: %m", "TCP_KEEPCNT");
1877 0 : port->default_keepalives_count = -1; /* don't know */
1878 : }
1879 : }
1880 :
1881 0 : return port->default_keepalives_count;
1882 : #else
1883 : return 0;
1884 : #endif
1885 : }
1886 :
1887 : int
1888 5 : pq_setkeepalivescount(int count, Port *port)
1889 : {
1890 5 : if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1891 5 : return STATUS_OK;
1892 :
1893 : #ifdef TCP_KEEPCNT
1894 0 : if (count == port->keepalives_count)
1895 0 : return STATUS_OK;
1896 :
1897 0 : if (port->default_keepalives_count <= 0)
1898 : {
1899 0 : if (pq_getkeepalivescount(port) < 0)
1900 : {
1901 0 : if (count == 0)
1902 0 : return STATUS_OK; /* default is set but unknown */
1903 : else
1904 0 : return STATUS_ERROR;
1905 : }
1906 : }
1907 :
1908 0 : if (count == 0)
1909 0 : count = port->default_keepalives_count;
1910 :
1911 0 : if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1912 : (char *) &count, sizeof(count)) < 0)
1913 : {
1914 0 : elog(LOG, "setsockopt(%s) failed: %m", "TCP_KEEPCNT");
1915 0 : return STATUS_ERROR;
1916 : }
1917 :
1918 0 : port->keepalives_count = count;
1919 : #else
1920 : if (count != 0)
1921 : {
1922 : elog(LOG, "setsockopt(%s) not supported", "TCP_KEEPCNT");
1923 : return STATUS_ERROR;
1924 : }
1925 : #endif
1926 :
1927 0 : return STATUS_OK;
1928 : }
|