Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pqmq.c
4 : * Use the frontend/backend protocol for communication over a shm_mq
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * src/backend/libpq/pqmq.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres.h"
15 :
16 : #include "libpq/libpq.h"
17 : #include "libpq/pqformat.h"
18 : #include "libpq/pqmq.h"
19 : #include "miscadmin.h"
20 : #include "pgstat.h"
21 : #include "tcop/tcopprot.h"
22 : #include "utils/builtins.h"
23 :
24 : static shm_mq_handle *pq_mq_handle;
25 : static bool pq_mq_busy = false;
26 : static pid_t pq_mq_parallel_master_pid = 0;
27 : static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId;
28 :
29 : static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg);
30 : static void mq_comm_reset(void);
31 : static int mq_flush(void);
32 : static int mq_flush_if_writable(void);
33 : static bool mq_is_send_pending(void);
34 : static int mq_putmessage(char msgtype, const char *s, size_t len);
35 : static void mq_putmessage_noblock(char msgtype, const char *s, size_t len);
36 : static void mq_startcopyout(void);
37 : static void mq_endcopyout(bool errorAbort);
38 :
39 : static PQcommMethods PqCommMqMethods = {
40 : mq_comm_reset,
41 : mq_flush,
42 : mq_flush_if_writable,
43 : mq_is_send_pending,
44 : mq_putmessage,
45 : mq_putmessage_noblock,
46 : mq_startcopyout,
47 : mq_endcopyout
48 : };
49 :
50 : /*
51 : * Arrange to redirect frontend/backend protocol messages to a shared-memory
52 : * message queue.
53 : */
54 : void
55 115 : pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
56 : {
57 115 : PqCommMethods = &PqCommMqMethods;
58 115 : pq_mq_handle = mqh;
59 115 : whereToSendOutput = DestRemote;
60 115 : FrontendProtocol = PG_PROTOCOL_LATEST;
61 115 : on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
62 115 : }
63 :
64 : /*
65 : * When the DSM that contains our shm_mq goes away, we need to stop sending
66 : * messages to it.
67 : */
68 : static void
69 115 : pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
70 : {
71 115 : pq_mq_handle = NULL;
72 115 : whereToSendOutput = DestNone;
73 115 : }
74 :
75 : /*
76 : * Arrange to SendProcSignal() to the parallel master each time we transmit
77 : * message data via the shm_mq.
78 : */
79 : void
80 115 : pq_set_parallel_master(pid_t pid, BackendId backend_id)
81 : {
82 115 : Assert(PqCommMethods == &PqCommMqMethods);
83 115 : pq_mq_parallel_master_pid = pid;
84 115 : pq_mq_parallel_master_backend_id = backend_id;
85 115 : }
86 :
87 : static void
88 0 : mq_comm_reset(void)
89 : {
90 : /* Nothing to do. */
91 0 : }
92 :
93 : static int
94 1 : mq_flush(void)
95 : {
96 : /* Nothing to do. */
97 1 : return 0;
98 : }
99 :
100 : static int
101 0 : mq_flush_if_writable(void)
102 : {
103 : /* Nothing to do. */
104 0 : return 0;
105 : }
106 :
107 : static bool
108 0 : mq_is_send_pending(void)
109 : {
110 : /* There's never anything pending. */
111 0 : return 0;
112 : }
113 :
114 : /*
115 : * Transmit a libpq protocol message to the shared memory message queue
116 : * selected via pq_mq_handle. We don't include a length word, because the
117 : * receiver will know the length of the message from shm_mq_receive().
118 : */
119 : static int
120 230 : mq_putmessage(char msgtype, const char *s, size_t len)
121 : {
122 : shm_mq_iovec iov[2];
123 : shm_mq_result result;
124 :
125 : /*
126 : * If we're sending a message, and we have to wait because the queue is
127 : * full, and then we get interrupted, and that interrupt results in trying
128 : * to send another message, we respond by detaching the queue. There's no
129 : * way to return to the original context, but even if there were, just
130 : * queueing the message would amount to indefinitely postponing the
131 : * response to the interrupt. So we do this instead.
132 : */
133 230 : if (pq_mq_busy)
134 : {
135 0 : if (pq_mq_handle != NULL)
136 0 : shm_mq_detach(pq_mq_handle);
137 0 : pq_mq_handle = NULL;
138 0 : return EOF;
139 : }
140 :
141 : /*
142 : * If the message queue is already gone, just ignore the message. This
143 : * doesn't necessarily indicate a problem; for example, DEBUG messages can
144 : * be generated late in the shutdown sequence, after all DSMs have already
145 : * been detached.
146 : */
147 230 : if (pq_mq_handle == NULL)
148 0 : return 0;
149 :
150 230 : pq_mq_busy = true;
151 :
152 230 : iov[0].data = &msgtype;
153 230 : iov[0].len = 1;
154 230 : iov[1].data = s;
155 230 : iov[1].len = len;
156 :
157 230 : Assert(pq_mq_handle != NULL);
158 :
159 : for (;;)
160 : {
161 230 : result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
162 :
163 230 : if (pq_mq_parallel_master_pid != 0)
164 230 : SendProcSignal(pq_mq_parallel_master_pid,
165 : PROCSIG_PARALLEL_MESSAGE,
166 : pq_mq_parallel_master_backend_id);
167 :
168 230 : if (result != SHM_MQ_WOULD_BLOCK)
169 230 : break;
170 :
171 0 : WaitLatch(MyLatch, WL_LATCH_SET, 0,
172 : WAIT_EVENT_MQ_PUT_MESSAGE);
173 0 : ResetLatch(MyLatch);
174 0 : CHECK_FOR_INTERRUPTS();
175 0 : }
176 :
177 230 : pq_mq_busy = false;
178 :
179 230 : Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
180 230 : if (result != SHM_MQ_SUCCESS)
181 0 : return EOF;
182 230 : return 0;
183 : }
184 :
185 : static void
186 0 : mq_putmessage_noblock(char msgtype, const char *s, size_t len)
187 : {
188 : /*
189 : * While the shm_mq machinery does support sending a message in
190 : * non-blocking mode, there's currently no way to try sending beginning to
191 : * send the message that doesn't also commit us to completing the
192 : * transmission. This could be improved in the future, but for now we
193 : * don't need it.
194 : */
195 0 : elog(ERROR, "not currently supported");
196 : }
197 :
198 : static void
199 0 : mq_startcopyout(void)
200 : {
201 : /* Nothing to do. */
202 0 : }
203 :
204 : static void
205 0 : mq_endcopyout(bool errorAbort)
206 : {
207 : /* Nothing to do. */
208 0 : }
209 :
210 : /*
211 : * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
212 : * structure with the results.
213 : */
214 : void
215 1 : pq_parse_errornotice(StringInfo msg, ErrorData *edata)
216 : {
217 : /* Initialize edata with reasonable defaults. */
218 1 : MemSet(edata, 0, sizeof(ErrorData));
219 1 : edata->elevel = ERROR;
220 1 : edata->assoc_context = CurrentMemoryContext;
221 :
222 : /* Loop over fields and extract each one. */
223 : for (;;)
224 : {
225 8 : char code = pq_getmsgbyte(msg);
226 : const char *value;
227 :
228 8 : if (code == '\0')
229 : {
230 1 : pq_getmsgend(msg);
231 1 : break;
232 : }
233 7 : value = pq_getmsgrawstring(msg);
234 :
235 7 : switch (code)
236 : {
237 : case PG_DIAG_SEVERITY:
238 : /* ignore, trusting we'll get a nonlocalized version */
239 1 : break;
240 : case PG_DIAG_SEVERITY_NONLOCALIZED:
241 1 : if (strcmp(value, "DEBUG") == 0)
242 : {
243 : /*
244 : * We can't reconstruct the exact DEBUG level, but
245 : * presumably it was >= client_min_messages, so select
246 : * DEBUG1 to ensure we'll pass it on to the client.
247 : */
248 0 : edata->elevel = DEBUG1;
249 : }
250 1 : else if (strcmp(value, "LOG") == 0)
251 : {
252 : /*
253 : * It can't be LOG_SERVER_ONLY, or the worker wouldn't
254 : * have sent it to us; so LOG is the correct value.
255 : */
256 0 : edata->elevel = LOG;
257 : }
258 1 : else if (strcmp(value, "INFO") == 0)
259 0 : edata->elevel = INFO;
260 1 : else if (strcmp(value, "NOTICE") == 0)
261 0 : edata->elevel = NOTICE;
262 1 : else if (strcmp(value, "WARNING") == 0)
263 0 : edata->elevel = WARNING;
264 1 : else if (strcmp(value, "ERROR") == 0)
265 1 : edata->elevel = ERROR;
266 0 : else if (strcmp(value, "FATAL") == 0)
267 0 : edata->elevel = FATAL;
268 0 : else if (strcmp(value, "PANIC") == 0)
269 0 : edata->elevel = PANIC;
270 : else
271 0 : elog(ERROR, "unrecognized error severity: \"%s\"", value);
272 1 : break;
273 : case PG_DIAG_SQLSTATE:
274 1 : if (strlen(value) != 5)
275 0 : elog(ERROR, "invalid SQLSTATE: \"%s\"", value);
276 1 : edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
277 : value[3], value[4]);
278 1 : break;
279 : case PG_DIAG_MESSAGE_PRIMARY:
280 1 : edata->message = pstrdup(value);
281 1 : break;
282 : case PG_DIAG_MESSAGE_DETAIL:
283 0 : edata->detail = pstrdup(value);
284 0 : break;
285 : case PG_DIAG_MESSAGE_HINT:
286 0 : edata->hint = pstrdup(value);
287 0 : break;
288 : case PG_DIAG_STATEMENT_POSITION:
289 0 : edata->cursorpos = pg_atoi(value, sizeof(int), '\0');
290 0 : break;
291 : case PG_DIAG_INTERNAL_POSITION:
292 0 : edata->internalpos = pg_atoi(value, sizeof(int), '\0');
293 0 : break;
294 : case PG_DIAG_INTERNAL_QUERY:
295 0 : edata->internalquery = pstrdup(value);
296 0 : break;
297 : case PG_DIAG_CONTEXT:
298 0 : edata->context = pstrdup(value);
299 0 : break;
300 : case PG_DIAG_SCHEMA_NAME:
301 0 : edata->schema_name = pstrdup(value);
302 0 : break;
303 : case PG_DIAG_TABLE_NAME:
304 0 : edata->table_name = pstrdup(value);
305 0 : break;
306 : case PG_DIAG_COLUMN_NAME:
307 0 : edata->column_name = pstrdup(value);
308 0 : break;
309 : case PG_DIAG_DATATYPE_NAME:
310 0 : edata->datatype_name = pstrdup(value);
311 0 : break;
312 : case PG_DIAG_CONSTRAINT_NAME:
313 0 : edata->constraint_name = pstrdup(value);
314 0 : break;
315 : case PG_DIAG_SOURCE_FILE:
316 1 : edata->filename = pstrdup(value);
317 1 : break;
318 : case PG_DIAG_SOURCE_LINE:
319 1 : edata->lineno = pg_atoi(value, sizeof(int), '\0');
320 1 : break;
321 : case PG_DIAG_SOURCE_FUNCTION:
322 1 : edata->funcname = pstrdup(value);
323 1 : break;
324 : default:
325 0 : elog(ERROR, "unrecognized error field code: %d", (int) code);
326 : break;
327 : }
328 7 : }
329 1 : }
|