Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execCurrent.c
4 : * executor support for WHERE CURRENT OF cursor
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * src/backend/executor/execCurrent.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/sysattr.h"
16 : #include "catalog/pg_type.h"
17 : #include "executor/executor.h"
18 : #include "utils/builtins.h"
19 : #include "utils/lsyscache.h"
20 : #include "utils/portal.h"
21 : #include "utils/rel.h"
22 :
23 :
24 : static char *fetch_cursor_param_value(ExprContext *econtext, int paramId);
25 : static ScanState *search_plan_tree(PlanState *node, Oid table_oid);
26 :
27 :
28 : /*
29 : * execCurrentOf
30 : *
31 : * Given a CURRENT OF expression and the OID of a table, determine which row
32 : * of the table is currently being scanned by the cursor named by CURRENT OF,
33 : * and return the row's TID into *current_tid.
34 : *
35 : * Returns TRUE if a row was identified. Returns FALSE if the cursor is valid
36 : * for the table but is not currently scanning a row of the table (this is a
37 : * legal situation in inheritance cases). Raises error if cursor is not a
38 : * valid updatable scan of the specified table.
39 : */
40 : bool
41 56 : execCurrentOf(CurrentOfExpr *cexpr,
42 : ExprContext *econtext,
43 : Oid table_oid,
44 : ItemPointer current_tid)
45 : {
46 : char *cursor_name;
47 : char *table_name;
48 : Portal portal;
49 : QueryDesc *queryDesc;
50 :
51 : /* Get the cursor name --- may have to look up a parameter reference */
52 56 : if (cexpr->cursor_name)
53 36 : cursor_name = cexpr->cursor_name;
54 : else
55 20 : cursor_name = fetch_cursor_param_value(econtext, cexpr->cursor_param);
56 :
57 : /* Fetch table name for possible use in error messages */
58 56 : table_name = get_rel_name(table_oid);
59 56 : if (table_name == NULL)
60 0 : elog(ERROR, "cache lookup failed for relation %u", table_oid);
61 :
62 : /* Find the cursor's portal */
63 56 : portal = GetPortalByName(cursor_name);
64 56 : if (!PortalIsValid(portal))
65 1 : ereport(ERROR,
66 : (errcode(ERRCODE_UNDEFINED_CURSOR),
67 : errmsg("cursor \"%s\" does not exist", cursor_name)));
68 :
69 : /*
70 : * We have to watch out for non-SELECT queries as well as held cursors,
71 : * both of which may have null queryDesc.
72 : */
73 55 : if (portal->strategy != PORTAL_ONE_SELECT)
74 0 : ereport(ERROR,
75 : (errcode(ERRCODE_INVALID_CURSOR_STATE),
76 : errmsg("cursor \"%s\" is not a SELECT query",
77 : cursor_name)));
78 55 : queryDesc = PortalGetQueryDesc(portal);
79 55 : if (queryDesc == NULL || queryDesc->estate == NULL)
80 1 : ereport(ERROR,
81 : (errcode(ERRCODE_INVALID_CURSOR_STATE),
82 : errmsg("cursor \"%s\" is held from a previous transaction",
83 : cursor_name)));
84 :
85 : /*
86 : * We have two different strategies depending on whether the cursor uses
87 : * FOR UPDATE/SHARE or not. The reason for supporting both is that the
88 : * FOR UPDATE code is able to identify a target table in many cases where
89 : * the other code can't, while the non-FOR-UPDATE case allows use of WHERE
90 : * CURRENT OF with an insensitive cursor.
91 : */
92 54 : if (queryDesc->estate->es_rowMarks)
93 : {
94 : ExecRowMark *erm;
95 : ListCell *lc;
96 :
97 : /*
98 : * Here, the query must have exactly one FOR UPDATE/SHARE reference to
99 : * the target table, and we dig the ctid info out of that.
100 : */
101 16 : erm = NULL;
102 45 : foreach(lc, queryDesc->estate->es_rowMarks)
103 : {
104 30 : ExecRowMark *thiserm = (ExecRowMark *) lfirst(lc);
105 :
106 30 : if (!RowMarkRequiresRowShareLock(thiserm->markType))
107 4 : continue; /* ignore non-FOR UPDATE/SHARE items */
108 :
109 26 : if (thiserm->relid == table_oid)
110 : {
111 16 : if (erm)
112 1 : ereport(ERROR,
113 : (errcode(ERRCODE_INVALID_CURSOR_STATE),
114 : errmsg("cursor \"%s\" has multiple FOR UPDATE/SHARE references to table \"%s\"",
115 : cursor_name, table_name)));
116 15 : erm = thiserm;
117 : }
118 : }
119 :
120 15 : if (erm == NULL)
121 1 : ereport(ERROR,
122 : (errcode(ERRCODE_INVALID_CURSOR_STATE),
123 : errmsg("cursor \"%s\" does not have a FOR UPDATE/SHARE reference to table \"%s\"",
124 : cursor_name, table_name)));
125 :
126 : /*
127 : * The cursor must have a current result row: per the SQL spec, it's
128 : * an error if not.
129 : */
130 14 : if (portal->atStart || portal->atEnd)
131 0 : ereport(ERROR,
132 : (errcode(ERRCODE_INVALID_CURSOR_STATE),
133 : errmsg("cursor \"%s\" is not positioned on a row",
134 : cursor_name)));
135 :
136 : /* Return the currently scanned TID, if there is one */
137 14 : if (ItemPointerIsValid(&(erm->curCtid)))
138 : {
139 10 : *current_tid = erm->curCtid;
140 10 : return true;
141 : }
142 :
143 : /*
144 : * This table didn't produce the cursor's current row; some other
145 : * inheritance child of the same parent must have. Signal caller to
146 : * do nothing on this table.
147 : */
148 4 : return false;
149 : }
150 : else
151 : {
152 : ScanState *scanstate;
153 : bool lisnull;
154 : Oid tuple_tableoid PG_USED_FOR_ASSERTS_ONLY;
155 : ItemPointer tuple_tid;
156 :
157 : /*
158 : * Without FOR UPDATE, we dig through the cursor's plan to find the
159 : * scan node. Fail if it's not there or buried underneath
160 : * aggregation.
161 : */
162 38 : scanstate = search_plan_tree(queryDesc->planstate, table_oid);
163 38 : if (!scanstate)
164 4 : ereport(ERROR,
165 : (errcode(ERRCODE_INVALID_CURSOR_STATE),
166 : errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"",
167 : cursor_name, table_name)));
168 :
169 : /*
170 : * The cursor must have a current result row: per the SQL spec, it's
171 : * an error if not. We test this at the top level, rather than at the
172 : * scan node level, because in inheritance cases any one table scan
173 : * could easily not be on a row. We want to return false, not raise
174 : * error, if the passed-in table OID is for one of the inactive scans.
175 : */
176 34 : if (portal->atStart || portal->atEnd)
177 2 : ereport(ERROR,
178 : (errcode(ERRCODE_INVALID_CURSOR_STATE),
179 : errmsg("cursor \"%s\" is not positioned on a row",
180 : cursor_name)));
181 :
182 : /* Now OK to return false if we found an inactive scan */
183 32 : if (TupIsNull(scanstate->ss_ScanTupleSlot))
184 0 : return false;
185 :
186 : /* Use slot_getattr to catch any possible mistakes */
187 32 : tuple_tableoid =
188 32 : DatumGetObjectId(slot_getattr(scanstate->ss_ScanTupleSlot,
189 : TableOidAttributeNumber,
190 : &lisnull));
191 32 : Assert(!lisnull);
192 32 : tuple_tid = (ItemPointer)
193 32 : DatumGetPointer(slot_getattr(scanstate->ss_ScanTupleSlot,
194 : SelfItemPointerAttributeNumber,
195 : &lisnull));
196 32 : Assert(!lisnull);
197 :
198 32 : Assert(tuple_tableoid == table_oid);
199 :
200 32 : *current_tid = *tuple_tid;
201 :
202 32 : return true;
203 : }
204 : }
205 :
206 : /*
207 : * fetch_cursor_param_value
208 : *
209 : * Fetch the string value of a param, verifying it is of type REFCURSOR.
210 : */
211 : static char *
212 20 : fetch_cursor_param_value(ExprContext *econtext, int paramId)
213 : {
214 20 : ParamListInfo paramInfo = econtext->ecxt_param_list_info;
215 :
216 20 : if (paramInfo &&
217 20 : paramId > 0 && paramId <= paramInfo->numParams)
218 : {
219 20 : ParamExternData *prm = ¶mInfo->params[paramId - 1];
220 :
221 : /* give hook a chance in case parameter is dynamic */
222 20 : if (!OidIsValid(prm->ptype) && paramInfo->paramFetch != NULL)
223 0 : (*paramInfo->paramFetch) (paramInfo, paramId);
224 :
225 20 : if (OidIsValid(prm->ptype) && !prm->isnull)
226 : {
227 : /* safety check in case hook did something unexpected */
228 20 : if (prm->ptype != REFCURSOROID)
229 0 : ereport(ERROR,
230 : (errcode(ERRCODE_DATATYPE_MISMATCH),
231 : errmsg("type of parameter %d (%s) does not match that when preparing the plan (%s)",
232 : paramId,
233 : format_type_be(prm->ptype),
234 : format_type_be(REFCURSOROID))));
235 :
236 : /* We know that refcursor uses text's I/O routines */
237 20 : return TextDatumGetCString(prm->value);
238 : }
239 : }
240 :
241 0 : ereport(ERROR,
242 : (errcode(ERRCODE_UNDEFINED_OBJECT),
243 : errmsg("no value found for parameter %d", paramId)));
244 : return NULL;
245 : }
246 :
247 : /*
248 : * search_plan_tree
249 : *
250 : * Search through a PlanState tree for a scan node on the specified table.
251 : * Return NULL if not found or multiple candidates.
252 : */
253 : static ScanState *
254 40 : search_plan_tree(PlanState *node, Oid table_oid)
255 : {
256 40 : if (node == NULL)
257 0 : return NULL;
258 40 : switch (nodeTag(node))
259 : {
260 : /*
261 : * Relation scan nodes can all be treated alike
262 : */
263 : case T_SeqScanState:
264 : case T_SampleScanState:
265 : case T_IndexScanState:
266 : case T_IndexOnlyScanState:
267 : case T_BitmapHeapScanState:
268 : case T_TidScanState:
269 : case T_ForeignScanState:
270 : case T_CustomScanState:
271 : {
272 36 : ScanState *sstate = (ScanState *) node;
273 :
274 36 : if (RelationGetRelid(sstate->ss_currentRelation) == table_oid)
275 34 : return sstate;
276 2 : break;
277 : }
278 :
279 : /*
280 : * For Append, we must look through the members; watch out for
281 : * multiple matches (possible if it was from UNION ALL)
282 : */
283 : case T_AppendState:
284 : {
285 1 : AppendState *astate = (AppendState *) node;
286 1 : ScanState *result = NULL;
287 : int i;
288 :
289 3 : for (i = 0; i < astate->as_nplans; i++)
290 : {
291 2 : ScanState *elem = search_plan_tree(astate->appendplans[i],
292 : table_oid);
293 :
294 2 : if (!elem)
295 1 : continue;
296 1 : if (result)
297 0 : return NULL; /* multiple matches */
298 1 : result = elem;
299 : }
300 1 : return result;
301 : }
302 :
303 : /*
304 : * Similarly for MergeAppend
305 : */
306 : case T_MergeAppendState:
307 : {
308 0 : MergeAppendState *mstate = (MergeAppendState *) node;
309 0 : ScanState *result = NULL;
310 : int i;
311 :
312 0 : for (i = 0; i < mstate->ms_nplans; i++)
313 : {
314 0 : ScanState *elem = search_plan_tree(mstate->mergeplans[i],
315 : table_oid);
316 :
317 0 : if (!elem)
318 0 : continue;
319 0 : if (result)
320 0 : return NULL; /* multiple matches */
321 0 : result = elem;
322 : }
323 0 : return result;
324 : }
325 :
326 : /*
327 : * Result and Limit can be descended through (these are safe
328 : * because they always return their input's current row)
329 : */
330 : case T_ResultState:
331 : case T_LimitState:
332 0 : return search_plan_tree(node->lefttree, table_oid);
333 :
334 : /*
335 : * SubqueryScan too, but it keeps the child in a different place
336 : */
337 : case T_SubqueryScanState:
338 0 : return search_plan_tree(((SubqueryScanState *) node)->subplan,
339 : table_oid);
340 :
341 : default:
342 : /* Otherwise, assume we can't descend through it */
343 3 : break;
344 : }
345 5 : return NULL;
346 : }
|