Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeForeignscan.c
4 : * Routines to support scans of foreign tables
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeForeignscan.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /*
16 : * INTERFACE ROUTINES
17 : *
18 : * ExecForeignScan scans a foreign table.
19 : * ExecInitForeignScan creates and initializes state info.
20 : * ExecReScanForeignScan rescans the foreign relation.
21 : * ExecEndForeignScan releases any resources allocated.
22 : */
23 : #include "postgres.h"
24 :
25 : #include "executor/executor.h"
26 : #include "executor/nodeForeignscan.h"
27 : #include "foreign/fdwapi.h"
28 : #include "utils/memutils.h"
29 : #include "utils/rel.h"
30 :
31 : static TupleTableSlot *ForeignNext(ForeignScanState *node);
32 : static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
33 :
34 :
35 : /* ----------------------------------------------------------------
36 : * ForeignNext
37 : *
38 : * This is a workhorse for ExecForeignScan
39 : * ----------------------------------------------------------------
40 : */
41 : static TupleTableSlot *
42 0 : ForeignNext(ForeignScanState *node)
43 : {
44 : TupleTableSlot *slot;
45 0 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
46 0 : ExprContext *econtext = node->ss.ps.ps_ExprContext;
47 : MemoryContext oldcontext;
48 :
49 : /* Call the Iterate function in short-lived context */
50 0 : oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
51 0 : if (plan->operation != CMD_SELECT)
52 0 : slot = node->fdwroutine->IterateDirectModify(node);
53 : else
54 0 : slot = node->fdwroutine->IterateForeignScan(node);
55 0 : MemoryContextSwitchTo(oldcontext);
56 :
57 : /*
58 : * If any system columns are requested, we have to force the tuple into
59 : * physical-tuple form to avoid "cannot extract system attribute from
60 : * virtual tuple" errors later. We also insert a valid value for
61 : * tableoid, which is the only actually-useful system column.
62 : */
63 0 : if (plan->fsSystemCol && !TupIsNull(slot))
64 : {
65 0 : HeapTuple tup = ExecMaterializeSlot(slot);
66 :
67 0 : tup->t_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
68 : }
69 :
70 0 : return slot;
71 : }
72 :
73 : /*
74 : * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
75 : */
76 : static bool
77 0 : ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
78 : {
79 0 : FdwRoutine *fdwroutine = node->fdwroutine;
80 : ExprContext *econtext;
81 :
82 : /*
83 : * extract necessary information from foreign scan node
84 : */
85 0 : econtext = node->ss.ps.ps_ExprContext;
86 :
87 : /* Does the tuple meet the remote qual condition? */
88 0 : econtext->ecxt_scantuple = slot;
89 :
90 0 : ResetExprContext(econtext);
91 :
92 : /*
93 : * If an outer join is pushed down, RecheckForeignScan may need to store a
94 : * different tuple in the slot, because a different set of columns may go
95 : * to NULL upon recheck. Otherwise, it shouldn't need to change the slot
96 : * contents, just return true or false to indicate whether the quals still
97 : * pass. For simple cases, setting fdw_recheck_quals may be easier than
98 : * providing this callback.
99 : */
100 0 : if (fdwroutine->RecheckForeignScan &&
101 0 : !fdwroutine->RecheckForeignScan(node, slot))
102 0 : return false;
103 :
104 0 : return ExecQual(node->fdw_recheck_quals, econtext);
105 : }
106 :
107 : /* ----------------------------------------------------------------
108 : * ExecForeignScan(node)
109 : *
110 : * Fetches the next tuple from the FDW, checks local quals, and
111 : * returns it.
112 : * We call the ExecScan() routine and pass it the appropriate
113 : * access method functions.
114 : * ----------------------------------------------------------------
115 : */
116 : static TupleTableSlot *
117 0 : ExecForeignScan(PlanState *pstate)
118 : {
119 0 : ForeignScanState *node = castNode(ForeignScanState, pstate);
120 :
121 0 : return ExecScan(&node->ss,
122 : (ExecScanAccessMtd) ForeignNext,
123 : (ExecScanRecheckMtd) ForeignRecheck);
124 : }
125 :
126 :
127 : /* ----------------------------------------------------------------
128 : * ExecInitForeignScan
129 : * ----------------------------------------------------------------
130 : */
131 : ForeignScanState *
132 0 : ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
133 : {
134 : ForeignScanState *scanstate;
135 0 : Relation currentRelation = NULL;
136 0 : Index scanrelid = node->scan.scanrelid;
137 : Index tlistvarno;
138 : FdwRoutine *fdwroutine;
139 :
140 : /* check for unsupported flags */
141 0 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
142 :
143 : /*
144 : * create state structure
145 : */
146 0 : scanstate = makeNode(ForeignScanState);
147 0 : scanstate->ss.ps.plan = (Plan *) node;
148 0 : scanstate->ss.ps.state = estate;
149 0 : scanstate->ss.ps.ExecProcNode = ExecForeignScan;
150 :
151 : /*
152 : * Miscellaneous initialization
153 : *
154 : * create expression context for node
155 : */
156 0 : ExecAssignExprContext(estate, &scanstate->ss.ps);
157 :
158 : /*
159 : * initialize child expressions
160 : */
161 0 : scanstate->ss.ps.qual =
162 0 : ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
163 0 : scanstate->fdw_recheck_quals =
164 0 : ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
165 :
166 : /*
167 : * tuple table initialization
168 : */
169 0 : ExecInitResultTupleSlot(estate, &scanstate->ss.ps);
170 0 : ExecInitScanTupleSlot(estate, &scanstate->ss);
171 :
172 : /*
173 : * open the base relation, if any, and acquire an appropriate lock on it;
174 : * also acquire function pointers from the FDW's handler
175 : */
176 0 : if (scanrelid > 0)
177 : {
178 0 : currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
179 0 : scanstate->ss.ss_currentRelation = currentRelation;
180 0 : fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
181 : }
182 : else
183 : {
184 : /* We can't use the relcache, so get fdwroutine the hard way */
185 0 : fdwroutine = GetFdwRoutineByServerId(node->fs_server);
186 : }
187 :
188 : /*
189 : * Determine the scan tuple type. If the FDW provided a targetlist
190 : * describing the scan tuples, use that; else use base relation's rowtype.
191 : */
192 0 : if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
193 0 : {
194 : TupleDesc scan_tupdesc;
195 :
196 0 : scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist, false);
197 0 : ExecAssignScanType(&scanstate->ss, scan_tupdesc);
198 : /* Node's targetlist will contain Vars with varno = INDEX_VAR */
199 0 : tlistvarno = INDEX_VAR;
200 : }
201 : else
202 : {
203 0 : ExecAssignScanType(&scanstate->ss, RelationGetDescr(currentRelation));
204 : /* Node's targetlist will contain Vars with varno = scanrelid */
205 0 : tlistvarno = scanrelid;
206 : }
207 :
208 : /*
209 : * Initialize result tuple type and projection info.
210 : */
211 0 : ExecAssignResultTypeFromTL(&scanstate->ss.ps);
212 0 : ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
213 :
214 : /*
215 : * Initialize FDW-related state.
216 : */
217 0 : scanstate->fdwroutine = fdwroutine;
218 0 : scanstate->fdw_state = NULL;
219 :
220 : /* Initialize any outer plan. */
221 0 : if (outerPlan(node))
222 0 : outerPlanState(scanstate) =
223 0 : ExecInitNode(outerPlan(node), estate, eflags);
224 :
225 : /*
226 : * Tell the FDW to initialize the scan.
227 : */
228 0 : if (node->operation != CMD_SELECT)
229 0 : fdwroutine->BeginDirectModify(scanstate, eflags);
230 : else
231 0 : fdwroutine->BeginForeignScan(scanstate, eflags);
232 :
233 0 : return scanstate;
234 : }
235 :
236 : /* ----------------------------------------------------------------
237 : * ExecEndForeignScan
238 : *
239 : * frees any storage allocated through C routines.
240 : * ----------------------------------------------------------------
241 : */
242 : void
243 0 : ExecEndForeignScan(ForeignScanState *node)
244 : {
245 0 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
246 :
247 : /* Let the FDW shut down */
248 0 : if (plan->operation != CMD_SELECT)
249 0 : node->fdwroutine->EndDirectModify(node);
250 : else
251 0 : node->fdwroutine->EndForeignScan(node);
252 :
253 : /* Shut down any outer plan. */
254 0 : if (outerPlanState(node))
255 0 : ExecEndNode(outerPlanState(node));
256 :
257 : /* Free the exprcontext */
258 0 : ExecFreeExprContext(&node->ss.ps);
259 :
260 : /* clean out the tuple table */
261 0 : ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
262 0 : ExecClearTuple(node->ss.ss_ScanTupleSlot);
263 :
264 : /* close the relation. */
265 0 : if (node->ss.ss_currentRelation)
266 0 : ExecCloseScanRelation(node->ss.ss_currentRelation);
267 0 : }
268 :
269 : /* ----------------------------------------------------------------
270 : * ExecReScanForeignScan
271 : *
272 : * Rescans the relation.
273 : * ----------------------------------------------------------------
274 : */
275 : void
276 0 : ExecReScanForeignScan(ForeignScanState *node)
277 : {
278 0 : PlanState *outerPlan = outerPlanState(node);
279 :
280 0 : node->fdwroutine->ReScanForeignScan(node);
281 :
282 : /*
283 : * If chgParam of subnode is not null then plan will be re-scanned by
284 : * first ExecProcNode. outerPlan may also be NULL, in which case there is
285 : * nothing to rescan at all.
286 : */
287 0 : if (outerPlan != NULL && outerPlan->chgParam == NULL)
288 0 : ExecReScan(outerPlan);
289 :
290 0 : ExecScanReScan(&node->ss);
291 0 : }
292 :
293 : /* ----------------------------------------------------------------
294 : * ExecForeignScanEstimate
295 : *
296 : * Informs size of the parallel coordination information, if any
297 : * ----------------------------------------------------------------
298 : */
299 : void
300 0 : ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
301 : {
302 0 : FdwRoutine *fdwroutine = node->fdwroutine;
303 :
304 0 : if (fdwroutine->EstimateDSMForeignScan)
305 : {
306 0 : node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
307 0 : shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
308 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
309 : }
310 0 : }
311 :
312 : /* ----------------------------------------------------------------
313 : * ExecForeignScanInitializeDSM
314 : *
315 : * Initialize the parallel coordination information
316 : * ----------------------------------------------------------------
317 : */
318 : void
319 0 : ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
320 : {
321 0 : FdwRoutine *fdwroutine = node->fdwroutine;
322 :
323 0 : if (fdwroutine->InitializeDSMForeignScan)
324 : {
325 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
326 : void *coordinate;
327 :
328 0 : coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
329 0 : fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
330 0 : shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
331 : }
332 0 : }
333 :
334 : /* ----------------------------------------------------------------
335 : * ExecForeignScanReInitializeDSM
336 : *
337 : * Reset shared state before beginning a fresh scan.
338 : * ----------------------------------------------------------------
339 : */
340 : void
341 0 : ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
342 : {
343 0 : FdwRoutine *fdwroutine = node->fdwroutine;
344 :
345 0 : if (fdwroutine->ReInitializeDSMForeignScan)
346 : {
347 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
348 : void *coordinate;
349 :
350 0 : coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
351 0 : fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
352 : }
353 0 : }
354 :
355 : /* ----------------------------------------------------------------
356 : * ExecForeignScanInitializeWorker
357 : *
358 : * Initialization according to the parallel coordination information
359 : * ----------------------------------------------------------------
360 : */
361 : void
362 0 : ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
363 : {
364 0 : FdwRoutine *fdwroutine = node->fdwroutine;
365 :
366 0 : if (fdwroutine->InitializeWorkerForeignScan)
367 : {
368 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
369 : void *coordinate;
370 :
371 0 : coordinate = shm_toc_lookup(toc, plan_node_id, false);
372 0 : fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
373 : }
374 0 : }
375 :
376 : /* ----------------------------------------------------------------
377 : * ExecShutdownForeignScan
378 : *
379 : * Gives FDW chance to stop asynchronous resource consumption
380 : * and release any resources still held.
381 : * ----------------------------------------------------------------
382 : */
383 : void
384 0 : ExecShutdownForeignScan(ForeignScanState *node)
385 : {
386 0 : FdwRoutine *fdwroutine = node->fdwroutine;
387 :
388 0 : if (fdwroutine->ShutdownForeignScan)
389 0 : fdwroutine->ShutdownForeignScan(node);
390 0 : }
|