LCOV - code coverage report
Current view: top level - src/backend/executor - nodeMergeAppend.c (source / functions) Hit Total Coverage
Test: PostgreSQL Lines: 85 86 98.8 %
Date: 2017-09-29 15:12:54 Functions: 5 5 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * nodeMergeAppend.c
       4             :  *    routines to handle MergeAppend nodes.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/executor/nodeMergeAppend.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : /* INTERFACE ROUTINES
      16             :  *      ExecInitMergeAppend     - initialize the MergeAppend node
      17             :  *      ExecMergeAppend         - retrieve the next tuple from the node
      18             :  *      ExecEndMergeAppend      - shut down the MergeAppend node
      19             :  *      ExecReScanMergeAppend   - rescan the MergeAppend node
      20             :  *
      21             :  *   NOTES
      22             :  *      A MergeAppend node contains a list of one or more subplans.
      23             :  *      These are each expected to deliver tuples that are sorted according
      24             :  *      to a common sort key.  The MergeAppend node merges these streams
      25             :  *      to produce output sorted the same way.
      26             :  *
      27             :  *      MergeAppend nodes don't make use of their left and right
      28             :  *      subtrees, rather they maintain a list of subplans so
      29             :  *      a typical MergeAppend node looks like this in the plan tree:
      30             :  *
      31             :  *                 ...
      32             :  *                 /
      33             :  *              MergeAppend---+------+------+--- nil
      34             :  *              /   \         |      |      |
      35             :  *            nil   nil      ...    ...    ...
      36             :  *                               subplans
      37             :  */
      38             : 
      39             : #include "postgres.h"
      40             : 
      41             : #include "executor/execdebug.h"
      42             : #include "executor/nodeMergeAppend.h"
      43             : #include "lib/binaryheap.h"
      44             : #include "miscadmin.h"
      45             : 
      46             : /*
      47             :  * We have one slot for each item in the heap array.  We use SlotNumber
      48             :  * to store slot indexes.  This doesn't actually provide any formal
      49             :  * type-safety, but it makes the code more self-documenting.
      50             :  */
      51             : typedef int32 SlotNumber;
      52             : 
      53             : static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
      54             : static int  heap_compare_slots(Datum a, Datum b, void *arg);
      55             : 
      56             : 
      57             : /* ----------------------------------------------------------------
      58             :  *      ExecInitMergeAppend
      59             :  *
      60             :  *      Begin all of the subscans of the MergeAppend node.
      61             :  * ----------------------------------------------------------------
      62             :  */
      63             : MergeAppendState *
      64          34 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
      65             : {
      66          34 :     MergeAppendState *mergestate = makeNode(MergeAppendState);
      67             :     PlanState **mergeplanstates;
      68             :     int         nplans;
      69             :     int         i;
      70             :     ListCell   *lc;
      71             : 
      72             :     /* check for unsupported flags */
      73          34 :     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
      74             : 
      75             :     /*
      76             :      * Lock the non-leaf tables in the partition tree controlled by this node.
      77             :      * It's a no-op for non-partitioned parent tables.
      78             :      */
      79          34 :     ExecLockNonLeafAppendTables(node->partitioned_rels, estate);
      80             : 
      81             :     /*
      82             :      * Set up empty vector of subplan states
      83             :      */
      84          34 :     nplans = list_length(node->mergeplans);
      85             : 
      86          34 :     mergeplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
      87             : 
      88             :     /*
      89             :      * create new MergeAppendState for our node
      90             :      */
      91          34 :     mergestate->ps.plan = (Plan *) node;
      92          34 :     mergestate->ps.state = estate;
      93          34 :     mergestate->ps.ExecProcNode = ExecMergeAppend;
      94          34 :     mergestate->mergeplans = mergeplanstates;
      95          34 :     mergestate->ms_nplans = nplans;
      96             : 
      97          34 :     mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
      98          34 :     mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
      99             :                                               mergestate);
     100             : 
     101             :     /*
     102             :      * Miscellaneous initialization
     103             :      *
     104             :      * MergeAppend plans don't have expression contexts because they never
     105             :      * call ExecQual or ExecProject.
     106             :      */
     107             : 
     108             :     /*
     109             :      * MergeAppend nodes do have Result slots, which hold pointers to tuples,
     110             :      * so we have to initialize them.
     111             :      */
     112          34 :     ExecInitResultTupleSlot(estate, &mergestate->ps);
     113             : 
     114             :     /*
     115             :      * call ExecInitNode on each of the plans to be executed and save the
     116             :      * results into the array "mergeplans".
     117             :      */
     118          34 :     i = 0;
     119         134 :     foreach(lc, node->mergeplans)
     120             :     {
     121         100 :         Plan       *initNode = (Plan *) lfirst(lc);
     122             : 
     123         100 :         mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
     124         100 :         i++;
     125             :     }
     126             : 
     127             :     /*
     128             :      * initialize output tuple type
     129             :      */
     130          34 :     ExecAssignResultTypeFromTL(&mergestate->ps);
     131          34 :     mergestate->ps.ps_ProjInfo = NULL;
     132             : 
     133             :     /*
     134             :      * initialize sort-key information
     135             :      */
     136          34 :     mergestate->ms_nkeys = node->numCols;
     137          34 :     mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
     138             : 
     139          72 :     for (i = 0; i < node->numCols; i++)
     140             :     {
     141          38 :         SortSupport sortKey = mergestate->ms_sortkeys + i;
     142             : 
     143          38 :         sortKey->ssup_cxt = CurrentMemoryContext;
     144          38 :         sortKey->ssup_collation = node->collations[i];
     145          38 :         sortKey->ssup_nulls_first = node->nullsFirst[i];
     146          38 :         sortKey->ssup_attno = node->sortColIdx[i];
     147             : 
     148             :         /*
     149             :          * It isn't feasible to perform abbreviated key conversion, since
     150             :          * tuples are pulled into mergestate's binary heap as needed.  It
     151             :          * would likely be counter-productive to convert tuples into an
     152             :          * abbreviated representation as they're pulled up, so opt out of that
     153             :          * additional optimization entirely.
     154             :          */
     155          38 :         sortKey->abbreviate = false;
     156             : 
     157          38 :         PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
     158             :     }
     159             : 
     160             :     /*
     161             :      * initialize to show we have not run the subplans yet
     162             :      */
     163          34 :     mergestate->ms_initialized = false;
     164             : 
     165          34 :     return mergestate;
     166             : }
     167             : 
     168             : /* ----------------------------------------------------------------
     169             :  *     ExecMergeAppend
     170             :  *
     171             :  *      Handles iteration over multiple subplans.
     172             :  * ----------------------------------------------------------------
     173             :  */
     174             : static TupleTableSlot *
     175          66 : ExecMergeAppend(PlanState *pstate)
     176             : {
     177          66 :     MergeAppendState *node = castNode(MergeAppendState, pstate);
     178             :     TupleTableSlot *result;
     179             :     SlotNumber  i;
     180             : 
     181          66 :     CHECK_FOR_INTERRUPTS();
     182             : 
     183          66 :     if (!node->ms_initialized)
     184             :     {
     185             :         /*
     186             :          * First time through: pull the first tuple from each subplan, and set
     187             :          * up the heap.
     188             :          */
     189          58 :         for (i = 0; i < node->ms_nplans; i++)
     190             :         {
     191          44 :             node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
     192          44 :             if (!TupIsNull(node->ms_slots[i]))
     193          40 :                 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
     194             :         }
     195          14 :         binaryheap_build(node->ms_heap);
     196          14 :         node->ms_initialized = true;
     197             :     }
     198             :     else
     199             :     {
     200             :         /*
     201             :          * Otherwise, pull the next tuple from whichever subplan we returned
     202             :          * from last time, and reinsert the subplan index into the heap,
     203             :          * because it might now compare differently against the existing
     204             :          * elements of the heap.  (We could perhaps simplify the logic a bit
     205             :          * by doing this before returning from the prior call, but it's better
     206             :          * to not pull tuples until necessary.)
     207             :          */
     208          52 :         i = DatumGetInt32(binaryheap_first(node->ms_heap));
     209          52 :         node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
     210          52 :         if (!TupIsNull(node->ms_slots[i]))
     211          43 :             binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
     212             :         else
     213           9 :             (void) binaryheap_remove_first(node->ms_heap);
     214             :     }
     215             : 
     216          66 :     if (binaryheap_empty(node->ms_heap))
     217             :     {
     218             :         /* All the subplans are exhausted, and so is the heap */
     219           3 :         result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
     220             :     }
     221             :     else
     222             :     {
     223          63 :         i = DatumGetInt32(binaryheap_first(node->ms_heap));
     224          63 :         result = node->ms_slots[i];
     225             :     }
     226             : 
     227          66 :     return result;
     228             : }
     229             : 
     230             : /*
     231             :  * Compare the tuples in the two given slots.
     232             :  */
     233             : static int32
     234          92 : heap_compare_slots(Datum a, Datum b, void *arg)
     235             : {
     236          92 :     MergeAppendState *node = (MergeAppendState *) arg;
     237          92 :     SlotNumber  slot1 = DatumGetInt32(a);
     238          92 :     SlotNumber  slot2 = DatumGetInt32(b);
     239             : 
     240          92 :     TupleTableSlot *s1 = node->ms_slots[slot1];
     241          92 :     TupleTableSlot *s2 = node->ms_slots[slot2];
     242             :     int         nkey;
     243             : 
     244          92 :     Assert(!TupIsNull(s1));
     245          92 :     Assert(!TupIsNull(s2));
     246             : 
     247          97 :     for (nkey = 0; nkey < node->ms_nkeys; nkey++)
     248             :     {
     249          92 :         SortSupport sortKey = node->ms_sortkeys + nkey;
     250          92 :         AttrNumber  attno = sortKey->ssup_attno;
     251             :         Datum       datum1,
     252             :                     datum2;
     253             :         bool        isNull1,
     254             :                     isNull2;
     255             :         int         compare;
     256             : 
     257          92 :         datum1 = slot_getattr(s1, attno, &isNull1);
     258          92 :         datum2 = slot_getattr(s2, attno, &isNull2);
     259             : 
     260          92 :         compare = ApplySortComparator(datum1, isNull1,
     261             :                                       datum2, isNull2,
     262             :                                       sortKey);
     263          92 :         if (compare != 0)
     264          87 :             return -compare;
     265             :     }
     266           5 :     return 0;
     267             : }
     268             : 
     269             : /* ----------------------------------------------------------------
     270             :  *      ExecEndMergeAppend
     271             :  *
     272             :  *      Shuts down the subscans of the MergeAppend node.
     273             :  *
     274             :  *      Returns nothing of interest.
     275             :  * ----------------------------------------------------------------
     276             :  */
     277             : void
     278          34 : ExecEndMergeAppend(MergeAppendState *node)
     279             : {
     280             :     PlanState **mergeplans;
     281             :     int         nplans;
     282             :     int         i;
     283             : 
     284             :     /*
     285             :      * get information from the node
     286             :      */
     287          34 :     mergeplans = node->mergeplans;
     288          34 :     nplans = node->ms_nplans;
     289             : 
     290             :     /*
     291             :      * shut down each of the subscans
     292             :      */
     293         134 :     for (i = 0; i < nplans; i++)
     294         100 :         ExecEndNode(mergeplans[i]);
     295          34 : }
     296             : 
     297             : void
     298           3 : ExecReScanMergeAppend(MergeAppendState *node)
     299             : {
     300             :     int         i;
     301             : 
     302           9 :     for (i = 0; i < node->ms_nplans; i++)
     303             :     {
     304           6 :         PlanState  *subnode = node->mergeplans[i];
     305             : 
     306             :         /*
     307             :          * ExecReScan doesn't know about my subplans, so I have to do
     308             :          * changed-parameter signaling myself.
     309             :          */
     310           6 :         if (node->ps.chgParam != NULL)
     311           6 :             UpdateChangedParamSet(subnode, node->ps.chgParam);
     312             : 
     313             :         /*
     314             :          * If chgParam of subnode is not null then plan will be re-scanned by
     315             :          * first ExecProcNode.
     316             :          */
     317           6 :         if (subnode->chgParam == NULL)
     318           0 :             ExecReScan(subnode);
     319             :     }
     320           3 :     binaryheap_reset(node->ms_heap);
     321           3 :     node->ms_initialized = false;
     322           3 : }

Generated by: LCOV version 1.11