LCOV - code coverage report
Current view: top level - src/backend/executor - tqueue.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 252 384 65.6 %
Date: 2017-09-29 15:12:54 Functions: 18 24 75.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * tqueue.c
       4             :  *    Use shm_mq to send & receive tuples between parallel backends
       5             :  *
       6             :  * Most of the complexity in this module arises from transient RECORD types,
       7             :  * which all have type RECORDOID and are distinguished by typmod numbers
       8             :  * that are managed per-backend (see src/backend/utils/cache/typcache.c).
       9             :  * The sender's set of RECORD typmod assignments probably doesn't match the
      10             :  * receiver's.  To deal with this, we make the sender send a description
      11             :  * of each transient RECORD type appearing in the data it sends.  The
      12             :  * receiver finds or creates a matching type in its own typcache, and then
      13             :  * maps the sender's typmod for that type to its own typmod.
      14             :  *
      15             :  * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
      16             :  * under the hood, writes tuples from the executor to a shm_mq.  If
      17             :  * necessary, it also writes control messages describing transient
      18             :  * record types used within the tuple.
      19             :  *
      20             :  * A TupleQueueReader reads tuples, and control messages if any are sent,
      21             :  * from a shm_mq and returns the tuples.  If transient record types are
      22             :  * in use, it registers those types locally based on the control messages
      23             :  * and rewrites the typmods sent by the remote side to the corresponding
      24             :  * local record typmods.
      25             :  *
      26             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
      27             :  * Portions Copyright (c) 1994, Regents of the University of California
      28             :  *
      29             :  * IDENTIFICATION
      30             :  *    src/backend/executor/tqueue.c
      31             :  *
      32             :  *-------------------------------------------------------------------------
      33             :  */
      34             : 
      35             : #include "postgres.h"
      36             : 
      37             : #include "access/htup_details.h"
      38             : #include "catalog/pg_type.h"
      39             : #include "executor/tqueue.h"
      40             : #include "funcapi.h"
      41             : #include "lib/stringinfo.h"
      42             : #include "miscadmin.h"
      43             : #include "utils/array.h"
      44             : #include "utils/lsyscache.h"
      45             : #include "utils/memutils.h"
      46             : #include "utils/rangetypes.h"
      47             : #include "utils/syscache.h"
      48             : #include "utils/typcache.h"
      49             : 
      50             : 
      51             : /*
      52             :  * The data transferred through the shm_mq is divided into messages.
      53             :  * One-byte messages are mode-switch messages, telling the receiver to switch
      54             :  * between "control" and "data" modes.  (We always start up in "data" mode.)
      55             :  * Otherwise, when in "data" mode, each message is a tuple.  When in "control"
      56             :  * mode, each message defines one transient-typmod-to-tupledesc mapping to
      57             :  * let us interpret future tuples.  Both of those cases certainly require
      58             :  * more than one byte, so no confusion is possible.
      59             :  */
      60             : #define TUPLE_QUEUE_MODE_CONTROL    'c' /* mode-switch message contents */
      61             : #define TUPLE_QUEUE_MODE_DATA       'd'
      62             : 
      63             : /*
      64             :  * Both the sender and receiver build trees of TupleRemapInfo nodes to help
      65             :  * them identify which (sub) fields of transmitted tuples are composite and
      66             :  * may thus need remap processing.  We might need to look within arrays and
      67             :  * ranges, not only composites, to find composite sub-fields.  A NULL
      68             :  * TupleRemapInfo pointer indicates that it is known that the described field
      69             :  * is not composite and has no composite substructure.
      70             :  *
      71             :  * Note that we currently have to look at each composite field at runtime,
      72             :  * even if we believe it's of a named composite type (i.e., not RECORD).
      73             :  * This is because we allow the actual value to be a compatible transient
      74             :  * RECORD type.  That's grossly inefficient, and it would be good to get
      75             :  * rid of the requirement, but it's not clear what would need to change.
      76             :  *
      77             :  * Also, we allow the top-level tuple structure, as well as the actual
      78             :  * structure of composite subfields, to change from one tuple to the next
      79             :  * at runtime.  This may well be entirely historical, but it's mostly free
      80             :  * to support given the previous requirement; and other places in the system
      81             :  * also permit this, so it's not entirely clear if we could drop it.
      82             :  */
      83             : 
      84             : typedef enum
      85             : {
      86             :     TQUEUE_REMAP_ARRAY,         /* array */
      87             :     TQUEUE_REMAP_RANGE,         /* range */
      88             :     TQUEUE_REMAP_RECORD         /* composite type, named or transient */
      89             : } TupleRemapClass;
      90             : 
      91             : typedef struct TupleRemapInfo TupleRemapInfo;
      92             : 
      93             : typedef struct ArrayRemapInfo
      94             : {
      95             :     int16       typlen;         /* array element type's storage properties */
      96             :     bool        typbyval;
      97             :     char        typalign;
      98             :     TupleRemapInfo *element_remap;  /* array element type's remap info */
      99             : } ArrayRemapInfo;
     100             : 
     101             : typedef struct RangeRemapInfo
     102             : {
     103             :     TypeCacheEntry *typcache;   /* range type's typcache entry */
     104             :     TupleRemapInfo *bound_remap;    /* range bound type's remap info */
     105             : } RangeRemapInfo;
     106             : 
     107             : typedef struct RecordRemapInfo
     108             : {
     109             :     /* Original (remote) type ID info last seen for this composite field */
     110             :     Oid         rectypid;
     111             :     int32       rectypmod;
     112             :     /* Local RECORD typmod, or -1 if unset; not used on sender side */
     113             :     int32       localtypmod;
     114             :     /* If no fields of the record require remapping, these are NULL: */
     115             :     TupleDesc   tupledesc;      /* copy of record's tupdesc */
     116             :     TupleRemapInfo **field_remap;   /* each field's remap info */
     117             : } RecordRemapInfo;
     118             : 
     119             : struct TupleRemapInfo
     120             : {
     121             :     TupleRemapClass remapclass;
     122             :     union
     123             :     {
     124             :         ArrayRemapInfo arr;
     125             :         RangeRemapInfo rng;
     126             :         RecordRemapInfo rec;
     127             :     }           u;
     128             : };
     129             : 
     130             : /*
     131             :  * DestReceiver object's private contents
     132             :  *
     133             :  * queue and tupledesc are pointers to data supplied by DestReceiver's caller.
     134             :  * The recordhtab and remap info are owned by the DestReceiver and are kept
     135             :  * in mycontext.  tmpcontext is a tuple-lifespan context to hold cruft
     136             :  * created while traversing each tuple to find record subfields.
     137             :  */
     138             : typedef struct TQueueDestReceiver
     139             : {
     140             :     DestReceiver pub;           /* public fields */
     141             :     shm_mq_handle *queue;       /* shm_mq to send to */
     142             :     MemoryContext mycontext;    /* context containing TQueueDestReceiver */
     143             :     MemoryContext tmpcontext;   /* per-tuple context, if needed */
     144             :     HTAB       *recordhtab;     /* table of transmitted typmods, if needed */
     145             :     char        mode;           /* current message mode */
     146             :     TupleDesc   tupledesc;      /* current top-level tuple descriptor */
     147             :     TupleRemapInfo **field_remapinfo;   /* current top-level remap info */
     148             : } TQueueDestReceiver;
     149             : 
     150             : /*
     151             :  * Hash table entries for mapping remote to local typmods.
     152             :  */
     153             : typedef struct RecordTypmodMap
     154             : {
     155             :     int32       remotetypmod;   /* hash key (must be first!) */
     156             :     int32       localtypmod;
     157             : } RecordTypmodMap;
     158             : 
     159             : /*
     160             :  * TupleQueueReader object's private contents
     161             :  *
     162             :  * queue and tupledesc are pointers to data supplied by reader's caller.
     163             :  * The typmodmap and remap info are owned by the TupleQueueReader and
     164             :  * are kept in mycontext.
     165             :  *
     166             :  * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
     167             :  */
     168             : struct TupleQueueReader
     169             : {
     170             :     shm_mq_handle *queue;       /* shm_mq to receive from */
     171             :     MemoryContext mycontext;    /* context containing TupleQueueReader */
     172             :     HTAB       *typmodmap;      /* RecordTypmodMap hashtable, if needed */
     173             :     char        mode;           /* current message mode */
     174             :     TupleDesc   tupledesc;      /* current top-level tuple descriptor */
     175             :     TupleRemapInfo **field_remapinfo;   /* current top-level remap info */
     176             : };
     177             : 
     178             : /* Local function prototypes */
     179             : static void TQExamine(TQueueDestReceiver *tqueue,
     180             :           TupleRemapInfo *remapinfo,
     181             :           Datum value);
     182             : static void TQExamineArray(TQueueDestReceiver *tqueue,
     183             :                ArrayRemapInfo *remapinfo,
     184             :                Datum value);
     185             : static void TQExamineRange(TQueueDestReceiver *tqueue,
     186             :                RangeRemapInfo *remapinfo,
     187             :                Datum value);
     188             : static void TQExamineRecord(TQueueDestReceiver *tqueue,
     189             :                 RecordRemapInfo *remapinfo,
     190             :                 Datum value);
     191             : static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod,
     192             :                  TupleDesc tupledesc);
     193             : static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
     194             :                                Size nbytes, char *data);
     195             : static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
     196             :                             Size nbytes, HeapTupleHeader data);
     197             : static HeapTuple TQRemapTuple(TupleQueueReader *reader,
     198             :              TupleDesc tupledesc,
     199             :              TupleRemapInfo **field_remapinfo,
     200             :              HeapTuple tuple);
     201             : static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
     202             :         Datum value, bool *changed);
     203             : static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
     204             :              Datum value, bool *changed);
     205             : static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
     206             :              Datum value, bool *changed);
     207             : static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
     208             :               Datum value, bool *changed);
     209             : static TupleRemapInfo *BuildTupleRemapInfo(Oid typid, MemoryContext mycontext);
     210             : static TupleRemapInfo *BuildArrayRemapInfo(Oid elemtypid,
     211             :                     MemoryContext mycontext);
     212             : static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid,
     213             :                     MemoryContext mycontext);
     214             : static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc,
     215             :                     MemoryContext mycontext);
     216             : 
     217             : 
     218             : /*
     219             :  * Receive a tuple from a query, and send it to the designated shm_mq.
     220             :  *
     221             :  * Returns TRUE if successful, FALSE if shm_mq has been detached.
     222             :  */
     223             : static bool
     224          50 : tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
     225             : {
     226          50 :     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
     227          50 :     TupleDesc   tupledesc = slot->tts_tupleDescriptor;
     228             :     HeapTuple   tuple;
     229             :     shm_mq_result result;
     230             : 
     231             :     /*
     232             :      * If first time through, compute remapping info for the top-level fields.
     233             :      * On later calls, if the tupledesc has changed, set up for the new
     234             :      * tupledesc.  (This is a strange test both because the executor really
     235             :      * shouldn't change the tupledesc, and also because it would be unsafe if
     236             :      * the old tupledesc could be freed and a new one allocated at the same
     237             :      * address.  But since some very old code in printtup.c uses a similar
     238             :      * approach, we adopt it here as well.)
     239             :      *
     240             :      * Here and elsewhere in this module, when replacing remapping info we
     241             :      * pfree the top-level object because that's easy, but we don't bother to
     242             :      * recursively free any substructure.  This would lead to query-lifespan
     243             :      * memory leaks if the mapping info actually changed frequently, but since
     244             :      * we don't expect that to happen, it doesn't seem worth expending code to
     245             :      * prevent it.
     246             :      */
     247          50 :     if (tqueue->tupledesc != tupledesc)
     248             :     {
     249             :         /* Is it worth trying to free substructure of the remap tree? */
     250          46 :         if (tqueue->field_remapinfo != NULL)
     251           0 :             pfree(tqueue->field_remapinfo);
     252          46 :         tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc,
     253             :                                                       tqueue->mycontext);
     254          46 :         tqueue->tupledesc = tupledesc;
     255             :     }
     256             : 
     257             :     /*
     258             :      * When, because of the types being transmitted, no record typmod mapping
     259             :      * can be needed, we can skip a good deal of work.
     260             :      */
     261          50 :     if (tqueue->field_remapinfo != NULL)
     262             :     {
     263           5 :         TupleRemapInfo **remapinfo = tqueue->field_remapinfo;
     264             :         int         i;
     265           5 :         MemoryContext oldcontext = NULL;
     266             : 
     267             :         /* Deform the tuple so we can examine fields, if not done already. */
     268           5 :         slot_getallattrs(slot);
     269             : 
     270             :         /* Iterate over each attribute and search it for transient typmods. */
     271          15 :         for (i = 0; i < tupledesc->natts; i++)
     272             :         {
     273             :             /* Ignore nulls and types that don't need special handling. */
     274          10 :             if (slot->tts_isnull[i] || remapinfo[i] == NULL)
     275           5 :                 continue;
     276             : 
     277             :             /* Switch to temporary memory context to avoid leaking. */
     278           5 :             if (oldcontext == NULL)
     279             :             {
     280           5 :                 if (tqueue->tmpcontext == NULL)
     281           1 :                     tqueue->tmpcontext =
     282           1 :                         AllocSetContextCreate(tqueue->mycontext,
     283             :                                               "tqueue sender temp context",
     284             :                                               ALLOCSET_DEFAULT_SIZES);
     285           5 :                 oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
     286             :             }
     287             : 
     288             :             /* Examine the value. */
     289           5 :             TQExamine(tqueue, remapinfo[i], slot->tts_values[i]);
     290             :         }
     291             : 
     292             :         /* If we used the temp context, reset it and restore prior context. */
     293           5 :         if (oldcontext != NULL)
     294             :         {
     295           5 :             MemoryContextSwitchTo(oldcontext);
     296           5 :             MemoryContextReset(tqueue->tmpcontext);
     297             :         }
     298             : 
     299             :         /* If we entered control mode, switch back to data mode. */
     300           5 :         if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
     301             :         {
     302           5 :             tqueue->mode = TUPLE_QUEUE_MODE_DATA;
     303           5 :             shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
     304             :         }
     305             :     }
     306             : 
     307             :     /* Send the tuple itself. */
     308          50 :     tuple = ExecMaterializeSlot(slot);
     309          50 :     result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
     310             : 
     311             :     /* Check for failure. */
     312          50 :     if (result == SHM_MQ_DETACHED)
     313           0 :         return false;
     314          50 :     else if (result != SHM_MQ_SUCCESS)
     315           0 :         ereport(ERROR,
     316             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     317             :                  errmsg("could not send tuple to shared-memory queue")));
     318             : 
     319          50 :     return true;
     320             : }
     321             : 
     322             : /*
     323             :  * Examine the given datum and send any necessary control messages for
     324             :  * transient record types contained in it.
     325             :  *
     326             :  * remapinfo is previously-computed remapping info about the datum's type.
     327             :  *
     328             :  * This function just dispatches based on the remap class.
     329             :  */
     330             : static void
     331           5 : TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
     332             : {
     333             :     /* This is recursive, so it could be driven to stack overflow. */
     334           5 :     check_stack_depth();
     335             : 
     336           5 :     switch (remapinfo->remapclass)
     337             :     {
     338             :         case TQUEUE_REMAP_ARRAY:
     339           0 :             TQExamineArray(tqueue, &remapinfo->u.arr, value);
     340           0 :             break;
     341             :         case TQUEUE_REMAP_RANGE:
     342           0 :             TQExamineRange(tqueue, &remapinfo->u.rng, value);
     343           0 :             break;
     344             :         case TQUEUE_REMAP_RECORD:
     345           5 :             TQExamineRecord(tqueue, &remapinfo->u.rec, value);
     346           5 :             break;
     347             :     }
     348           5 : }
     349             : 
     350             : /*
     351             :  * Examine a record datum and send any necessary control messages for
     352             :  * transient record types contained in it.
     353             :  */
     354             : static void
     355           5 : TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo,
     356             :                 Datum value)
     357             : {
     358             :     HeapTupleHeader tup;
     359             :     Oid         typid;
     360             :     int32       typmod;
     361             :     TupleDesc   tupledesc;
     362             : 
     363             :     /* Extract type OID and typmod from tuple. */
     364           5 :     tup = DatumGetHeapTupleHeader(value);
     365           5 :     typid = HeapTupleHeaderGetTypeId(tup);
     366           5 :     typmod = HeapTupleHeaderGetTypMod(tup);
     367             : 
     368             :     /*
     369             :      * If first time through, or if this isn't the same composite type as last
     370             :      * time, consider sending a control message, and then look up the
     371             :      * necessary information for examining the fields.
     372             :      */
     373           5 :     if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
     374             :     {
     375             :         /* Free any old data. */
     376           5 :         if (remapinfo->tupledesc != NULL)
     377           0 :             FreeTupleDesc(remapinfo->tupledesc);
     378             :         /* Is it worth trying to free substructure of the remap tree? */
     379           5 :         if (remapinfo->field_remap != NULL)
     380           0 :             pfree(remapinfo->field_remap);
     381             : 
     382             :         /* Look up tuple descriptor in typcache. */
     383           5 :         tupledesc = lookup_rowtype_tupdesc(typid, typmod);
     384             : 
     385             :         /*
     386             :          * If this is a transient record type, send the tupledesc in a control
     387             :          * message.  (TQSendRecordInfo is smart enough to do this only once
     388             :          * per typmod.)
     389             :          */
     390           5 :         if (typid == RECORDOID)
     391           5 :             TQSendRecordInfo(tqueue, typmod, tupledesc);
     392             : 
     393             :         /* Figure out whether fields need recursive processing. */
     394           5 :         remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
     395             :                                                      tqueue->mycontext);
     396           5 :         if (remapinfo->field_remap != NULL)
     397             :         {
     398             :             /*
     399             :              * We need to inspect the record contents, so save a copy of the
     400             :              * tupdesc.  (We could possibly just reference the typcache's
     401             :              * copy, but then it's problematic when to release the refcount.)
     402             :              */
     403           0 :             MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext);
     404             : 
     405           0 :             remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
     406           0 :             MemoryContextSwitchTo(oldcontext);
     407             :         }
     408             :         else
     409             :         {
     410             :             /* No fields of the record require remapping. */
     411           5 :             remapinfo->tupledesc = NULL;
     412             :         }
     413           5 :         remapinfo->rectypid = typid;
     414           5 :         remapinfo->rectypmod = typmod;
     415             : 
     416             :         /* Release reference count acquired by lookup_rowtype_tupdesc. */
     417           5 :         DecrTupleDescRefCount(tupledesc);
     418             :     }
     419             : 
     420             :     /*
     421             :      * If field remapping is required, deform the tuple and examine each
     422             :      * field.
     423             :      */
     424           5 :     if (remapinfo->field_remap != NULL)
     425             :     {
     426             :         Datum      *values;
     427             :         bool       *isnull;
     428             :         HeapTupleData tdata;
     429             :         int         i;
     430             : 
     431             :         /* Deform the tuple so we can check each column within. */
     432           0 :         tupledesc = remapinfo->tupledesc;
     433           0 :         values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
     434           0 :         isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
     435           0 :         tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
     436           0 :         ItemPointerSetInvalid(&(tdata.t_self));
     437           0 :         tdata.t_tableOid = InvalidOid;
     438           0 :         tdata.t_data = tup;
     439           0 :         heap_deform_tuple(&tdata, tupledesc, values, isnull);
     440             : 
     441             :         /* Recursively check each interesting non-NULL attribute. */
     442           0 :         for (i = 0; i < tupledesc->natts; i++)
     443             :         {
     444           0 :             if (!isnull[i] && remapinfo->field_remap[i])
     445           0 :                 TQExamine(tqueue, remapinfo->field_remap[i], values[i]);
     446             :         }
     447             : 
     448             :         /* Need not clean up, since we're in a short-lived context. */
     449             :     }
     450           5 : }
     451             : 
     452             : /*
     453             :  * Examine an array datum and send any necessary control messages for
     454             :  * transient record types contained in it.
     455             :  */
     456             : static void
     457           0 : TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo,
     458             :                Datum value)
     459             : {
     460           0 :     ArrayType  *arr = DatumGetArrayTypeP(value);
     461           0 :     Oid         typid = ARR_ELEMTYPE(arr);
     462             :     Datum      *elem_values;
     463             :     bool       *elem_nulls;
     464             :     int         num_elems;
     465             :     int         i;
     466             : 
     467             :     /* Deconstruct the array. */
     468           0 :     deconstruct_array(arr, typid, remapinfo->typlen,
     469           0 :                       remapinfo->typbyval, remapinfo->typalign,
     470             :                       &elem_values, &elem_nulls, &num_elems);
     471             : 
     472             :     /* Examine each element. */
     473           0 :     for (i = 0; i < num_elems; i++)
     474             :     {
     475           0 :         if (!elem_nulls[i])
     476           0 :             TQExamine(tqueue, remapinfo->element_remap, elem_values[i]);
     477             :     }
     478           0 : }
     479             : 
     480             : /*
     481             :  * Examine a range datum and send any necessary control messages for
     482             :  * transient record types contained in it.
     483             :  */
     484             : static void
     485           0 : TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo,
     486             :                Datum value)
     487             : {
     488           0 :     RangeType  *range = DatumGetRangeType(value);
     489             :     RangeBound  lower;
     490             :     RangeBound  upper;
     491             :     bool        empty;
     492             : 
     493             :     /* Extract the lower and upper bounds. */
     494           0 :     range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
     495             : 
     496             :     /* Nothing to do for an empty range. */
     497           0 :     if (empty)
     498           0 :         return;
     499             : 
     500             :     /* Examine each bound, if present. */
     501           0 :     if (!upper.infinite)
     502           0 :         TQExamine(tqueue, remapinfo->bound_remap, upper.val);
     503           0 :     if (!lower.infinite)
     504           0 :         TQExamine(tqueue, remapinfo->bound_remap, lower.val);
     505             : }
     506             : 
     507             : /*
     508             :  * Send tuple descriptor information for a transient typmod, unless we've
     509             :  * already done so previously.
     510             :  */
     511             : static void
     512           5 : TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
     513             : {
     514             :     StringInfoData buf;
     515             :     bool        found;
     516             :     int         i;
     517             : 
     518             :     /* Initialize hash table if not done yet. */
     519           5 :     if (tqueue->recordhtab == NULL)
     520             :     {
     521             :         HASHCTL     ctl;
     522             : 
     523           1 :         MemSet(&ctl, 0, sizeof(ctl));
     524             :         /* Hash table entries are just typmods */
     525           1 :         ctl.keysize = sizeof(int32);
     526           1 :         ctl.entrysize = sizeof(int32);
     527           1 :         ctl.hcxt = tqueue->mycontext;
     528           1 :         tqueue->recordhtab = hash_create("tqueue sender record type hashtable",
     529             :                                          100, &ctl,
     530             :                                          HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     531             :     }
     532             : 
     533             :     /* Have we already seen this record type?  If not, must report it. */
     534           5 :     hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
     535           5 :     if (found)
     536           5 :         return;
     537             : 
     538           5 :     elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod);
     539             : 
     540             :     /* If message queue is in data mode, switch to control mode. */
     541           5 :     if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
     542             :     {
     543           5 :         tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
     544           5 :         shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
     545             :     }
     546             : 
     547             :     /* Assemble a control message. */
     548           5 :     initStringInfo(&buf);
     549           5 :     appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32));
     550           5 :     appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
     551           5 :     appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool));
     552          20 :     for (i = 0; i < tupledesc->natts; i++)
     553             :     {
     554          15 :         appendBinaryStringInfo(&buf, (char *) TupleDescAttr(tupledesc, i),
     555             :                                sizeof(FormData_pg_attribute));
     556             :     }
     557             : 
     558             :     /* Send control message. */
     559           5 :     shm_mq_send(tqueue->queue, buf.len, buf.data, false);
     560             : 
     561             :     /* We assume it's OK to leak buf because we're in a short-lived context. */
     562             : }
     563             : 
     564             : /*
     565             :  * Prepare to receive tuples from executor.
     566             :  */
     567             : static void
     568         115 : tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
     569             : {
     570             :     /* do nothing */
     571         115 : }
     572             : 
     573             : /*
     574             :  * Clean up at end of an executor run
     575             :  */
     576             : static void
     577         114 : tqueueShutdownReceiver(DestReceiver *self)
     578             : {
     579         114 :     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
     580             : 
     581         114 :     if (tqueue->queue != NULL)
     582         114 :         shm_mq_detach(tqueue->queue);
     583         114 :     tqueue->queue = NULL;
     584         114 : }
     585             : 
     586             : /*
     587             :  * Destroy receiver when done with it
     588             :  */
     589             : static void
     590         114 : tqueueDestroyReceiver(DestReceiver *self)
     591             : {
     592         114 :     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
     593             : 
     594             :     /* We probably already detached from queue, but let's be sure */
     595         114 :     if (tqueue->queue != NULL)
     596           0 :         shm_mq_detach(tqueue->queue);
     597         114 :     if (tqueue->tmpcontext != NULL)
     598           1 :         MemoryContextDelete(tqueue->tmpcontext);
     599         114 :     if (tqueue->recordhtab != NULL)
     600           1 :         hash_destroy(tqueue->recordhtab);
     601             :     /* Is it worth trying to free substructure of the remap tree? */
     602         114 :     if (tqueue->field_remapinfo != NULL)
     603           1 :         pfree(tqueue->field_remapinfo);
     604         114 :     pfree(self);
     605         114 : }
     606             : 
     607             : /*
     608             :  * Create a DestReceiver that writes tuples to a tuple queue.
     609             :  */
     610             : DestReceiver *
     611         115 : CreateTupleQueueDestReceiver(shm_mq_handle *handle)
     612             : {
     613             :     TQueueDestReceiver *self;
     614             : 
     615         115 :     self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
     616             : 
     617         115 :     self->pub.receiveSlot = tqueueReceiveSlot;
     618         115 :     self->pub.rStartup = tqueueStartupReceiver;
     619         115 :     self->pub.rShutdown = tqueueShutdownReceiver;
     620         115 :     self->pub.rDestroy = tqueueDestroyReceiver;
     621         115 :     self->pub.mydest = DestTupleQueue;
     622         115 :     self->queue = handle;
     623         115 :     self->mycontext = CurrentMemoryContext;
     624         115 :     self->tmpcontext = NULL;
     625         115 :     self->recordhtab = NULL;
     626         115 :     self->mode = TUPLE_QUEUE_MODE_DATA;
     627             :     /* Top-level tupledesc is not known yet */
     628         115 :     self->tupledesc = NULL;
     629         115 :     self->field_remapinfo = NULL;
     630             : 
     631         115 :     return (DestReceiver *) self;
     632             : }
     633             : 
     634             : /*
     635             :  * Create a tuple queue reader.
     636             :  */
     637             : TupleQueueReader *
     638         115 : CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
     639             : {
     640         115 :     TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
     641             : 
     642         115 :     reader->queue = handle;
     643         115 :     reader->mycontext = CurrentMemoryContext;
     644         115 :     reader->typmodmap = NULL;
     645         115 :     reader->mode = TUPLE_QUEUE_MODE_DATA;
     646         115 :     reader->tupledesc = tupledesc;
     647         115 :     reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
     648             : 
     649         115 :     return reader;
     650             : }
     651             : 
     652             : /*
     653             :  * Destroy a tuple queue reader.
     654             :  *
     655             :  * Note: cleaning up the underlying shm_mq is the caller's responsibility.
     656             :  * We won't access it here, as it may be detached already.
     657             :  */
     658             : void
     659         114 : DestroyTupleQueueReader(TupleQueueReader *reader)
     660             : {
     661         114 :     if (reader->typmodmap != NULL)
     662           1 :         hash_destroy(reader->typmodmap);
     663             :     /* Is it worth trying to free substructure of the remap tree? */
     664         114 :     if (reader->field_remapinfo != NULL)
     665           1 :         pfree(reader->field_remapinfo);
     666         114 :     pfree(reader);
     667         114 : }
     668             : 
     669             : /*
     670             :  * Fetch a tuple from a tuple queue reader.
     671             :  *
     672             :  * The return value is NULL if there are no remaining tuples or if
     673             :  * nowait = true and no tuple is ready to return.  *done, if not NULL,
     674             :  * is set to true when there are no remaining tuples and otherwise to false.
     675             :  *
     676             :  * The returned tuple, if any, is allocated in CurrentMemoryContext.
     677             :  * That should be a short-lived (tuple-lifespan) context, because we are
     678             :  * pretty cavalier about leaking memory in that context if we have to do
     679             :  * tuple remapping.
     680             :  *
     681             :  * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
     682             :  * accumulate bytes from a partially-read message, so it's useful to call
     683             :  * this with nowait = true even if nothing is returned.
     684             :  */
     685             : HeapTuple
     686      433922 : TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
     687             : {
     688             :     shm_mq_result result;
     689             : 
     690      433922 :     if (done != NULL)
     691      433922 :         *done = false;
     692             : 
     693             :     for (;;)
     694             :     {
     695             :         Size        nbytes;
     696             :         void       *data;
     697             : 
     698             :         /* Attempt to read a message. */
     699      433937 :         result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
     700             : 
     701             :         /* If queue is detached, set *done and return NULL. */
     702      433937 :         if (result == SHM_MQ_DETACHED)
     703             :         {
     704         114 :             if (done != NULL)
     705         114 :                 *done = true;
     706      434036 :             return NULL;
     707             :         }
     708             : 
     709             :         /* In non-blocking mode, bail out if no message ready yet. */
     710      433823 :         if (result == SHM_MQ_WOULD_BLOCK)
     711      433758 :             return NULL;
     712          65 :         Assert(result == SHM_MQ_SUCCESS);
     713             : 
     714             :         /*
     715             :          * We got a message (see message spec at top of file).  Process it.
     716             :          */
     717          65 :         if (nbytes == 1)
     718             :         {
     719             :             /* Mode switch message. */
     720          10 :             reader->mode = ((char *) data)[0];
     721             :         }
     722          55 :         else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
     723             :         {
     724             :             /* Tuple data. */
     725          50 :             return TupleQueueHandleDataMessage(reader, nbytes, data);
     726             :         }
     727           5 :         else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
     728             :         {
     729             :             /* Control message, describing a transient record type. */
     730           5 :             TupleQueueHandleControlMessage(reader, nbytes, data);
     731             :         }
     732             :         else
     733           0 :             elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
     734          15 :     }
     735             : }
     736             : 
     737             : /*
     738             :  * Handle a data message - that is, a tuple - from the remote side.
     739             :  */
     740             : static HeapTuple
     741          50 : TupleQueueHandleDataMessage(TupleQueueReader *reader,
     742             :                             Size nbytes,
     743             :                             HeapTupleHeader data)
     744             : {
     745             :     HeapTupleData htup;
     746             : 
     747             :     /*
     748             :      * Set up a dummy HeapTupleData pointing to the data from the shm_mq
     749             :      * (which had better be sufficiently aligned).
     750             :      */
     751          50 :     ItemPointerSetInvalid(&htup.t_self);
     752          50 :     htup.t_tableOid = InvalidOid;
     753          50 :     htup.t_len = nbytes;
     754          50 :     htup.t_data = data;
     755             : 
     756             :     /*
     757             :      * Either just copy the data into a regular palloc'd tuple, or remap it,
     758             :      * as required.
     759             :      */
     760          50 :     return TQRemapTuple(reader,
     761             :                         reader->tupledesc,
     762             :                         reader->field_remapinfo,
     763             :                         &htup);
     764             : }
     765             : 
     766             : /*
     767             :  * Copy the given tuple, remapping any transient typmods contained in it.
     768             :  */
     769             : static HeapTuple
     770          50 : TQRemapTuple(TupleQueueReader *reader,
     771             :              TupleDesc tupledesc,
     772             :              TupleRemapInfo **field_remapinfo,
     773             :              HeapTuple tuple)
     774             : {
     775             :     Datum      *values;
     776             :     bool       *isnull;
     777          50 :     bool        changed = false;
     778             :     int         i;
     779             : 
     780             :     /*
     781             :      * If no remapping is necessary, just copy the tuple into a single
     782             :      * palloc'd chunk, as caller will expect.
     783             :      */
     784          50 :     if (field_remapinfo == NULL)
     785          45 :         return heap_copytuple(tuple);
     786             : 
     787             :     /* Deform tuple so we can remap record typmods for individual attrs. */
     788           5 :     values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
     789           5 :     isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
     790           5 :     heap_deform_tuple(tuple, tupledesc, values, isnull);
     791             : 
     792             :     /* Recursively process each interesting non-NULL attribute. */
     793          15 :     for (i = 0; i < tupledesc->natts; i++)
     794             :     {
     795          10 :         if (isnull[i] || field_remapinfo[i] == NULL)
     796           5 :             continue;
     797           5 :         values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed);
     798             :     }
     799             : 
     800             :     /* Reconstruct the modified tuple, if anything was modified. */
     801           5 :     if (changed)
     802           0 :         return heap_form_tuple(tupledesc, values, isnull);
     803             :     else
     804           5 :         return heap_copytuple(tuple);
     805             : }
     806             : 
     807             : /*
     808             :  * Process the given datum and replace any transient record typmods
     809             :  * contained in it.  Set *changed to TRUE if we actually changed the datum.
     810             :  *
     811             :  * remapinfo is previously-computed remapping info about the datum's type.
     812             :  *
     813             :  * This function just dispatches based on the remap class.
     814             :  */
     815             : static Datum
     816           5 : TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
     817             :         Datum value, bool *changed)
     818             : {
     819             :     /* This is recursive, so it could be driven to stack overflow. */
     820           5 :     check_stack_depth();
     821             : 
     822           5 :     switch (remapinfo->remapclass)
     823             :     {
     824             :         case TQUEUE_REMAP_ARRAY:
     825           0 :             return TQRemapArray(reader, &remapinfo->u.arr, value, changed);
     826             : 
     827             :         case TQUEUE_REMAP_RANGE:
     828           0 :             return TQRemapRange(reader, &remapinfo->u.rng, value, changed);
     829             : 
     830             :         case TQUEUE_REMAP_RECORD:
     831           5 :             return TQRemapRecord(reader, &remapinfo->u.rec, value, changed);
     832             :     }
     833             : 
     834           0 :     elog(ERROR, "unrecognized tqueue remap class: %d",
     835             :          (int) remapinfo->remapclass);
     836             :     return (Datum) 0;
     837             : }
     838             : 
     839             : /*
     840             :  * Process the given array datum and replace any transient record typmods
     841             :  * contained in it.  Set *changed to TRUE if we actually changed the datum.
     842             :  */
     843             : static Datum
     844           0 : TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
     845             :              Datum value, bool *changed)
     846             : {
     847           0 :     ArrayType  *arr = DatumGetArrayTypeP(value);
     848           0 :     Oid         typid = ARR_ELEMTYPE(arr);
     849           0 :     bool        element_changed = false;
     850             :     Datum      *elem_values;
     851             :     bool       *elem_nulls;
     852             :     int         num_elems;
     853             :     int         i;
     854             : 
     855             :     /* Deconstruct the array. */
     856           0 :     deconstruct_array(arr, typid, remapinfo->typlen,
     857           0 :                       remapinfo->typbyval, remapinfo->typalign,
     858             :                       &elem_values, &elem_nulls, &num_elems);
     859             : 
     860             :     /* Remap each element. */
     861           0 :     for (i = 0; i < num_elems; i++)
     862             :     {
     863           0 :         if (!elem_nulls[i])
     864           0 :             elem_values[i] = TQRemap(reader,
     865             :                                      remapinfo->element_remap,
     866           0 :                                      elem_values[i],
     867             :                                      &element_changed);
     868             :     }
     869             : 
     870           0 :     if (element_changed)
     871             :     {
     872             :         /* Reconstruct and return the array.  */
     873           0 :         *changed = true;
     874           0 :         arr = construct_md_array(elem_values, elem_nulls,
     875           0 :                                  ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
     876           0 :                                  typid, remapinfo->typlen,
     877           0 :                                  remapinfo->typbyval, remapinfo->typalign);
     878           0 :         return PointerGetDatum(arr);
     879             :     }
     880             : 
     881             :     /* Else just return the value as-is. */
     882           0 :     return value;
     883             : }
     884             : 
     885             : /*
     886             :  * Process the given range datum and replace any transient record typmods
     887             :  * contained in it.  Set *changed to TRUE if we actually changed the datum.
     888             :  */
     889             : static Datum
     890           0 : TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
     891             :              Datum value, bool *changed)
     892             : {
     893           0 :     RangeType  *range = DatumGetRangeType(value);
     894           0 :     bool        bound_changed = false;
     895             :     RangeBound  lower;
     896             :     RangeBound  upper;
     897             :     bool        empty;
     898             : 
     899             :     /* Extract the lower and upper bounds. */
     900           0 :     range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
     901             : 
     902             :     /* Nothing to do for an empty range. */
     903           0 :     if (empty)
     904           0 :         return value;
     905             : 
     906             :     /* Remap each bound, if present. */
     907           0 :     if (!upper.infinite)
     908           0 :         upper.val = TQRemap(reader, remapinfo->bound_remap,
     909             :                             upper.val, &bound_changed);
     910           0 :     if (!lower.infinite)
     911           0 :         lower.val = TQRemap(reader, remapinfo->bound_remap,
     912             :                             lower.val, &bound_changed);
     913             : 
     914           0 :     if (bound_changed)
     915             :     {
     916             :         /* Reserialize.  */
     917           0 :         *changed = true;
     918           0 :         range = range_serialize(remapinfo->typcache, &lower, &upper, empty);
     919           0 :         return RangeTypeGetDatum(range);
     920             :     }
     921             : 
     922             :     /* Else just return the value as-is. */
     923           0 :     return value;
     924             : }
     925             : 
     926             : /*
     927             :  * Process the given record datum and replace any transient record typmods
     928             :  * contained in it.  Set *changed to TRUE if we actually changed the datum.
     929             :  */
     930             : static Datum
     931           5 : TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
     932             :               Datum value, bool *changed)
     933             : {
     934             :     HeapTupleHeader tup;
     935             :     Oid         typid;
     936             :     int32       typmod;
     937             :     bool        changed_typmod;
     938             :     TupleDesc   tupledesc;
     939             : 
     940             :     /* Extract type OID and typmod from tuple. */
     941           5 :     tup = DatumGetHeapTupleHeader(value);
     942           5 :     typid = HeapTupleHeaderGetTypeId(tup);
     943           5 :     typmod = HeapTupleHeaderGetTypMod(tup);
     944             : 
     945             :     /*
     946             :      * If first time through, or if this isn't the same composite type as last
     947             :      * time, identify the required typmod mapping, and then look up the
     948             :      * necessary information for processing the fields.
     949             :      */
     950           5 :     if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
     951             :     {
     952             :         /* Free any old data. */
     953           5 :         if (remapinfo->tupledesc != NULL)
     954           0 :             FreeTupleDesc(remapinfo->tupledesc);
     955             :         /* Is it worth trying to free substructure of the remap tree? */
     956           5 :         if (remapinfo->field_remap != NULL)
     957           0 :             pfree(remapinfo->field_remap);
     958             : 
     959             :         /* If transient record type, look up matching local typmod. */
     960           5 :         if (typid == RECORDOID)
     961             :         {
     962             :             RecordTypmodMap *mapent;
     963             : 
     964           5 :             Assert(reader->typmodmap != NULL);
     965           5 :             mapent = hash_search(reader->typmodmap, &typmod,
     966             :                                  HASH_FIND, NULL);
     967           5 :             if (mapent == NULL)
     968           0 :                 elog(ERROR, "tqueue received unrecognized remote typmod %d",
     969             :                      typmod);
     970           5 :             remapinfo->localtypmod = mapent->localtypmod;
     971             :         }
     972             :         else
     973           0 :             remapinfo->localtypmod = -1;
     974             : 
     975             :         /* Look up tuple descriptor in typcache. */
     976           5 :         tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod);
     977             : 
     978             :         /* Figure out whether fields need recursive processing. */
     979           5 :         remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
     980             :                                                      reader->mycontext);
     981           5 :         if (remapinfo->field_remap != NULL)
     982             :         {
     983             :             /*
     984             :              * We need to inspect the record contents, so save a copy of the
     985             :              * tupdesc.  (We could possibly just reference the typcache's
     986             :              * copy, but then it's problematic when to release the refcount.)
     987             :              */
     988           0 :             MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext);
     989             : 
     990           0 :             remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
     991           0 :             MemoryContextSwitchTo(oldcontext);
     992             :         }
     993             :         else
     994             :         {
     995             :             /* No fields of the record require remapping. */
     996           5 :             remapinfo->tupledesc = NULL;
     997             :         }
     998           5 :         remapinfo->rectypid = typid;
     999           5 :         remapinfo->rectypmod = typmod;
    1000             : 
    1001             :         /* Release reference count acquired by lookup_rowtype_tupdesc. */
    1002           5 :         DecrTupleDescRefCount(tupledesc);
    1003             :     }
    1004             : 
    1005             :     /* If transient record, replace remote typmod with local typmod. */
    1006           5 :     if (typid == RECORDOID && typmod != remapinfo->localtypmod)
    1007             :     {
    1008           0 :         typmod = remapinfo->localtypmod;
    1009           0 :         changed_typmod = true;
    1010             :     }
    1011             :     else
    1012           5 :         changed_typmod = false;
    1013             : 
    1014             :     /*
    1015             :      * If we need to change the typmod, or if there are any potentially
    1016             :      * remappable fields, replace the tuple.
    1017             :      */
    1018           5 :     if (changed_typmod || remapinfo->field_remap != NULL)
    1019             :     {
    1020             :         HeapTupleData htup;
    1021             :         HeapTuple   atup;
    1022             : 
    1023             :         /* For now, assume we always need to change the tuple in this case. */
    1024           0 :         *changed = true;
    1025             : 
    1026             :         /* Copy tuple, possibly remapping contained fields. */
    1027           0 :         ItemPointerSetInvalid(&htup.t_self);
    1028           0 :         htup.t_tableOid = InvalidOid;
    1029           0 :         htup.t_len = HeapTupleHeaderGetDatumLength(tup);
    1030           0 :         htup.t_data = tup;
    1031           0 :         atup = TQRemapTuple(reader,
    1032             :                             remapinfo->tupledesc,
    1033             :                             remapinfo->field_remap,
    1034             :                             &htup);
    1035             : 
    1036             :         /* Apply the correct labeling for a local Datum. */
    1037           0 :         HeapTupleHeaderSetTypeId(atup->t_data, typid);
    1038           0 :         HeapTupleHeaderSetTypMod(atup->t_data, typmod);
    1039           0 :         HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
    1040             : 
    1041             :         /* And return the results. */
    1042           0 :         return HeapTupleHeaderGetDatum(atup->t_data);
    1043             :     }
    1044             : 
    1045             :     /* Else just return the value as-is. */
    1046           5 :     return value;
    1047             : }
    1048             : 
    1049             : /*
    1050             :  * Handle a control message from the tuple queue reader.
    1051             :  *
    1052             :  * Control messages are sent when the remote side is sending tuples that
    1053             :  * contain transient record types.  We need to arrange to bless those
    1054             :  * record types locally and translate between remote and local typmods.
    1055             :  */
    1056             : static void
    1057           5 : TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
    1058             :                                char *data)
    1059             : {
    1060             :     int32       remotetypmod;
    1061             :     int         natts;
    1062             :     bool        hasoid;
    1063           5 :     Size        offset = 0;
    1064             :     Form_pg_attribute *attrs;
    1065             :     TupleDesc   tupledesc;
    1066             :     RecordTypmodMap *mapent;
    1067             :     bool        found;
    1068             :     int         i;
    1069             : 
    1070             :     /* Extract remote typmod. */
    1071           5 :     memcpy(&remotetypmod, &data[offset], sizeof(int32));
    1072           5 :     offset += sizeof(int32);
    1073             : 
    1074             :     /* Extract attribute count. */
    1075           5 :     memcpy(&natts, &data[offset], sizeof(int));
    1076           5 :     offset += sizeof(int);
    1077             : 
    1078             :     /* Extract hasoid flag. */
    1079           5 :     memcpy(&hasoid, &data[offset], sizeof(bool));
    1080           5 :     offset += sizeof(bool);
    1081             : 
    1082             :     /* Extract attribute details. The tupledesc made here is just transient. */
    1083           5 :     attrs = palloc(natts * sizeof(Form_pg_attribute));
    1084          20 :     for (i = 0; i < natts; i++)
    1085             :     {
    1086          15 :         attrs[i] = palloc(sizeof(FormData_pg_attribute));
    1087          15 :         memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute));
    1088          15 :         offset += sizeof(FormData_pg_attribute);
    1089             :     }
    1090             : 
    1091             :     /* We should have read the whole message. */
    1092           5 :     Assert(offset == nbytes);
    1093             : 
    1094             :     /* Construct TupleDesc, and assign a local typmod. */
    1095           5 :     tupledesc = CreateTupleDesc(natts, hasoid, attrs);
    1096           5 :     tupledesc = BlessTupleDesc(tupledesc);
    1097             : 
    1098             :     /* Create mapping hashtable if it doesn't exist already. */
    1099           5 :     if (reader->typmodmap == NULL)
    1100             :     {
    1101             :         HASHCTL     ctl;
    1102             : 
    1103           1 :         MemSet(&ctl, 0, sizeof(ctl));
    1104           1 :         ctl.keysize = sizeof(int32);
    1105           1 :         ctl.entrysize = sizeof(RecordTypmodMap);
    1106           1 :         ctl.hcxt = reader->mycontext;
    1107           1 :         reader->typmodmap = hash_create("tqueue receiver record type hashtable",
    1108             :                                         100, &ctl,
    1109             :                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
    1110             :     }
    1111             : 
    1112             :     /* Create map entry. */
    1113           5 :     mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
    1114             :                          &found);
    1115           5 :     if (found)
    1116           0 :         elog(ERROR, "duplicate tqueue control message for typmod %d",
    1117             :              remotetypmod);
    1118           5 :     mapent->localtypmod = tupledesc->tdtypmod;
    1119             : 
    1120           5 :     elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d",
    1121             :          remotetypmod, mapent->localtypmod);
    1122           5 : }
    1123             : 
    1124             : /*
    1125             :  * Build remap info for the specified data type, storing it in mycontext.
    1126             :  * Returns NULL if neither the type nor any subtype could require remapping.
    1127             :  */
    1128             : static TupleRemapInfo *
    1129         229 : BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
    1130             : {
    1131             :     HeapTuple   tup;
    1132             :     Form_pg_type typ;
    1133             : 
    1134             :     /* This is recursive, so it could be driven to stack overflow. */
    1135         229 :     check_stack_depth();
    1136             : 
    1137             : restart:
    1138         229 :     tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
    1139         229 :     if (!HeapTupleIsValid(tup))
    1140           0 :         elog(ERROR, "cache lookup failed for type %u", typid);
    1141         229 :     typ = (Form_pg_type) GETSTRUCT(tup);
    1142             : 
    1143             :     /* Look through domains to underlying base type. */
    1144         229 :     if (typ->typtype == TYPTYPE_DOMAIN)
    1145             :     {
    1146           0 :         typid = typ->typbasetype;
    1147           0 :         ReleaseSysCache(tup);
    1148           0 :         goto restart;
    1149             :     }
    1150             : 
    1151             :     /* If it's a true array type, deal with it that way. */
    1152         229 :     if (OidIsValid(typ->typelem) && typ->typlen == -1)
    1153             :     {
    1154           0 :         typid = typ->typelem;
    1155           0 :         ReleaseSysCache(tup);
    1156           0 :         return BuildArrayRemapInfo(typid, mycontext);
    1157             :     }
    1158             : 
    1159             :     /* Similarly, deal with ranges appropriately. */
    1160         229 :     if (typ->typtype == TYPTYPE_RANGE)
    1161             :     {
    1162           0 :         ReleaseSysCache(tup);
    1163           0 :         return BuildRangeRemapInfo(typid, mycontext);
    1164             :     }
    1165             : 
    1166             :     /*
    1167             :      * If it's a composite type (including RECORD), set up for remapping.  We
    1168             :      * don't attempt to determine the status of subfields here, since we do
    1169             :      * not have enough information yet; just mark everything invalid.
    1170             :      */
    1171         229 :     if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID)
    1172             :     {
    1173             :         TupleRemapInfo *remapinfo;
    1174             : 
    1175           2 :         remapinfo = (TupleRemapInfo *)
    1176             :             MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
    1177           2 :         remapinfo->remapclass = TQUEUE_REMAP_RECORD;
    1178           2 :         remapinfo->u.rec.rectypid = InvalidOid;
    1179           2 :         remapinfo->u.rec.rectypmod = -1;
    1180           2 :         remapinfo->u.rec.localtypmod = -1;
    1181           2 :         remapinfo->u.rec.tupledesc = NULL;
    1182           2 :         remapinfo->u.rec.field_remap = NULL;
    1183           2 :         ReleaseSysCache(tup);
    1184           2 :         return remapinfo;
    1185             :     }
    1186             : 
    1187             :     /* Nothing else can possibly need remapping attention. */
    1188         227 :     ReleaseSysCache(tup);
    1189         227 :     return NULL;
    1190             : }
    1191             : 
    1192             : static TupleRemapInfo *
    1193           0 : BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext)
    1194             : {
    1195             :     TupleRemapInfo *remapinfo;
    1196             :     TupleRemapInfo *element_remapinfo;
    1197             : 
    1198             :     /* See if element type requires remapping. */
    1199           0 :     element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext);
    1200             :     /* If not, the array doesn't either. */
    1201           0 :     if (element_remapinfo == NULL)
    1202           0 :         return NULL;
    1203             :     /* OK, set up to remap the array. */
    1204           0 :     remapinfo = (TupleRemapInfo *)
    1205             :         MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
    1206           0 :     remapinfo->remapclass = TQUEUE_REMAP_ARRAY;
    1207           0 :     get_typlenbyvalalign(elemtypid,
    1208             :                          &remapinfo->u.arr.typlen,
    1209             :                          &remapinfo->u.arr.typbyval,
    1210             :                          &remapinfo->u.arr.typalign);
    1211           0 :     remapinfo->u.arr.element_remap = element_remapinfo;
    1212           0 :     return remapinfo;
    1213             : }
    1214             : 
    1215             : static TupleRemapInfo *
    1216           0 : BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext)
    1217             : {
    1218             :     TupleRemapInfo *remapinfo;
    1219             :     TupleRemapInfo *bound_remapinfo;
    1220             :     TypeCacheEntry *typcache;
    1221             : 
    1222             :     /*
    1223             :      * Get range info from the typcache.  We assume this pointer will stay
    1224             :      * valid for the duration of the query.
    1225             :      */
    1226           0 :     typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
    1227           0 :     if (typcache->rngelemtype == NULL)
    1228           0 :         elog(ERROR, "type %u is not a range type", rngtypid);
    1229             : 
    1230             :     /* See if range bound type requires remapping. */
    1231           0 :     bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id,
    1232             :                                           mycontext);
    1233             :     /* If not, the range doesn't either. */
    1234           0 :     if (bound_remapinfo == NULL)
    1235           0 :         return NULL;
    1236             :     /* OK, set up to remap the range. */
    1237           0 :     remapinfo = (TupleRemapInfo *)
    1238             :         MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
    1239           0 :     remapinfo->remapclass = TQUEUE_REMAP_RANGE;
    1240           0 :     remapinfo->u.rng.typcache = typcache;
    1241           0 :     remapinfo->u.rng.bound_remap = bound_remapinfo;
    1242           0 :     return remapinfo;
    1243             : }
    1244             : 
    1245             : /*
    1246             :  * Build remap info for fields of the type described by the given tupdesc.
    1247             :  * Returns an array of TupleRemapInfo pointers, or NULL if no field
    1248             :  * requires remapping.  Data is allocated in mycontext.
    1249             :  */
    1250             : static TupleRemapInfo **
    1251         171 : BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
    1252             : {
    1253             :     TupleRemapInfo **remapinfo;
    1254         171 :     bool        noop = true;
    1255             :     int         i;
    1256             : 
    1257             :     /* Recursively determine the remapping status of each field. */
    1258         171 :     remapinfo = (TupleRemapInfo **)
    1259         171 :         MemoryContextAlloc(mycontext,
    1260         171 :                            tupledesc->natts * sizeof(TupleRemapInfo *));
    1261         400 :     for (i = 0; i < tupledesc->natts; i++)
    1262             :     {
    1263         229 :         Form_pg_attribute attr = TupleDescAttr(tupledesc, i);
    1264             : 
    1265         229 :         if (attr->attisdropped)
    1266             :         {
    1267           0 :             remapinfo[i] = NULL;
    1268           0 :             continue;
    1269             :         }
    1270         229 :         remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext);
    1271         229 :         if (remapinfo[i] != NULL)
    1272           2 :             noop = false;
    1273             :     }
    1274             : 
    1275             :     /* If no fields require remapping, report that by returning NULL. */
    1276         171 :     if (noop)
    1277             :     {
    1278         169 :         pfree(remapinfo);
    1279         169 :         remapinfo = NULL;
    1280             :     }
    1281             : 
    1282         171 :     return remapinfo;
    1283             : }

Generated by: LCOV version 1.11