Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * tqueue.c
4 : * Use shm_mq to send & receive tuples between parallel backends
5 : *
6 : * Most of the complexity in this module arises from transient RECORD types,
7 : * which all have type RECORDOID and are distinguished by typmod numbers
8 : * that are managed per-backend (see src/backend/utils/cache/typcache.c).
9 : * The sender's set of RECORD typmod assignments probably doesn't match the
10 : * receiver's. To deal with this, we make the sender send a description
11 : * of each transient RECORD type appearing in the data it sends. The
12 : * receiver finds or creates a matching type in its own typcache, and then
13 : * maps the sender's typmod for that type to its own typmod.
14 : *
15 : * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
16 : * under the hood, writes tuples from the executor to a shm_mq. If
17 : * necessary, it also writes control messages describing transient
18 : * record types used within the tuple.
19 : *
20 : * A TupleQueueReader reads tuples, and control messages if any are sent,
21 : * from a shm_mq and returns the tuples. If transient record types are
22 : * in use, it registers those types locally based on the control messages
23 : * and rewrites the typmods sent by the remote side to the corresponding
24 : * local record typmods.
25 : *
26 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
27 : * Portions Copyright (c) 1994, Regents of the University of California
28 : *
29 : * IDENTIFICATION
30 : * src/backend/executor/tqueue.c
31 : *
32 : *-------------------------------------------------------------------------
33 : */
34 :
35 : #include "postgres.h"
36 :
37 : #include "access/htup_details.h"
38 : #include "catalog/pg_type.h"
39 : #include "executor/tqueue.h"
40 : #include "funcapi.h"
41 : #include "lib/stringinfo.h"
42 : #include "miscadmin.h"
43 : #include "utils/array.h"
44 : #include "utils/lsyscache.h"
45 : #include "utils/memutils.h"
46 : #include "utils/rangetypes.h"
47 : #include "utils/syscache.h"
48 : #include "utils/typcache.h"
49 :
50 :
51 : /*
52 : * The data transferred through the shm_mq is divided into messages.
53 : * One-byte messages are mode-switch messages, telling the receiver to switch
54 : * between "control" and "data" modes. (We always start up in "data" mode.)
55 : * Otherwise, when in "data" mode, each message is a tuple. When in "control"
56 : * mode, each message defines one transient-typmod-to-tupledesc mapping to
57 : * let us interpret future tuples. Both of those cases certainly require
58 : * more than one byte, so no confusion is possible.
59 : */
60 : #define TUPLE_QUEUE_MODE_CONTROL 'c' /* mode-switch message contents */
61 : #define TUPLE_QUEUE_MODE_DATA 'd'
62 :
63 : /*
64 : * Both the sender and receiver build trees of TupleRemapInfo nodes to help
65 : * them identify which (sub) fields of transmitted tuples are composite and
66 : * may thus need remap processing. We might need to look within arrays and
67 : * ranges, not only composites, to find composite sub-fields. A NULL
68 : * TupleRemapInfo pointer indicates that it is known that the described field
69 : * is not composite and has no composite substructure.
70 : *
71 : * Note that we currently have to look at each composite field at runtime,
72 : * even if we believe it's of a named composite type (i.e., not RECORD).
73 : * This is because we allow the actual value to be a compatible transient
74 : * RECORD type. That's grossly inefficient, and it would be good to get
75 : * rid of the requirement, but it's not clear what would need to change.
76 : *
77 : * Also, we allow the top-level tuple structure, as well as the actual
78 : * structure of composite subfields, to change from one tuple to the next
79 : * at runtime. This may well be entirely historical, but it's mostly free
80 : * to support given the previous requirement; and other places in the system
81 : * also permit this, so it's not entirely clear if we could drop it.
82 : */
83 :
84 : typedef enum
85 : {
86 : TQUEUE_REMAP_ARRAY, /* array */
87 : TQUEUE_REMAP_RANGE, /* range */
88 : TQUEUE_REMAP_RECORD /* composite type, named or transient */
89 : } TupleRemapClass;
90 :
91 : typedef struct TupleRemapInfo TupleRemapInfo;
92 :
93 : typedef struct ArrayRemapInfo
94 : {
95 : int16 typlen; /* array element type's storage properties */
96 : bool typbyval;
97 : char typalign;
98 : TupleRemapInfo *element_remap; /* array element type's remap info */
99 : } ArrayRemapInfo;
100 :
101 : typedef struct RangeRemapInfo
102 : {
103 : TypeCacheEntry *typcache; /* range type's typcache entry */
104 : TupleRemapInfo *bound_remap; /* range bound type's remap info */
105 : } RangeRemapInfo;
106 :
107 : typedef struct RecordRemapInfo
108 : {
109 : /* Original (remote) type ID info last seen for this composite field */
110 : Oid rectypid;
111 : int32 rectypmod;
112 : /* Local RECORD typmod, or -1 if unset; not used on sender side */
113 : int32 localtypmod;
114 : /* If no fields of the record require remapping, these are NULL: */
115 : TupleDesc tupledesc; /* copy of record's tupdesc */
116 : TupleRemapInfo **field_remap; /* each field's remap info */
117 : } RecordRemapInfo;
118 :
119 : struct TupleRemapInfo
120 : {
121 : TupleRemapClass remapclass;
122 : union
123 : {
124 : ArrayRemapInfo arr;
125 : RangeRemapInfo rng;
126 : RecordRemapInfo rec;
127 : } u;
128 : };
129 :
130 : /*
131 : * DestReceiver object's private contents
132 : *
133 : * queue and tupledesc are pointers to data supplied by DestReceiver's caller.
134 : * The recordhtab and remap info are owned by the DestReceiver and are kept
135 : * in mycontext. tmpcontext is a tuple-lifespan context to hold cruft
136 : * created while traversing each tuple to find record subfields.
137 : */
138 : typedef struct TQueueDestReceiver
139 : {
140 : DestReceiver pub; /* public fields */
141 : shm_mq_handle *queue; /* shm_mq to send to */
142 : MemoryContext mycontext; /* context containing TQueueDestReceiver */
143 : MemoryContext tmpcontext; /* per-tuple context, if needed */
144 : HTAB *recordhtab; /* table of transmitted typmods, if needed */
145 : char mode; /* current message mode */
146 : TupleDesc tupledesc; /* current top-level tuple descriptor */
147 : TupleRemapInfo **field_remapinfo; /* current top-level remap info */
148 : } TQueueDestReceiver;
149 :
150 : /*
151 : * Hash table entries for mapping remote to local typmods.
152 : */
153 : typedef struct RecordTypmodMap
154 : {
155 : int32 remotetypmod; /* hash key (must be first!) */
156 : int32 localtypmod;
157 : } RecordTypmodMap;
158 :
159 : /*
160 : * TupleQueueReader object's private contents
161 : *
162 : * queue and tupledesc are pointers to data supplied by reader's caller.
163 : * The typmodmap and remap info are owned by the TupleQueueReader and
164 : * are kept in mycontext.
165 : *
166 : * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
167 : */
168 : struct TupleQueueReader
169 : {
170 : shm_mq_handle *queue; /* shm_mq to receive from */
171 : MemoryContext mycontext; /* context containing TupleQueueReader */
172 : HTAB *typmodmap; /* RecordTypmodMap hashtable, if needed */
173 : char mode; /* current message mode */
174 : TupleDesc tupledesc; /* current top-level tuple descriptor */
175 : TupleRemapInfo **field_remapinfo; /* current top-level remap info */
176 : };
177 :
178 : /* Local function prototypes */
179 : static void TQExamine(TQueueDestReceiver *tqueue,
180 : TupleRemapInfo *remapinfo,
181 : Datum value);
182 : static void TQExamineArray(TQueueDestReceiver *tqueue,
183 : ArrayRemapInfo *remapinfo,
184 : Datum value);
185 : static void TQExamineRange(TQueueDestReceiver *tqueue,
186 : RangeRemapInfo *remapinfo,
187 : Datum value);
188 : static void TQExamineRecord(TQueueDestReceiver *tqueue,
189 : RecordRemapInfo *remapinfo,
190 : Datum value);
191 : static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod,
192 : TupleDesc tupledesc);
193 : static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
194 : Size nbytes, char *data);
195 : static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
196 : Size nbytes, HeapTupleHeader data);
197 : static HeapTuple TQRemapTuple(TupleQueueReader *reader,
198 : TupleDesc tupledesc,
199 : TupleRemapInfo **field_remapinfo,
200 : HeapTuple tuple);
201 : static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
202 : Datum value, bool *changed);
203 : static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
204 : Datum value, bool *changed);
205 : static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
206 : Datum value, bool *changed);
207 : static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
208 : Datum value, bool *changed);
209 : static TupleRemapInfo *BuildTupleRemapInfo(Oid typid, MemoryContext mycontext);
210 : static TupleRemapInfo *BuildArrayRemapInfo(Oid elemtypid,
211 : MemoryContext mycontext);
212 : static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid,
213 : MemoryContext mycontext);
214 : static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc,
215 : MemoryContext mycontext);
216 :
217 :
218 : /*
219 : * Receive a tuple from a query, and send it to the designated shm_mq.
220 : *
221 : * Returns TRUE if successful, FALSE if shm_mq has been detached.
222 : */
223 : static bool
224 50 : tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
225 : {
226 50 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
227 50 : TupleDesc tupledesc = slot->tts_tupleDescriptor;
228 : HeapTuple tuple;
229 : shm_mq_result result;
230 :
231 : /*
232 : * If first time through, compute remapping info for the top-level fields.
233 : * On later calls, if the tupledesc has changed, set up for the new
234 : * tupledesc. (This is a strange test both because the executor really
235 : * shouldn't change the tupledesc, and also because it would be unsafe if
236 : * the old tupledesc could be freed and a new one allocated at the same
237 : * address. But since some very old code in printtup.c uses a similar
238 : * approach, we adopt it here as well.)
239 : *
240 : * Here and elsewhere in this module, when replacing remapping info we
241 : * pfree the top-level object because that's easy, but we don't bother to
242 : * recursively free any substructure. This would lead to query-lifespan
243 : * memory leaks if the mapping info actually changed frequently, but since
244 : * we don't expect that to happen, it doesn't seem worth expending code to
245 : * prevent it.
246 : */
247 50 : if (tqueue->tupledesc != tupledesc)
248 : {
249 : /* Is it worth trying to free substructure of the remap tree? */
250 46 : if (tqueue->field_remapinfo != NULL)
251 0 : pfree(tqueue->field_remapinfo);
252 46 : tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc,
253 : tqueue->mycontext);
254 46 : tqueue->tupledesc = tupledesc;
255 : }
256 :
257 : /*
258 : * When, because of the types being transmitted, no record typmod mapping
259 : * can be needed, we can skip a good deal of work.
260 : */
261 50 : if (tqueue->field_remapinfo != NULL)
262 : {
263 5 : TupleRemapInfo **remapinfo = tqueue->field_remapinfo;
264 : int i;
265 5 : MemoryContext oldcontext = NULL;
266 :
267 : /* Deform the tuple so we can examine fields, if not done already. */
268 5 : slot_getallattrs(slot);
269 :
270 : /* Iterate over each attribute and search it for transient typmods. */
271 15 : for (i = 0; i < tupledesc->natts; i++)
272 : {
273 : /* Ignore nulls and types that don't need special handling. */
274 10 : if (slot->tts_isnull[i] || remapinfo[i] == NULL)
275 5 : continue;
276 :
277 : /* Switch to temporary memory context to avoid leaking. */
278 5 : if (oldcontext == NULL)
279 : {
280 5 : if (tqueue->tmpcontext == NULL)
281 1 : tqueue->tmpcontext =
282 1 : AllocSetContextCreate(tqueue->mycontext,
283 : "tqueue sender temp context",
284 : ALLOCSET_DEFAULT_SIZES);
285 5 : oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
286 : }
287 :
288 : /* Examine the value. */
289 5 : TQExamine(tqueue, remapinfo[i], slot->tts_values[i]);
290 : }
291 :
292 : /* If we used the temp context, reset it and restore prior context. */
293 5 : if (oldcontext != NULL)
294 : {
295 5 : MemoryContextSwitchTo(oldcontext);
296 5 : MemoryContextReset(tqueue->tmpcontext);
297 : }
298 :
299 : /* If we entered control mode, switch back to data mode. */
300 5 : if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
301 : {
302 5 : tqueue->mode = TUPLE_QUEUE_MODE_DATA;
303 5 : shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
304 : }
305 : }
306 :
307 : /* Send the tuple itself. */
308 50 : tuple = ExecMaterializeSlot(slot);
309 50 : result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
310 :
311 : /* Check for failure. */
312 50 : if (result == SHM_MQ_DETACHED)
313 0 : return false;
314 50 : else if (result != SHM_MQ_SUCCESS)
315 0 : ereport(ERROR,
316 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
317 : errmsg("could not send tuple to shared-memory queue")));
318 :
319 50 : return true;
320 : }
321 :
322 : /*
323 : * Examine the given datum and send any necessary control messages for
324 : * transient record types contained in it.
325 : *
326 : * remapinfo is previously-computed remapping info about the datum's type.
327 : *
328 : * This function just dispatches based on the remap class.
329 : */
330 : static void
331 5 : TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
332 : {
333 : /* This is recursive, so it could be driven to stack overflow. */
334 5 : check_stack_depth();
335 :
336 5 : switch (remapinfo->remapclass)
337 : {
338 : case TQUEUE_REMAP_ARRAY:
339 0 : TQExamineArray(tqueue, &remapinfo->u.arr, value);
340 0 : break;
341 : case TQUEUE_REMAP_RANGE:
342 0 : TQExamineRange(tqueue, &remapinfo->u.rng, value);
343 0 : break;
344 : case TQUEUE_REMAP_RECORD:
345 5 : TQExamineRecord(tqueue, &remapinfo->u.rec, value);
346 5 : break;
347 : }
348 5 : }
349 :
350 : /*
351 : * Examine a record datum and send any necessary control messages for
352 : * transient record types contained in it.
353 : */
354 : static void
355 5 : TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo,
356 : Datum value)
357 : {
358 : HeapTupleHeader tup;
359 : Oid typid;
360 : int32 typmod;
361 : TupleDesc tupledesc;
362 :
363 : /* Extract type OID and typmod from tuple. */
364 5 : tup = DatumGetHeapTupleHeader(value);
365 5 : typid = HeapTupleHeaderGetTypeId(tup);
366 5 : typmod = HeapTupleHeaderGetTypMod(tup);
367 :
368 : /*
369 : * If first time through, or if this isn't the same composite type as last
370 : * time, consider sending a control message, and then look up the
371 : * necessary information for examining the fields.
372 : */
373 5 : if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
374 : {
375 : /* Free any old data. */
376 5 : if (remapinfo->tupledesc != NULL)
377 0 : FreeTupleDesc(remapinfo->tupledesc);
378 : /* Is it worth trying to free substructure of the remap tree? */
379 5 : if (remapinfo->field_remap != NULL)
380 0 : pfree(remapinfo->field_remap);
381 :
382 : /* Look up tuple descriptor in typcache. */
383 5 : tupledesc = lookup_rowtype_tupdesc(typid, typmod);
384 :
385 : /*
386 : * If this is a transient record type, send the tupledesc in a control
387 : * message. (TQSendRecordInfo is smart enough to do this only once
388 : * per typmod.)
389 : */
390 5 : if (typid == RECORDOID)
391 5 : TQSendRecordInfo(tqueue, typmod, tupledesc);
392 :
393 : /* Figure out whether fields need recursive processing. */
394 5 : remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
395 : tqueue->mycontext);
396 5 : if (remapinfo->field_remap != NULL)
397 : {
398 : /*
399 : * We need to inspect the record contents, so save a copy of the
400 : * tupdesc. (We could possibly just reference the typcache's
401 : * copy, but then it's problematic when to release the refcount.)
402 : */
403 0 : MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext);
404 :
405 0 : remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
406 0 : MemoryContextSwitchTo(oldcontext);
407 : }
408 : else
409 : {
410 : /* No fields of the record require remapping. */
411 5 : remapinfo->tupledesc = NULL;
412 : }
413 5 : remapinfo->rectypid = typid;
414 5 : remapinfo->rectypmod = typmod;
415 :
416 : /* Release reference count acquired by lookup_rowtype_tupdesc. */
417 5 : DecrTupleDescRefCount(tupledesc);
418 : }
419 :
420 : /*
421 : * If field remapping is required, deform the tuple and examine each
422 : * field.
423 : */
424 5 : if (remapinfo->field_remap != NULL)
425 : {
426 : Datum *values;
427 : bool *isnull;
428 : HeapTupleData tdata;
429 : int i;
430 :
431 : /* Deform the tuple so we can check each column within. */
432 0 : tupledesc = remapinfo->tupledesc;
433 0 : values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
434 0 : isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
435 0 : tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
436 0 : ItemPointerSetInvalid(&(tdata.t_self));
437 0 : tdata.t_tableOid = InvalidOid;
438 0 : tdata.t_data = tup;
439 0 : heap_deform_tuple(&tdata, tupledesc, values, isnull);
440 :
441 : /* Recursively check each interesting non-NULL attribute. */
442 0 : for (i = 0; i < tupledesc->natts; i++)
443 : {
444 0 : if (!isnull[i] && remapinfo->field_remap[i])
445 0 : TQExamine(tqueue, remapinfo->field_remap[i], values[i]);
446 : }
447 :
448 : /* Need not clean up, since we're in a short-lived context. */
449 : }
450 5 : }
451 :
452 : /*
453 : * Examine an array datum and send any necessary control messages for
454 : * transient record types contained in it.
455 : */
456 : static void
457 0 : TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo,
458 : Datum value)
459 : {
460 0 : ArrayType *arr = DatumGetArrayTypeP(value);
461 0 : Oid typid = ARR_ELEMTYPE(arr);
462 : Datum *elem_values;
463 : bool *elem_nulls;
464 : int num_elems;
465 : int i;
466 :
467 : /* Deconstruct the array. */
468 0 : deconstruct_array(arr, typid, remapinfo->typlen,
469 0 : remapinfo->typbyval, remapinfo->typalign,
470 : &elem_values, &elem_nulls, &num_elems);
471 :
472 : /* Examine each element. */
473 0 : for (i = 0; i < num_elems; i++)
474 : {
475 0 : if (!elem_nulls[i])
476 0 : TQExamine(tqueue, remapinfo->element_remap, elem_values[i]);
477 : }
478 0 : }
479 :
480 : /*
481 : * Examine a range datum and send any necessary control messages for
482 : * transient record types contained in it.
483 : */
484 : static void
485 0 : TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo,
486 : Datum value)
487 : {
488 0 : RangeType *range = DatumGetRangeType(value);
489 : RangeBound lower;
490 : RangeBound upper;
491 : bool empty;
492 :
493 : /* Extract the lower and upper bounds. */
494 0 : range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
495 :
496 : /* Nothing to do for an empty range. */
497 0 : if (empty)
498 0 : return;
499 :
500 : /* Examine each bound, if present. */
501 0 : if (!upper.infinite)
502 0 : TQExamine(tqueue, remapinfo->bound_remap, upper.val);
503 0 : if (!lower.infinite)
504 0 : TQExamine(tqueue, remapinfo->bound_remap, lower.val);
505 : }
506 :
507 : /*
508 : * Send tuple descriptor information for a transient typmod, unless we've
509 : * already done so previously.
510 : */
511 : static void
512 5 : TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
513 : {
514 : StringInfoData buf;
515 : bool found;
516 : int i;
517 :
518 : /* Initialize hash table if not done yet. */
519 5 : if (tqueue->recordhtab == NULL)
520 : {
521 : HASHCTL ctl;
522 :
523 1 : MemSet(&ctl, 0, sizeof(ctl));
524 : /* Hash table entries are just typmods */
525 1 : ctl.keysize = sizeof(int32);
526 1 : ctl.entrysize = sizeof(int32);
527 1 : ctl.hcxt = tqueue->mycontext;
528 1 : tqueue->recordhtab = hash_create("tqueue sender record type hashtable",
529 : 100, &ctl,
530 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
531 : }
532 :
533 : /* Have we already seen this record type? If not, must report it. */
534 5 : hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
535 5 : if (found)
536 5 : return;
537 :
538 5 : elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod);
539 :
540 : /* If message queue is in data mode, switch to control mode. */
541 5 : if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
542 : {
543 5 : tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
544 5 : shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
545 : }
546 :
547 : /* Assemble a control message. */
548 5 : initStringInfo(&buf);
549 5 : appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32));
550 5 : appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
551 5 : appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool));
552 20 : for (i = 0; i < tupledesc->natts; i++)
553 : {
554 15 : appendBinaryStringInfo(&buf, (char *) TupleDescAttr(tupledesc, i),
555 : sizeof(FormData_pg_attribute));
556 : }
557 :
558 : /* Send control message. */
559 5 : shm_mq_send(tqueue->queue, buf.len, buf.data, false);
560 :
561 : /* We assume it's OK to leak buf because we're in a short-lived context. */
562 : }
563 :
564 : /*
565 : * Prepare to receive tuples from executor.
566 : */
567 : static void
568 115 : tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
569 : {
570 : /* do nothing */
571 115 : }
572 :
573 : /*
574 : * Clean up at end of an executor run
575 : */
576 : static void
577 114 : tqueueShutdownReceiver(DestReceiver *self)
578 : {
579 114 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
580 :
581 114 : if (tqueue->queue != NULL)
582 114 : shm_mq_detach(tqueue->queue);
583 114 : tqueue->queue = NULL;
584 114 : }
585 :
586 : /*
587 : * Destroy receiver when done with it
588 : */
589 : static void
590 114 : tqueueDestroyReceiver(DestReceiver *self)
591 : {
592 114 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
593 :
594 : /* We probably already detached from queue, but let's be sure */
595 114 : if (tqueue->queue != NULL)
596 0 : shm_mq_detach(tqueue->queue);
597 114 : if (tqueue->tmpcontext != NULL)
598 1 : MemoryContextDelete(tqueue->tmpcontext);
599 114 : if (tqueue->recordhtab != NULL)
600 1 : hash_destroy(tqueue->recordhtab);
601 : /* Is it worth trying to free substructure of the remap tree? */
602 114 : if (tqueue->field_remapinfo != NULL)
603 1 : pfree(tqueue->field_remapinfo);
604 114 : pfree(self);
605 114 : }
606 :
607 : /*
608 : * Create a DestReceiver that writes tuples to a tuple queue.
609 : */
610 : DestReceiver *
611 115 : CreateTupleQueueDestReceiver(shm_mq_handle *handle)
612 : {
613 : TQueueDestReceiver *self;
614 :
615 115 : self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
616 :
617 115 : self->pub.receiveSlot = tqueueReceiveSlot;
618 115 : self->pub.rStartup = tqueueStartupReceiver;
619 115 : self->pub.rShutdown = tqueueShutdownReceiver;
620 115 : self->pub.rDestroy = tqueueDestroyReceiver;
621 115 : self->pub.mydest = DestTupleQueue;
622 115 : self->queue = handle;
623 115 : self->mycontext = CurrentMemoryContext;
624 115 : self->tmpcontext = NULL;
625 115 : self->recordhtab = NULL;
626 115 : self->mode = TUPLE_QUEUE_MODE_DATA;
627 : /* Top-level tupledesc is not known yet */
628 115 : self->tupledesc = NULL;
629 115 : self->field_remapinfo = NULL;
630 :
631 115 : return (DestReceiver *) self;
632 : }
633 :
634 : /*
635 : * Create a tuple queue reader.
636 : */
637 : TupleQueueReader *
638 115 : CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
639 : {
640 115 : TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
641 :
642 115 : reader->queue = handle;
643 115 : reader->mycontext = CurrentMemoryContext;
644 115 : reader->typmodmap = NULL;
645 115 : reader->mode = TUPLE_QUEUE_MODE_DATA;
646 115 : reader->tupledesc = tupledesc;
647 115 : reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
648 :
649 115 : return reader;
650 : }
651 :
652 : /*
653 : * Destroy a tuple queue reader.
654 : *
655 : * Note: cleaning up the underlying shm_mq is the caller's responsibility.
656 : * We won't access it here, as it may be detached already.
657 : */
658 : void
659 114 : DestroyTupleQueueReader(TupleQueueReader *reader)
660 : {
661 114 : if (reader->typmodmap != NULL)
662 1 : hash_destroy(reader->typmodmap);
663 : /* Is it worth trying to free substructure of the remap tree? */
664 114 : if (reader->field_remapinfo != NULL)
665 1 : pfree(reader->field_remapinfo);
666 114 : pfree(reader);
667 114 : }
668 :
669 : /*
670 : * Fetch a tuple from a tuple queue reader.
671 : *
672 : * The return value is NULL if there are no remaining tuples or if
673 : * nowait = true and no tuple is ready to return. *done, if not NULL,
674 : * is set to true when there are no remaining tuples and otherwise to false.
675 : *
676 : * The returned tuple, if any, is allocated in CurrentMemoryContext.
677 : * That should be a short-lived (tuple-lifespan) context, because we are
678 : * pretty cavalier about leaking memory in that context if we have to do
679 : * tuple remapping.
680 : *
681 : * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
682 : * accumulate bytes from a partially-read message, so it's useful to call
683 : * this with nowait = true even if nothing is returned.
684 : */
685 : HeapTuple
686 433994 : TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
687 : {
688 : shm_mq_result result;
689 :
690 433994 : if (done != NULL)
691 433994 : *done = false;
692 :
693 : for (;;)
694 : {
695 : Size nbytes;
696 : void *data;
697 :
698 : /* Attempt to read a message. */
699 434009 : result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
700 :
701 : /* If queue is detached, set *done and return NULL. */
702 434009 : if (result == SHM_MQ_DETACHED)
703 : {
704 114 : if (done != NULL)
705 114 : *done = true;
706 434108 : return NULL;
707 : }
708 :
709 : /* In non-blocking mode, bail out if no message ready yet. */
710 433895 : if (result == SHM_MQ_WOULD_BLOCK)
711 433830 : return NULL;
712 65 : Assert(result == SHM_MQ_SUCCESS);
713 :
714 : /*
715 : * We got a message (see message spec at top of file). Process it.
716 : */
717 65 : if (nbytes == 1)
718 : {
719 : /* Mode switch message. */
720 10 : reader->mode = ((char *) data)[0];
721 : }
722 55 : else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
723 : {
724 : /* Tuple data. */
725 50 : return TupleQueueHandleDataMessage(reader, nbytes, data);
726 : }
727 5 : else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
728 : {
729 : /* Control message, describing a transient record type. */
730 5 : TupleQueueHandleControlMessage(reader, nbytes, data);
731 : }
732 : else
733 0 : elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
734 15 : }
735 : }
736 :
737 : /*
738 : * Handle a data message - that is, a tuple - from the remote side.
739 : */
740 : static HeapTuple
741 50 : TupleQueueHandleDataMessage(TupleQueueReader *reader,
742 : Size nbytes,
743 : HeapTupleHeader data)
744 : {
745 : HeapTupleData htup;
746 :
747 : /*
748 : * Set up a dummy HeapTupleData pointing to the data from the shm_mq
749 : * (which had better be sufficiently aligned).
750 : */
751 50 : ItemPointerSetInvalid(&htup.t_self);
752 50 : htup.t_tableOid = InvalidOid;
753 50 : htup.t_len = nbytes;
754 50 : htup.t_data = data;
755 :
756 : /*
757 : * Either just copy the data into a regular palloc'd tuple, or remap it,
758 : * as required.
759 : */
760 50 : return TQRemapTuple(reader,
761 : reader->tupledesc,
762 : reader->field_remapinfo,
763 : &htup);
764 : }
765 :
766 : /*
767 : * Copy the given tuple, remapping any transient typmods contained in it.
768 : */
769 : static HeapTuple
770 50 : TQRemapTuple(TupleQueueReader *reader,
771 : TupleDesc tupledesc,
772 : TupleRemapInfo **field_remapinfo,
773 : HeapTuple tuple)
774 : {
775 : Datum *values;
776 : bool *isnull;
777 50 : bool changed = false;
778 : int i;
779 :
780 : /*
781 : * If no remapping is necessary, just copy the tuple into a single
782 : * palloc'd chunk, as caller will expect.
783 : */
784 50 : if (field_remapinfo == NULL)
785 45 : return heap_copytuple(tuple);
786 :
787 : /* Deform tuple so we can remap record typmods for individual attrs. */
788 5 : values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
789 5 : isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
790 5 : heap_deform_tuple(tuple, tupledesc, values, isnull);
791 :
792 : /* Recursively process each interesting non-NULL attribute. */
793 15 : for (i = 0; i < tupledesc->natts; i++)
794 : {
795 10 : if (isnull[i] || field_remapinfo[i] == NULL)
796 5 : continue;
797 5 : values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed);
798 : }
799 :
800 : /* Reconstruct the modified tuple, if anything was modified. */
801 5 : if (changed)
802 0 : return heap_form_tuple(tupledesc, values, isnull);
803 : else
804 5 : return heap_copytuple(tuple);
805 : }
806 :
807 : /*
808 : * Process the given datum and replace any transient record typmods
809 : * contained in it. Set *changed to TRUE if we actually changed the datum.
810 : *
811 : * remapinfo is previously-computed remapping info about the datum's type.
812 : *
813 : * This function just dispatches based on the remap class.
814 : */
815 : static Datum
816 5 : TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
817 : Datum value, bool *changed)
818 : {
819 : /* This is recursive, so it could be driven to stack overflow. */
820 5 : check_stack_depth();
821 :
822 5 : switch (remapinfo->remapclass)
823 : {
824 : case TQUEUE_REMAP_ARRAY:
825 0 : return TQRemapArray(reader, &remapinfo->u.arr, value, changed);
826 :
827 : case TQUEUE_REMAP_RANGE:
828 0 : return TQRemapRange(reader, &remapinfo->u.rng, value, changed);
829 :
830 : case TQUEUE_REMAP_RECORD:
831 5 : return TQRemapRecord(reader, &remapinfo->u.rec, value, changed);
832 : }
833 :
834 0 : elog(ERROR, "unrecognized tqueue remap class: %d",
835 : (int) remapinfo->remapclass);
836 : return (Datum) 0;
837 : }
838 :
839 : /*
840 : * Process the given array datum and replace any transient record typmods
841 : * contained in it. Set *changed to TRUE if we actually changed the datum.
842 : */
843 : static Datum
844 0 : TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
845 : Datum value, bool *changed)
846 : {
847 0 : ArrayType *arr = DatumGetArrayTypeP(value);
848 0 : Oid typid = ARR_ELEMTYPE(arr);
849 0 : bool element_changed = false;
850 : Datum *elem_values;
851 : bool *elem_nulls;
852 : int num_elems;
853 : int i;
854 :
855 : /* Deconstruct the array. */
856 0 : deconstruct_array(arr, typid, remapinfo->typlen,
857 0 : remapinfo->typbyval, remapinfo->typalign,
858 : &elem_values, &elem_nulls, &num_elems);
859 :
860 : /* Remap each element. */
861 0 : for (i = 0; i < num_elems; i++)
862 : {
863 0 : if (!elem_nulls[i])
864 0 : elem_values[i] = TQRemap(reader,
865 : remapinfo->element_remap,
866 0 : elem_values[i],
867 : &element_changed);
868 : }
869 :
870 0 : if (element_changed)
871 : {
872 : /* Reconstruct and return the array. */
873 0 : *changed = true;
874 0 : arr = construct_md_array(elem_values, elem_nulls,
875 0 : ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
876 0 : typid, remapinfo->typlen,
877 0 : remapinfo->typbyval, remapinfo->typalign);
878 0 : return PointerGetDatum(arr);
879 : }
880 :
881 : /* Else just return the value as-is. */
882 0 : return value;
883 : }
884 :
885 : /*
886 : * Process the given range datum and replace any transient record typmods
887 : * contained in it. Set *changed to TRUE if we actually changed the datum.
888 : */
889 : static Datum
890 0 : TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
891 : Datum value, bool *changed)
892 : {
893 0 : RangeType *range = DatumGetRangeType(value);
894 0 : bool bound_changed = false;
895 : RangeBound lower;
896 : RangeBound upper;
897 : bool empty;
898 :
899 : /* Extract the lower and upper bounds. */
900 0 : range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
901 :
902 : /* Nothing to do for an empty range. */
903 0 : if (empty)
904 0 : return value;
905 :
906 : /* Remap each bound, if present. */
907 0 : if (!upper.infinite)
908 0 : upper.val = TQRemap(reader, remapinfo->bound_remap,
909 : upper.val, &bound_changed);
910 0 : if (!lower.infinite)
911 0 : lower.val = TQRemap(reader, remapinfo->bound_remap,
912 : lower.val, &bound_changed);
913 :
914 0 : if (bound_changed)
915 : {
916 : /* Reserialize. */
917 0 : *changed = true;
918 0 : range = range_serialize(remapinfo->typcache, &lower, &upper, empty);
919 0 : return RangeTypeGetDatum(range);
920 : }
921 :
922 : /* Else just return the value as-is. */
923 0 : return value;
924 : }
925 :
926 : /*
927 : * Process the given record datum and replace any transient record typmods
928 : * contained in it. Set *changed to TRUE if we actually changed the datum.
929 : */
930 : static Datum
931 5 : TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
932 : Datum value, bool *changed)
933 : {
934 : HeapTupleHeader tup;
935 : Oid typid;
936 : int32 typmod;
937 : bool changed_typmod;
938 : TupleDesc tupledesc;
939 :
940 : /* Extract type OID and typmod from tuple. */
941 5 : tup = DatumGetHeapTupleHeader(value);
942 5 : typid = HeapTupleHeaderGetTypeId(tup);
943 5 : typmod = HeapTupleHeaderGetTypMod(tup);
944 :
945 : /*
946 : * If first time through, or if this isn't the same composite type as last
947 : * time, identify the required typmod mapping, and then look up the
948 : * necessary information for processing the fields.
949 : */
950 5 : if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
951 : {
952 : /* Free any old data. */
953 5 : if (remapinfo->tupledesc != NULL)
954 0 : FreeTupleDesc(remapinfo->tupledesc);
955 : /* Is it worth trying to free substructure of the remap tree? */
956 5 : if (remapinfo->field_remap != NULL)
957 0 : pfree(remapinfo->field_remap);
958 :
959 : /* If transient record type, look up matching local typmod. */
960 5 : if (typid == RECORDOID)
961 : {
962 : RecordTypmodMap *mapent;
963 :
964 5 : Assert(reader->typmodmap != NULL);
965 5 : mapent = hash_search(reader->typmodmap, &typmod,
966 : HASH_FIND, NULL);
967 5 : if (mapent == NULL)
968 0 : elog(ERROR, "tqueue received unrecognized remote typmod %d",
969 : typmod);
970 5 : remapinfo->localtypmod = mapent->localtypmod;
971 : }
972 : else
973 0 : remapinfo->localtypmod = -1;
974 :
975 : /* Look up tuple descriptor in typcache. */
976 5 : tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod);
977 :
978 : /* Figure out whether fields need recursive processing. */
979 5 : remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
980 : reader->mycontext);
981 5 : if (remapinfo->field_remap != NULL)
982 : {
983 : /*
984 : * We need to inspect the record contents, so save a copy of the
985 : * tupdesc. (We could possibly just reference the typcache's
986 : * copy, but then it's problematic when to release the refcount.)
987 : */
988 0 : MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext);
989 :
990 0 : remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
991 0 : MemoryContextSwitchTo(oldcontext);
992 : }
993 : else
994 : {
995 : /* No fields of the record require remapping. */
996 5 : remapinfo->tupledesc = NULL;
997 : }
998 5 : remapinfo->rectypid = typid;
999 5 : remapinfo->rectypmod = typmod;
1000 :
1001 : /* Release reference count acquired by lookup_rowtype_tupdesc. */
1002 5 : DecrTupleDescRefCount(tupledesc);
1003 : }
1004 :
1005 : /* If transient record, replace remote typmod with local typmod. */
1006 5 : if (typid == RECORDOID && typmod != remapinfo->localtypmod)
1007 : {
1008 0 : typmod = remapinfo->localtypmod;
1009 0 : changed_typmod = true;
1010 : }
1011 : else
1012 5 : changed_typmod = false;
1013 :
1014 : /*
1015 : * If we need to change the typmod, or if there are any potentially
1016 : * remappable fields, replace the tuple.
1017 : */
1018 5 : if (changed_typmod || remapinfo->field_remap != NULL)
1019 : {
1020 : HeapTupleData htup;
1021 : HeapTuple atup;
1022 :
1023 : /* For now, assume we always need to change the tuple in this case. */
1024 0 : *changed = true;
1025 :
1026 : /* Copy tuple, possibly remapping contained fields. */
1027 0 : ItemPointerSetInvalid(&htup.t_self);
1028 0 : htup.t_tableOid = InvalidOid;
1029 0 : htup.t_len = HeapTupleHeaderGetDatumLength(tup);
1030 0 : htup.t_data = tup;
1031 0 : atup = TQRemapTuple(reader,
1032 : remapinfo->tupledesc,
1033 : remapinfo->field_remap,
1034 : &htup);
1035 :
1036 : /* Apply the correct labeling for a local Datum. */
1037 0 : HeapTupleHeaderSetTypeId(atup->t_data, typid);
1038 0 : HeapTupleHeaderSetTypMod(atup->t_data, typmod);
1039 0 : HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
1040 :
1041 : /* And return the results. */
1042 0 : return HeapTupleHeaderGetDatum(atup->t_data);
1043 : }
1044 :
1045 : /* Else just return the value as-is. */
1046 5 : return value;
1047 : }
1048 :
1049 : /*
1050 : * Handle a control message from the tuple queue reader.
1051 : *
1052 : * Control messages are sent when the remote side is sending tuples that
1053 : * contain transient record types. We need to arrange to bless those
1054 : * record types locally and translate between remote and local typmods.
1055 : */
1056 : static void
1057 5 : TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
1058 : char *data)
1059 : {
1060 : int32 remotetypmod;
1061 : int natts;
1062 : bool hasoid;
1063 5 : Size offset = 0;
1064 : Form_pg_attribute *attrs;
1065 : TupleDesc tupledesc;
1066 : RecordTypmodMap *mapent;
1067 : bool found;
1068 : int i;
1069 :
1070 : /* Extract remote typmod. */
1071 5 : memcpy(&remotetypmod, &data[offset], sizeof(int32));
1072 5 : offset += sizeof(int32);
1073 :
1074 : /* Extract attribute count. */
1075 5 : memcpy(&natts, &data[offset], sizeof(int));
1076 5 : offset += sizeof(int);
1077 :
1078 : /* Extract hasoid flag. */
1079 5 : memcpy(&hasoid, &data[offset], sizeof(bool));
1080 5 : offset += sizeof(bool);
1081 :
1082 : /* Extract attribute details. The tupledesc made here is just transient. */
1083 5 : attrs = palloc(natts * sizeof(Form_pg_attribute));
1084 20 : for (i = 0; i < natts; i++)
1085 : {
1086 15 : attrs[i] = palloc(sizeof(FormData_pg_attribute));
1087 15 : memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute));
1088 15 : offset += sizeof(FormData_pg_attribute);
1089 : }
1090 :
1091 : /* We should have read the whole message. */
1092 5 : Assert(offset == nbytes);
1093 :
1094 : /* Construct TupleDesc, and assign a local typmod. */
1095 5 : tupledesc = CreateTupleDesc(natts, hasoid, attrs);
1096 5 : tupledesc = BlessTupleDesc(tupledesc);
1097 :
1098 : /* Create mapping hashtable if it doesn't exist already. */
1099 5 : if (reader->typmodmap == NULL)
1100 : {
1101 : HASHCTL ctl;
1102 :
1103 1 : MemSet(&ctl, 0, sizeof(ctl));
1104 1 : ctl.keysize = sizeof(int32);
1105 1 : ctl.entrysize = sizeof(RecordTypmodMap);
1106 1 : ctl.hcxt = reader->mycontext;
1107 1 : reader->typmodmap = hash_create("tqueue receiver record type hashtable",
1108 : 100, &ctl,
1109 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1110 : }
1111 :
1112 : /* Create map entry. */
1113 5 : mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
1114 : &found);
1115 5 : if (found)
1116 0 : elog(ERROR, "duplicate tqueue control message for typmod %d",
1117 : remotetypmod);
1118 5 : mapent->localtypmod = tupledesc->tdtypmod;
1119 :
1120 5 : elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d",
1121 : remotetypmod, mapent->localtypmod);
1122 5 : }
1123 :
1124 : /*
1125 : * Build remap info for the specified data type, storing it in mycontext.
1126 : * Returns NULL if neither the type nor any subtype could require remapping.
1127 : */
1128 : static TupleRemapInfo *
1129 229 : BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
1130 : {
1131 : HeapTuple tup;
1132 : Form_pg_type typ;
1133 :
1134 : /* This is recursive, so it could be driven to stack overflow. */
1135 229 : check_stack_depth();
1136 :
1137 : restart:
1138 229 : tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
1139 229 : if (!HeapTupleIsValid(tup))
1140 0 : elog(ERROR, "cache lookup failed for type %u", typid);
1141 229 : typ = (Form_pg_type) GETSTRUCT(tup);
1142 :
1143 : /* Look through domains to underlying base type. */
1144 229 : if (typ->typtype == TYPTYPE_DOMAIN)
1145 : {
1146 0 : typid = typ->typbasetype;
1147 0 : ReleaseSysCache(tup);
1148 0 : goto restart;
1149 : }
1150 :
1151 : /* If it's a true array type, deal with it that way. */
1152 229 : if (OidIsValid(typ->typelem) && typ->typlen == -1)
1153 : {
1154 0 : typid = typ->typelem;
1155 0 : ReleaseSysCache(tup);
1156 0 : return BuildArrayRemapInfo(typid, mycontext);
1157 : }
1158 :
1159 : /* Similarly, deal with ranges appropriately. */
1160 229 : if (typ->typtype == TYPTYPE_RANGE)
1161 : {
1162 0 : ReleaseSysCache(tup);
1163 0 : return BuildRangeRemapInfo(typid, mycontext);
1164 : }
1165 :
1166 : /*
1167 : * If it's a composite type (including RECORD), set up for remapping. We
1168 : * don't attempt to determine the status of subfields here, since we do
1169 : * not have enough information yet; just mark everything invalid.
1170 : */
1171 229 : if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID)
1172 : {
1173 : TupleRemapInfo *remapinfo;
1174 :
1175 2 : remapinfo = (TupleRemapInfo *)
1176 : MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
1177 2 : remapinfo->remapclass = TQUEUE_REMAP_RECORD;
1178 2 : remapinfo->u.rec.rectypid = InvalidOid;
1179 2 : remapinfo->u.rec.rectypmod = -1;
1180 2 : remapinfo->u.rec.localtypmod = -1;
1181 2 : remapinfo->u.rec.tupledesc = NULL;
1182 2 : remapinfo->u.rec.field_remap = NULL;
1183 2 : ReleaseSysCache(tup);
1184 2 : return remapinfo;
1185 : }
1186 :
1187 : /* Nothing else can possibly need remapping attention. */
1188 227 : ReleaseSysCache(tup);
1189 227 : return NULL;
1190 : }
1191 :
1192 : static TupleRemapInfo *
1193 0 : BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext)
1194 : {
1195 : TupleRemapInfo *remapinfo;
1196 : TupleRemapInfo *element_remapinfo;
1197 :
1198 : /* See if element type requires remapping. */
1199 0 : element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext);
1200 : /* If not, the array doesn't either. */
1201 0 : if (element_remapinfo == NULL)
1202 0 : return NULL;
1203 : /* OK, set up to remap the array. */
1204 0 : remapinfo = (TupleRemapInfo *)
1205 : MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
1206 0 : remapinfo->remapclass = TQUEUE_REMAP_ARRAY;
1207 0 : get_typlenbyvalalign(elemtypid,
1208 : &remapinfo->u.arr.typlen,
1209 : &remapinfo->u.arr.typbyval,
1210 : &remapinfo->u.arr.typalign);
1211 0 : remapinfo->u.arr.element_remap = element_remapinfo;
1212 0 : return remapinfo;
1213 : }
1214 :
1215 : static TupleRemapInfo *
1216 0 : BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext)
1217 : {
1218 : TupleRemapInfo *remapinfo;
1219 : TupleRemapInfo *bound_remapinfo;
1220 : TypeCacheEntry *typcache;
1221 :
1222 : /*
1223 : * Get range info from the typcache. We assume this pointer will stay
1224 : * valid for the duration of the query.
1225 : */
1226 0 : typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
1227 0 : if (typcache->rngelemtype == NULL)
1228 0 : elog(ERROR, "type %u is not a range type", rngtypid);
1229 :
1230 : /* See if range bound type requires remapping. */
1231 0 : bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id,
1232 : mycontext);
1233 : /* If not, the range doesn't either. */
1234 0 : if (bound_remapinfo == NULL)
1235 0 : return NULL;
1236 : /* OK, set up to remap the range. */
1237 0 : remapinfo = (TupleRemapInfo *)
1238 : MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
1239 0 : remapinfo->remapclass = TQUEUE_REMAP_RANGE;
1240 0 : remapinfo->u.rng.typcache = typcache;
1241 0 : remapinfo->u.rng.bound_remap = bound_remapinfo;
1242 0 : return remapinfo;
1243 : }
1244 :
1245 : /*
1246 : * Build remap info for fields of the type described by the given tupdesc.
1247 : * Returns an array of TupleRemapInfo pointers, or NULL if no field
1248 : * requires remapping. Data is allocated in mycontext.
1249 : */
1250 : static TupleRemapInfo **
1251 171 : BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
1252 : {
1253 : TupleRemapInfo **remapinfo;
1254 171 : bool noop = true;
1255 : int i;
1256 :
1257 : /* Recursively determine the remapping status of each field. */
1258 171 : remapinfo = (TupleRemapInfo **)
1259 171 : MemoryContextAlloc(mycontext,
1260 171 : tupledesc->natts * sizeof(TupleRemapInfo *));
1261 400 : for (i = 0; i < tupledesc->natts; i++)
1262 : {
1263 229 : Form_pg_attribute attr = TupleDescAttr(tupledesc, i);
1264 :
1265 229 : if (attr->attisdropped)
1266 : {
1267 0 : remapinfo[i] = NULL;
1268 0 : continue;
1269 : }
1270 229 : remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext);
1271 229 : if (remapinfo[i] != NULL)
1272 2 : noop = false;
1273 : }
1274 :
1275 : /* If no fields require remapping, report that by returning NULL. */
1276 171 : if (noop)
1277 : {
1278 169 : pfree(remapinfo);
1279 169 : remapinfo = NULL;
1280 : }
1281 :
1282 171 : return remapinfo;
1283 : }
|