Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pquery.c
4 : * POSTGRES process query command code
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/tcop/pquery.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include <limits.h>
19 :
20 : #include "access/xact.h"
21 : #include "commands/prepare.h"
22 : #include "executor/tstoreReceiver.h"
23 : #include "miscadmin.h"
24 : #include "pg_trace.h"
25 : #include "tcop/pquery.h"
26 : #include "tcop/utility.h"
27 : #include "utils/memutils.h"
28 : #include "utils/snapmgr.h"
29 :
30 :
31 : /*
32 : * ActivePortal is the currently executing Portal (the most closely nested,
33 : * if there are several).
34 : */
35 : Portal ActivePortal = NULL;
36 :
37 :
38 : static void ProcessQuery(PlannedStmt *plan,
39 : const char *sourceText,
40 : ParamListInfo params,
41 : QueryEnvironment *queryEnv,
42 : DestReceiver *dest,
43 : char *completionTag);
44 : static void FillPortalStore(Portal portal, bool isTopLevel);
45 : static uint64 RunFromStore(Portal portal, ScanDirection direction, uint64 count,
46 : DestReceiver *dest);
47 : static uint64 PortalRunSelect(Portal portal, bool forward, long count,
48 : DestReceiver *dest);
49 : static void PortalRunUtility(Portal portal, PlannedStmt *pstmt,
50 : bool isTopLevel, bool setHoldSnapshot,
51 : DestReceiver *dest, char *completionTag);
52 : static void PortalRunMulti(Portal portal,
53 : bool isTopLevel, bool setHoldSnapshot,
54 : DestReceiver *dest, DestReceiver *altdest,
55 : char *completionTag);
56 : static uint64 DoPortalRunFetch(Portal portal,
57 : FetchDirection fdirection,
58 : long count,
59 : DestReceiver *dest);
60 : static void DoPortalRewind(Portal portal);
61 :
62 :
63 : /*
64 : * CreateQueryDesc
65 : */
66 : QueryDesc *
67 24647 : CreateQueryDesc(PlannedStmt *plannedstmt,
68 : const char *sourceText,
69 : Snapshot snapshot,
70 : Snapshot crosscheck_snapshot,
71 : DestReceiver *dest,
72 : ParamListInfo params,
73 : QueryEnvironment *queryEnv,
74 : int instrument_options)
75 : {
76 24647 : QueryDesc *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
77 :
78 24647 : qd->operation = plannedstmt->commandType; /* operation */
79 24647 : qd->plannedstmt = plannedstmt; /* plan */
80 24647 : qd->sourceText = sourceText; /* query text */
81 24647 : qd->snapshot = RegisterSnapshot(snapshot); /* snapshot */
82 : /* RI check snapshot */
83 24647 : qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
84 24647 : qd->dest = dest; /* output dest */
85 24647 : qd->params = params; /* parameter values passed into query */
86 24647 : qd->queryEnv = queryEnv;
87 24647 : qd->instrument_options = instrument_options; /* instrumentation wanted? */
88 :
89 : /* null these fields until set by ExecutorStart */
90 24647 : qd->tupDesc = NULL;
91 24647 : qd->estate = NULL;
92 24647 : qd->planstate = NULL;
93 24647 : qd->totaltime = NULL;
94 :
95 : /* not yet executed */
96 24647 : qd->already_executed = false;
97 :
98 24647 : return qd;
99 : }
100 :
101 : /*
102 : * FreeQueryDesc
103 : */
104 : void
105 21298 : FreeQueryDesc(QueryDesc *qdesc)
106 : {
107 : /* Can't be a live query */
108 21298 : Assert(qdesc->estate == NULL);
109 :
110 : /* forget our snapshots */
111 21298 : UnregisterSnapshot(qdesc->snapshot);
112 21298 : UnregisterSnapshot(qdesc->crosscheck_snapshot);
113 :
114 : /* Only the QueryDesc itself need be freed */
115 21298 : pfree(qdesc);
116 21298 : }
117 :
118 :
119 : /*
120 : * ProcessQuery
121 : * Execute a single plannable query within a PORTAL_MULTI_QUERY,
122 : * PORTAL_ONE_RETURNING, or PORTAL_ONE_MOD_WITH portal
123 : *
124 : * plan: the plan tree for the query
125 : * sourceText: the source text of the query
126 : * params: any parameters needed
127 : * dest: where to send results
128 : * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
129 : * in which to store a command completion status string.
130 : *
131 : * completionTag may be NULL if caller doesn't want a status string.
132 : *
133 : * Must be called in a memory context that will be reset or deleted on
134 : * error; otherwise the executor's memory usage will be leaked.
135 : */
136 : static void
137 3991 : ProcessQuery(PlannedStmt *plan,
138 : const char *sourceText,
139 : ParamListInfo params,
140 : QueryEnvironment *queryEnv,
141 : DestReceiver *dest,
142 : char *completionTag)
143 : {
144 : QueryDesc *queryDesc;
145 :
146 : /*
147 : * Create the QueryDesc object
148 : */
149 3991 : queryDesc = CreateQueryDesc(plan, sourceText,
150 : GetActiveSnapshot(), InvalidSnapshot,
151 : dest, params, queryEnv, 0);
152 :
153 : /*
154 : * Call ExecutorStart to prepare the plan for execution
155 : */
156 3991 : ExecutorStart(queryDesc, 0);
157 :
158 : /*
159 : * Run the plan to completion.
160 : */
161 3954 : ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
162 :
163 : /*
164 : * Build command completion status string, if caller wants one.
165 : */
166 3707 : if (completionTag)
167 : {
168 : Oid lastOid;
169 :
170 3619 : switch (queryDesc->operation)
171 : {
172 : case CMD_SELECT:
173 17 : snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
174 : "SELECT " UINT64_FORMAT,
175 17 : queryDesc->estate->es_processed);
176 17 : break;
177 : case CMD_INSERT:
178 3008 : if (queryDesc->estate->es_processed == 1)
179 2606 : lastOid = queryDesc->estate->es_lastoid;
180 : else
181 402 : lastOid = InvalidOid;
182 3008 : snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
183 : "INSERT %u " UINT64_FORMAT,
184 3008 : lastOid, queryDesc->estate->es_processed);
185 3008 : break;
186 : case CMD_UPDATE:
187 364 : snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
188 : "UPDATE " UINT64_FORMAT,
189 364 : queryDesc->estate->es_processed);
190 364 : break;
191 : case CMD_DELETE:
192 230 : snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
193 : "DELETE " UINT64_FORMAT,
194 230 : queryDesc->estate->es_processed);
195 230 : break;
196 : default:
197 0 : strcpy(completionTag, "???");
198 0 : break;
199 : }
200 : }
201 :
202 : /*
203 : * Now, we close down all the scans and free allocated resources.
204 : */
205 3707 : ExecutorFinish(queryDesc);
206 3640 : ExecutorEnd(queryDesc);
207 :
208 3640 : FreeQueryDesc(queryDesc);
209 3640 : }
210 :
211 : /*
212 : * ChoosePortalStrategy
213 : * Select portal execution strategy given the intended statement list.
214 : *
215 : * The list elements can be Querys or PlannedStmts.
216 : * That's more general than portals need, but plancache.c uses this too.
217 : *
218 : * See the comments in portal.h.
219 : */
220 : PortalStrategy
221 30775 : ChoosePortalStrategy(List *stmts)
222 : {
223 : int nSetTag;
224 : ListCell *lc;
225 :
226 : /*
227 : * PORTAL_ONE_SELECT and PORTAL_UTIL_SELECT need only consider the
228 : * single-statement case, since there are no rewrite rules that can add
229 : * auxiliary queries to a SELECT or a utility command. PORTAL_ONE_MOD_WITH
230 : * likewise allows only one top-level statement.
231 : */
232 30775 : if (list_length(stmts) == 1)
233 : {
234 30733 : Node *stmt = (Node *) linitial(stmts);
235 :
236 30733 : if (IsA(stmt, Query))
237 : {
238 3474 : Query *query = (Query *) stmt;
239 :
240 3474 : if (query->canSetTag)
241 : {
242 3474 : if (query->commandType == CMD_SELECT)
243 : {
244 2780 : if (query->hasModifyingCTE)
245 0 : return PORTAL_ONE_MOD_WITH;
246 : else
247 2780 : return PORTAL_ONE_SELECT;
248 : }
249 694 : if (query->commandType == CMD_UTILITY)
250 : {
251 572 : if (UtilityReturnsTuples(query->utilityStmt))
252 501 : return PORTAL_UTIL_SELECT;
253 : /* it can't be ONE_RETURNING, so give up */
254 71 : return PORTAL_MULTI_QUERY;
255 : }
256 : }
257 : }
258 27259 : else if (IsA(stmt, PlannedStmt))
259 : {
260 27259 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
261 :
262 27259 : if (pstmt->canSetTag)
263 : {
264 27258 : if (pstmt->commandType == CMD_SELECT)
265 : {
266 10987 : if (pstmt->hasModifyingCTE)
267 19 : return PORTAL_ONE_MOD_WITH;
268 : else
269 10968 : return PORTAL_ONE_SELECT;
270 : }
271 16271 : if (pstmt->commandType == CMD_UTILITY)
272 : {
273 12428 : if (UtilityReturnsTuples(pstmt->utilityStmt))
274 1507 : return PORTAL_UTIL_SELECT;
275 : /* it can't be ONE_RETURNING, so give up */
276 10921 : return PORTAL_MULTI_QUERY;
277 : }
278 : }
279 : }
280 : else
281 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
282 : }
283 :
284 : /*
285 : * PORTAL_ONE_RETURNING has to allow auxiliary queries added by rewrite.
286 : * Choose PORTAL_ONE_RETURNING if there is exactly one canSetTag query and
287 : * it has a RETURNING list.
288 : */
289 4008 : nSetTag = 0;
290 4179 : foreach(lc, stmts)
291 : {
292 4034 : Node *stmt = (Node *) lfirst(lc);
293 :
294 4034 : if (IsA(stmt, Query))
295 : {
296 122 : Query *query = (Query *) stmt;
297 :
298 122 : if (query->canSetTag)
299 : {
300 122 : if (++nSetTag > 1)
301 0 : return PORTAL_MULTI_QUERY; /* no need to look further */
302 244 : if (query->commandType == CMD_UTILITY ||
303 122 : query->returningList == NIL)
304 116 : return PORTAL_MULTI_QUERY; /* no need to look further */
305 : }
306 : }
307 3912 : else if (IsA(stmt, PlannedStmt))
308 : {
309 3912 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
310 :
311 3912 : if (pstmt->canSetTag)
312 : {
313 3884 : if (++nSetTag > 1)
314 0 : return PORTAL_MULTI_QUERY; /* no need to look further */
315 7768 : if (pstmt->commandType == CMD_UTILITY ||
316 3884 : !pstmt->hasReturning)
317 3747 : return PORTAL_MULTI_QUERY; /* no need to look further */
318 : }
319 : }
320 : else
321 0 : elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
322 : }
323 145 : if (nSetTag == 1)
324 143 : return PORTAL_ONE_RETURNING;
325 :
326 : /* Else, it's the general case... */
327 2 : return PORTAL_MULTI_QUERY;
328 : }
329 :
330 : /*
331 : * FetchPortalTargetList
332 : * Given a portal that returns tuples, extract the query targetlist.
333 : * Returns NIL if the portal doesn't have a determinable targetlist.
334 : *
335 : * Note: do not modify the result.
336 : */
337 : List *
338 11777 : FetchPortalTargetList(Portal portal)
339 : {
340 : /* no point in looking if we determined it doesn't return tuples */
341 11777 : if (portal->strategy == PORTAL_MULTI_QUERY)
342 1 : return NIL;
343 : /* get the primary statement and find out what it returns */
344 11776 : return FetchStatementTargetList((Node *) PortalGetPrimaryStmt(portal));
345 : }
346 :
347 : /*
348 : * FetchStatementTargetList
349 : * Given a statement that returns tuples, extract the query targetlist.
350 : * Returns NIL if the statement doesn't have a determinable targetlist.
351 : *
352 : * This can be applied to a Query or a PlannedStmt.
353 : * That's more general than portals need, but plancache.c uses this too.
354 : *
355 : * Note: do not modify the result.
356 : *
357 : * XXX be careful to keep this in sync with UtilityReturnsTuples.
358 : */
359 : List *
360 11894 : FetchStatementTargetList(Node *stmt)
361 : {
362 11894 : if (stmt == NULL)
363 0 : return NIL;
364 11894 : if (IsA(stmt, Query))
365 : {
366 118 : Query *query = (Query *) stmt;
367 :
368 118 : if (query->commandType == CMD_UTILITY)
369 : {
370 : /* transfer attention to utility statement */
371 0 : stmt = query->utilityStmt;
372 : }
373 : else
374 : {
375 118 : if (query->commandType == CMD_SELECT)
376 118 : return query->targetList;
377 0 : if (query->returningList)
378 0 : return query->returningList;
379 0 : return NIL;
380 : }
381 : }
382 11776 : if (IsA(stmt, PlannedStmt))
383 : {
384 11776 : PlannedStmt *pstmt = (PlannedStmt *) stmt;
385 :
386 11776 : if (pstmt->commandType == CMD_UTILITY)
387 : {
388 : /* transfer attention to utility statement */
389 983 : stmt = pstmt->utilityStmt;
390 : }
391 : else
392 : {
393 10793 : if (pstmt->commandType == CMD_SELECT)
394 10664 : return pstmt->planTree->targetlist;
395 129 : if (pstmt->hasReturning)
396 129 : return pstmt->planTree->targetlist;
397 0 : return NIL;
398 : }
399 : }
400 983 : if (IsA(stmt, FetchStmt))
401 : {
402 186 : FetchStmt *fstmt = (FetchStmt *) stmt;
403 : Portal subportal;
404 :
405 186 : Assert(!fstmt->ismove);
406 186 : subportal = GetPortalByName(fstmt->portalname);
407 186 : Assert(PortalIsValid(subportal));
408 186 : return FetchPortalTargetList(subportal);
409 : }
410 797 : if (IsA(stmt, ExecuteStmt))
411 : {
412 118 : ExecuteStmt *estmt = (ExecuteStmt *) stmt;
413 : PreparedStatement *entry;
414 :
415 118 : entry = FetchPreparedStatement(estmt->name, true);
416 118 : return FetchPreparedStatementTargetList(entry);
417 : }
418 679 : return NIL;
419 : }
420 :
421 : /*
422 : * PortalStart
423 : * Prepare a portal for execution.
424 : *
425 : * Caller must already have created the portal, done PortalDefineQuery(),
426 : * and adjusted portal options if needed.
427 : *
428 : * If parameters are needed by the query, they must be passed in "params"
429 : * (caller is responsible for giving them appropriate lifetime).
430 : *
431 : * The caller can also provide an initial set of "eflags" to be passed to
432 : * ExecutorStart (but note these can be modified internally, and they are
433 : * currently only honored for PORTAL_ONE_SELECT portals). Most callers
434 : * should simply pass zero.
435 : *
436 : * The caller can optionally pass a snapshot to be used; pass InvalidSnapshot
437 : * for the normal behavior of setting a new snapshot. This parameter is
438 : * presently ignored for non-PORTAL_ONE_SELECT portals (it's only intended
439 : * to be used for cursors).
440 : *
441 : * On return, portal is ready to accept PortalRun() calls, and the result
442 : * tupdesc (if any) is known.
443 : */
444 : void
445 27301 : PortalStart(Portal portal, ParamListInfo params,
446 : int eflags, Snapshot snapshot)
447 : {
448 : Portal saveActivePortal;
449 : ResourceOwner saveResourceOwner;
450 : MemoryContext savePortalContext;
451 : MemoryContext oldContext;
452 : QueryDesc *queryDesc;
453 : int myeflags;
454 :
455 27301 : AssertArg(PortalIsValid(portal));
456 27301 : AssertState(portal->status == PORTAL_DEFINED);
457 :
458 : /*
459 : * Set up global portal context pointers.
460 : */
461 27301 : saveActivePortal = ActivePortal;
462 27301 : saveResourceOwner = CurrentResourceOwner;
463 27301 : savePortalContext = PortalContext;
464 27301 : PG_TRY();
465 : {
466 27301 : ActivePortal = portal;
467 27301 : if (portal->resowner)
468 27301 : CurrentResourceOwner = portal->resowner;
469 27301 : PortalContext = PortalGetHeapMemory(portal);
470 :
471 27301 : oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
472 :
473 : /* Must remember portal param list, if any */
474 27301 : portal->portalParams = params;
475 :
476 : /*
477 : * Determine the portal execution strategy
478 : */
479 27301 : portal->strategy = ChoosePortalStrategy(portal->stmts);
480 :
481 : /*
482 : * Fire her up according to the strategy
483 : */
484 27301 : switch (portal->strategy)
485 : {
486 : case PORTAL_ONE_SELECT:
487 :
488 : /* Must set snapshot before starting executor. */
489 10968 : if (snapshot)
490 467 : PushActiveSnapshot(snapshot);
491 : else
492 10501 : PushActiveSnapshot(GetTransactionSnapshot());
493 :
494 : /*
495 : * Create QueryDesc in portal's context; for the moment, set
496 : * the destination to DestNone.
497 : */
498 10968 : queryDesc = CreateQueryDesc(linitial_node(PlannedStmt, portal->stmts),
499 : portal->sourceText,
500 : GetActiveSnapshot(),
501 : InvalidSnapshot,
502 : None_Receiver,
503 : params,
504 : portal->queryEnv,
505 : 0);
506 :
507 : /*
508 : * If it's a scrollable cursor, executor needs to support
509 : * REWIND and backwards scan, as well as whatever the caller
510 : * might've asked for.
511 : */
512 10968 : if (portal->cursorOptions & CURSOR_OPT_SCROLL)
513 303 : myeflags = eflags | EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD;
514 : else
515 10665 : myeflags = eflags;
516 :
517 : /*
518 : * Call ExecutorStart to prepare the plan for execution
519 : */
520 10968 : ExecutorStart(queryDesc, myeflags);
521 :
522 : /*
523 : * This tells PortalCleanup to shut down the executor
524 : */
525 10929 : portal->queryDesc = queryDesc;
526 :
527 : /*
528 : * Remember tuple descriptor (computed by ExecutorStart)
529 : */
530 10929 : portal->tupDesc = queryDesc->tupDesc;
531 :
532 : /*
533 : * Reset cursor position data to "start of query"
534 : */
535 10929 : portal->atStart = true;
536 10929 : portal->atEnd = false; /* allow fetches */
537 10929 : portal->portalPos = 0;
538 :
539 10929 : PopActiveSnapshot();
540 10929 : break;
541 :
542 : case PORTAL_ONE_RETURNING:
543 : case PORTAL_ONE_MOD_WITH:
544 :
545 : /*
546 : * We don't start the executor until we are told to run the
547 : * portal. We do need to set up the result tupdesc.
548 : */
549 : {
550 : PlannedStmt *pstmt;
551 :
552 156 : pstmt = PortalGetPrimaryStmt(portal);
553 156 : portal->tupDesc =
554 156 : ExecCleanTypeFromTL(pstmt->planTree->targetlist,
555 : false);
556 : }
557 :
558 : /*
559 : * Reset cursor position data to "start of query"
560 : */
561 156 : portal->atStart = true;
562 156 : portal->atEnd = false; /* allow fetches */
563 156 : portal->portalPos = 0;
564 156 : break;
565 :
566 : case PORTAL_UTIL_SELECT:
567 :
568 : /*
569 : * We don't set snapshot here, because PortalRunUtility will
570 : * take care of it if needed.
571 : */
572 : {
573 1507 : PlannedStmt *pstmt = PortalGetPrimaryStmt(portal);
574 :
575 1507 : Assert(pstmt->commandType == CMD_UTILITY);
576 1507 : portal->tupDesc = UtilityTupleDescriptor(pstmt->utilityStmt);
577 : }
578 :
579 : /*
580 : * Reset cursor position data to "start of query"
581 : */
582 1507 : portal->atStart = true;
583 1507 : portal->atEnd = false; /* allow fetches */
584 1507 : portal->portalPos = 0;
585 1507 : break;
586 :
587 : case PORTAL_MULTI_QUERY:
588 : /* Need do nothing now */
589 14670 : portal->tupDesc = NULL;
590 14670 : break;
591 : }
592 : }
593 39 : PG_CATCH();
594 : {
595 : /* Uncaught error while executing portal: mark it dead */
596 39 : MarkPortalFailed(portal);
597 :
598 : /* Restore global vars and propagate error */
599 39 : ActivePortal = saveActivePortal;
600 39 : CurrentResourceOwner = saveResourceOwner;
601 39 : PortalContext = savePortalContext;
602 :
603 39 : PG_RE_THROW();
604 : }
605 27262 : PG_END_TRY();
606 :
607 27262 : MemoryContextSwitchTo(oldContext);
608 :
609 27262 : ActivePortal = saveActivePortal;
610 27262 : CurrentResourceOwner = saveResourceOwner;
611 27262 : PortalContext = savePortalContext;
612 :
613 27262 : portal->status = PORTAL_READY;
614 27262 : }
615 :
616 : /*
617 : * PortalSetResultFormat
618 : * Select the format codes for a portal's output.
619 : *
620 : * This must be run after PortalStart for a portal that will be read by
621 : * a DestRemote or DestRemoteExecute destination. It is not presently needed
622 : * for other destination types.
623 : *
624 : * formats[] is the client format request, as per Bind message conventions.
625 : */
626 : void
627 26292 : PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
628 : {
629 : int natts;
630 : int i;
631 :
632 : /* Do nothing if portal won't return tuples */
633 26292 : if (portal->tupDesc == NULL)
634 40959 : return;
635 11625 : natts = portal->tupDesc->natts;
636 11625 : portal->formats = (int16 *)
637 11625 : MemoryContextAlloc(PortalGetHeapMemory(portal),
638 : natts * sizeof(int16));
639 11625 : if (nFormats > 1)
640 : {
641 : /* format specified for each column */
642 0 : if (nFormats != natts)
643 0 : ereport(ERROR,
644 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
645 : errmsg("bind message has %d result formats but query has %d columns",
646 : nFormats, natts)));
647 0 : memcpy(portal->formats, formats, natts * sizeof(int16));
648 : }
649 11625 : else if (nFormats > 0)
650 : {
651 : /* single format specified, use for all columns */
652 11625 : int16 format1 = formats[0];
653 :
654 40331 : for (i = 0; i < natts; i++)
655 28706 : portal->formats[i] = format1;
656 : }
657 : else
658 : {
659 : /* use default format for all columns */
660 0 : for (i = 0; i < natts; i++)
661 0 : portal->formats[i] = 0;
662 : }
663 : }
664 :
665 : /*
666 : * PortalRun
667 : * Run a portal's query or queries.
668 : *
669 : * count <= 0 is interpreted as a no-op: the destination gets started up
670 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
671 : * interpreted as "all rows". Note that count is ignored in multi-query
672 : * situations, where we always run the portal to completion.
673 : *
674 : * isTopLevel: true if query is being executed at backend "top level"
675 : * (that is, directly from a client command message)
676 : *
677 : * dest: where to send output of primary (canSetTag) query
678 : *
679 : * altdest: where to send output of non-primary queries
680 : *
681 : * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
682 : * in which to store a command completion status string.
683 : * May be NULL if caller doesn't want a status string.
684 : *
685 : * Returns TRUE if the portal's execution is complete, FALSE if it was
686 : * suspended due to exhaustion of the count parameter.
687 : */
688 : bool
689 26417 : PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
690 : DestReceiver *dest, DestReceiver *altdest,
691 : char *completionTag)
692 : {
693 : bool result;
694 : uint64 nprocessed;
695 : ResourceOwner saveTopTransactionResourceOwner;
696 : MemoryContext saveTopTransactionContext;
697 : Portal saveActivePortal;
698 : ResourceOwner saveResourceOwner;
699 : MemoryContext savePortalContext;
700 : MemoryContext saveMemoryContext;
701 :
702 26417 : AssertArg(PortalIsValid(portal));
703 :
704 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
705 :
706 : /* Initialize completion tag to empty string */
707 26417 : if (completionTag)
708 26417 : completionTag[0] = '\0';
709 :
710 26417 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
711 : {
712 0 : elog(DEBUG3, "PortalRun");
713 : /* PORTAL_MULTI_QUERY logs its own stats per query */
714 0 : ResetUsage();
715 : }
716 :
717 : /*
718 : * Check for improper portal use, and mark portal active.
719 : */
720 26417 : MarkPortalActive(portal);
721 :
722 : /* Set run_once flag. Shouldn't be clear if previously set. */
723 26417 : Assert(!portal->run_once || run_once);
724 26417 : portal->run_once = run_once;
725 :
726 : /*
727 : * Set up global portal context pointers.
728 : *
729 : * We have to play a special game here to support utility commands like
730 : * VACUUM and CLUSTER, which internally start and commit transactions.
731 : * When we are called to execute such a command, CurrentResourceOwner will
732 : * be pointing to the TopTransactionResourceOwner --- which will be
733 : * destroyed and replaced in the course of the internal commit and
734 : * restart. So we need to be prepared to restore it as pointing to the
735 : * exit-time TopTransactionResourceOwner. (Ain't that ugly? This idea of
736 : * internally starting whole new transactions is not good.)
737 : * CurrentMemoryContext has a similar problem, but the other pointers we
738 : * save here will be NULL or pointing to longer-lived objects.
739 : */
740 26417 : saveTopTransactionResourceOwner = TopTransactionResourceOwner;
741 26417 : saveTopTransactionContext = TopTransactionContext;
742 26417 : saveActivePortal = ActivePortal;
743 26417 : saveResourceOwner = CurrentResourceOwner;
744 26417 : savePortalContext = PortalContext;
745 26417 : saveMemoryContext = CurrentMemoryContext;
746 26417 : PG_TRY();
747 : {
748 26417 : ActivePortal = portal;
749 26417 : if (portal->resowner)
750 26417 : CurrentResourceOwner = portal->resowner;
751 26417 : PortalContext = PortalGetHeapMemory(portal);
752 :
753 26417 : MemoryContextSwitchTo(PortalContext);
754 :
755 26417 : switch (portal->strategy)
756 : {
757 : case PORTAL_ONE_SELECT:
758 : case PORTAL_ONE_RETURNING:
759 : case PORTAL_ONE_MOD_WITH:
760 : case PORTAL_UTIL_SELECT:
761 :
762 : /*
763 : * If we have not yet run the command, do so, storing its
764 : * results in the portal's tuplestore. But we don't do that
765 : * for the PORTAL_ONE_SELECT case.
766 : */
767 11747 : if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore)
768 1163 : FillPortalStore(portal, isTopLevel);
769 :
770 : /*
771 : * Now fetch desired portion of results.
772 : */
773 11713 : nprocessed = PortalRunSelect(portal, true, count, dest);
774 :
775 : /*
776 : * If the portal result contains a command tag and the caller
777 : * gave us a pointer to store it, copy it. Patch the "SELECT"
778 : * tag to also provide the rowcount.
779 : */
780 11321 : if (completionTag && portal->commandTag)
781 : {
782 11321 : if (strcmp(portal->commandTag, "SELECT") == 0)
783 10192 : snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
784 : "SELECT " UINT64_FORMAT, nprocessed);
785 : else
786 1129 : strcpy(completionTag, portal->commandTag);
787 : }
788 :
789 : /* Mark portal not active */
790 11321 : portal->status = PORTAL_READY;
791 :
792 : /*
793 : * Since it's a forward fetch, say DONE iff atEnd is now true.
794 : */
795 11321 : result = portal->atEnd;
796 11321 : break;
797 :
798 : case PORTAL_MULTI_QUERY:
799 14670 : PortalRunMulti(portal, isTopLevel, false,
800 : dest, altdest, completionTag);
801 :
802 : /* Prevent portal's commands from being re-executed */
803 13100 : MarkPortalDone(portal);
804 :
805 : /* Always complete at end of RunMulti */
806 13100 : result = true;
807 13100 : break;
808 :
809 : default:
810 0 : elog(ERROR, "unrecognized portal strategy: %d",
811 : (int) portal->strategy);
812 : result = false; /* keep compiler quiet */
813 : break;
814 : }
815 : }
816 1996 : PG_CATCH();
817 : {
818 : /* Uncaught error while executing portal: mark it dead */
819 1996 : MarkPortalFailed(portal);
820 :
821 : /* Restore global vars and propagate error */
822 1996 : if (saveMemoryContext == saveTopTransactionContext)
823 1971 : MemoryContextSwitchTo(TopTransactionContext);
824 : else
825 25 : MemoryContextSwitchTo(saveMemoryContext);
826 1996 : ActivePortal = saveActivePortal;
827 1996 : if (saveResourceOwner == saveTopTransactionResourceOwner)
828 1971 : CurrentResourceOwner = TopTransactionResourceOwner;
829 : else
830 25 : CurrentResourceOwner = saveResourceOwner;
831 1996 : PortalContext = savePortalContext;
832 :
833 1996 : PG_RE_THROW();
834 : }
835 24421 : PG_END_TRY();
836 :
837 24421 : if (saveMemoryContext == saveTopTransactionContext)
838 24030 : MemoryContextSwitchTo(TopTransactionContext);
839 : else
840 391 : MemoryContextSwitchTo(saveMemoryContext);
841 24421 : ActivePortal = saveActivePortal;
842 24421 : if (saveResourceOwner == saveTopTransactionResourceOwner)
843 24030 : CurrentResourceOwner = TopTransactionResourceOwner;
844 : else
845 391 : CurrentResourceOwner = saveResourceOwner;
846 24421 : PortalContext = savePortalContext;
847 :
848 24421 : if (log_executor_stats && portal->strategy != PORTAL_MULTI_QUERY)
849 0 : ShowUsage("EXECUTOR STATISTICS");
850 :
851 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
852 :
853 24421 : return result;
854 : }
855 :
856 : /*
857 : * PortalRunSelect
858 : * Execute a portal's query in PORTAL_ONE_SELECT mode, and also
859 : * when fetching from a completed holdStore in PORTAL_ONE_RETURNING,
860 : * PORTAL_ONE_MOD_WITH, and PORTAL_UTIL_SELECT cases.
861 : *
862 : * This handles simple N-rows-forward-or-backward cases. For more complex
863 : * nonsequential access to a portal, see PortalRunFetch.
864 : *
865 : * count <= 0 is interpreted as a no-op: the destination gets started up
866 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
867 : * interpreted as "all rows". (cf FetchStmt.howMany)
868 : *
869 : * Caller must already have validated the Portal and done appropriate
870 : * setup (cf. PortalRun).
871 : *
872 : * Returns number of rows processed (suitable for use in result tag)
873 : */
874 : static uint64
875 13466 : PortalRunSelect(Portal portal,
876 : bool forward,
877 : long count,
878 : DestReceiver *dest)
879 : {
880 : QueryDesc *queryDesc;
881 : ScanDirection direction;
882 : uint64 nprocessed;
883 :
884 : /*
885 : * NB: queryDesc will be NULL if we are fetching from a held cursor or a
886 : * completed utility query; can't use it in that path.
887 : */
888 13466 : queryDesc = PortalGetQueryDesc(portal);
889 :
890 : /* Caller messed up if we have neither a ready query nor held data. */
891 13466 : Assert(queryDesc || portal->holdStore);
892 :
893 : /*
894 : * Force the queryDesc destination to the right thing. This supports
895 : * MOVE, for example, which will pass in dest = DestNone. This is okay to
896 : * change as long as we do it on every fetch. (The Executor must not
897 : * assume that dest never changes.)
898 : */
899 13466 : if (queryDesc)
900 11330 : queryDesc->dest = dest;
901 :
902 : /*
903 : * Determine which direction to go in, and check to see if we're already
904 : * at the end of the available tuples in that direction. If so, set the
905 : * direction to NoMovement to avoid trying to fetch any tuples. (This
906 : * check exists because not all plan node types are robust about being
907 : * called again if they've already returned NULL once.) Then call the
908 : * executor (we must not skip this, because the destination needs to see a
909 : * setup and shutdown even if no tuples are available). Finally, update
910 : * the portal position state depending on the number of tuples that were
911 : * retrieved.
912 : */
913 13466 : if (forward)
914 : {
915 13390 : if (portal->atEnd || count <= 0)
916 : {
917 720 : direction = NoMovementScanDirection;
918 720 : count = 0; /* don't pass negative count to executor */
919 : }
920 : else
921 12670 : direction = ForwardScanDirection;
922 :
923 : /* In the executor, zero count processes all rows */
924 13390 : if (count == FETCH_ALL)
925 11756 : count = 0;
926 :
927 13390 : if (portal->holdStore)
928 2134 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
929 : else
930 : {
931 11256 : PushActiveSnapshot(queryDesc->snapshot);
932 11256 : ExecutorRun(queryDesc, direction, (uint64) count,
933 11256 : portal->run_once);
934 10862 : nprocessed = queryDesc->estate->es_processed;
935 10862 : PopActiveSnapshot();
936 : }
937 :
938 12996 : if (!ScanDirectionIsNoMovement(direction))
939 : {
940 12276 : if (nprocessed > 0)
941 10531 : portal->atStart = false; /* OK to go backward now */
942 12276 : if (count == 0 || nprocessed < (uint64) count)
943 12112 : portal->atEnd = true; /* we retrieved 'em all */
944 12276 : portal->portalPos += nprocessed;
945 : }
946 : }
947 : else
948 : {
949 76 : if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
950 3 : ereport(ERROR,
951 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
952 : errmsg("cursor can only scan forward"),
953 : errhint("Declare it with SCROLL option to enable backward scan.")));
954 :
955 73 : if (portal->atStart || count <= 0)
956 : {
957 8 : direction = NoMovementScanDirection;
958 8 : count = 0; /* don't pass negative count to executor */
959 : }
960 : else
961 65 : direction = BackwardScanDirection;
962 :
963 : /* In the executor, zero count processes all rows */
964 73 : if (count == FETCH_ALL)
965 6 : count = 0;
966 :
967 73 : if (portal->holdStore)
968 2 : nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
969 : else
970 : {
971 71 : PushActiveSnapshot(queryDesc->snapshot);
972 71 : ExecutorRun(queryDesc, direction, (uint64) count,
973 71 : portal->run_once);
974 71 : nprocessed = queryDesc->estate->es_processed;
975 71 : PopActiveSnapshot();
976 : }
977 :
978 73 : if (!ScanDirectionIsNoMovement(direction))
979 : {
980 65 : if (nprocessed > 0 && portal->atEnd)
981 : {
982 15 : portal->atEnd = false; /* OK to go forward now */
983 15 : portal->portalPos++; /* adjust for endpoint case */
984 : }
985 65 : if (count == 0 || nprocessed < (uint64) count)
986 : {
987 25 : portal->atStart = true; /* we retrieved 'em all */
988 25 : portal->portalPos = 0;
989 : }
990 : else
991 : {
992 40 : portal->portalPos -= nprocessed;
993 : }
994 : }
995 : }
996 :
997 13069 : return nprocessed;
998 : }
999 :
1000 : /*
1001 : * FillPortalStore
1002 : * Run the query and load result tuples into the portal's tuple store.
1003 : *
1004 : * This is used for PORTAL_ONE_RETURNING, PORTAL_ONE_MOD_WITH, and
1005 : * PORTAL_UTIL_SELECT cases only.
1006 : */
1007 : static void
1008 1663 : FillPortalStore(Portal portal, bool isTopLevel)
1009 : {
1010 : DestReceiver *treceiver;
1011 : char completionTag[COMPLETION_TAG_BUFSIZE];
1012 :
1013 1663 : PortalCreateHoldStore(portal);
1014 1663 : treceiver = CreateDestReceiver(DestTuplestore);
1015 1663 : SetTuplestoreDestReceiverParams(treceiver,
1016 : portal->holdStore,
1017 : portal->holdContext,
1018 : false);
1019 :
1020 1663 : completionTag[0] = '\0';
1021 :
1022 1663 : switch (portal->strategy)
1023 : {
1024 : case PORTAL_ONE_RETURNING:
1025 : case PORTAL_ONE_MOD_WITH:
1026 :
1027 : /*
1028 : * Run the portal to completion just as for the default
1029 : * MULTI_QUERY case, but send the primary query's output to the
1030 : * tuplestore. Auxiliary query outputs are discarded. Set the
1031 : * portal's holdSnapshot to the snapshot used (or a copy of it).
1032 : */
1033 156 : PortalRunMulti(portal, isTopLevel, true,
1034 : treceiver, None_Receiver, completionTag);
1035 146 : break;
1036 :
1037 : case PORTAL_UTIL_SELECT:
1038 1507 : PortalRunUtility(portal, linitial_node(PlannedStmt, portal->stmts),
1039 : isTopLevel, true, treceiver, completionTag);
1040 1483 : break;
1041 :
1042 : default:
1043 0 : elog(ERROR, "unsupported portal strategy: %d",
1044 : (int) portal->strategy);
1045 : break;
1046 : }
1047 :
1048 : /* Override default completion tag with actual command result */
1049 1629 : if (completionTag[0] != '\0')
1050 450 : portal->commandTag = pstrdup(completionTag);
1051 :
1052 1629 : (*treceiver->rDestroy) (treceiver);
1053 1629 : }
1054 :
1055 : /*
1056 : * RunFromStore
1057 : * Fetch tuples from the portal's tuple store.
1058 : *
1059 : * Calling conventions are similar to ExecutorRun, except that we
1060 : * do not depend on having a queryDesc or estate. Therefore we return the
1061 : * number of tuples processed as the result, not in estate->es_processed.
1062 : *
1063 : * One difference from ExecutorRun is that the destination receiver functions
1064 : * are run in the caller's memory context (since we have no estate). Watch
1065 : * out for memory leaks.
1066 : */
1067 : static uint64
1068 2136 : RunFromStore(Portal portal, ScanDirection direction, uint64 count,
1069 : DestReceiver *dest)
1070 : {
1071 2136 : uint64 current_tuple_count = 0;
1072 : TupleTableSlot *slot;
1073 :
1074 2136 : slot = MakeSingleTupleTableSlot(portal->tupDesc);
1075 :
1076 2136 : (*dest->rStartup) (dest, CMD_SELECT, portal->tupDesc);
1077 :
1078 2136 : if (ScanDirectionIsNoMovement(direction))
1079 : {
1080 : /* do nothing except start/stop the destination */
1081 : }
1082 : else
1083 : {
1084 1636 : bool forward = ScanDirectionIsForward(direction);
1085 :
1086 : for (;;)
1087 : {
1088 : MemoryContext oldcontext;
1089 : bool ok;
1090 :
1091 19196 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1092 :
1093 19196 : ok = tuplestore_gettupleslot(portal->holdStore, forward, false,
1094 : slot);
1095 :
1096 19196 : MemoryContextSwitchTo(oldcontext);
1097 :
1098 19196 : if (!ok)
1099 1633 : break;
1100 :
1101 : /*
1102 : * If we are not able to send the tuple, we assume the destination
1103 : * has closed and no more tuples can be sent. If that's the case,
1104 : * end the loop.
1105 : */
1106 17563 : if (!((*dest->receiveSlot) (slot, dest)))
1107 0 : break;
1108 :
1109 17563 : ExecClearTuple(slot);
1110 :
1111 : /*
1112 : * check our tuple count.. if we've processed the proper number
1113 : * then quit, else loop again and process more tuples. Zero count
1114 : * means no limit.
1115 : */
1116 17563 : current_tuple_count++;
1117 17563 : if (count && count == current_tuple_count)
1118 3 : break;
1119 17560 : }
1120 : }
1121 :
1122 2136 : (*dest->rShutdown) (dest);
1123 :
1124 2136 : ExecDropSingleTupleTableSlot(slot);
1125 :
1126 2136 : return current_tuple_count;
1127 : }
1128 :
1129 : /*
1130 : * PortalRunUtility
1131 : * Execute a utility statement inside a portal.
1132 : */
1133 : static void
1134 12428 : PortalRunUtility(Portal portal, PlannedStmt *pstmt,
1135 : bool isTopLevel, bool setHoldSnapshot,
1136 : DestReceiver *dest, char *completionTag)
1137 : {
1138 12428 : Node *utilityStmt = pstmt->utilityStmt;
1139 : Snapshot snapshot;
1140 :
1141 : /*
1142 : * Set snapshot if utility stmt needs one. Most reliable way to do this
1143 : * seems to be to enumerate those that do not need one; this is a short
1144 : * list. Transaction control, LOCK, and SET must *not* set a snapshot
1145 : * since they need to be executable at the start of a transaction-snapshot
1146 : * mode transaction without freezing a snapshot. By extension we allow
1147 : * SHOW not to set a snapshot. The other stmts listed are just efficiency
1148 : * hacks. Beware of listing anything that can modify the database --- if,
1149 : * say, it has to update an index with expressions that invoke
1150 : * user-defined functions, then it had better have a snapshot.
1151 : */
1152 34468 : if (!(IsA(utilityStmt, TransactionStmt) ||
1153 23288 : IsA(utilityStmt, LockStmt) ||
1154 22282 : IsA(utilityStmt, VariableSetStmt) ||
1155 21252 : IsA(utilityStmt, VariableShowStmt) ||
1156 21178 : IsA(utilityStmt, ConstraintsSetStmt) ||
1157 : /* efficiency hacks from here down */
1158 20965 : IsA(utilityStmt, FetchStmt) ||
1159 20760 : IsA(utilityStmt, ListenStmt) ||
1160 20757 : IsA(utilityStmt, NotifyStmt) ||
1161 10378 : IsA(utilityStmt, UnlistenStmt) ||
1162 10376 : IsA(utilityStmt, CheckPointStmt)))
1163 : {
1164 10376 : snapshot = GetTransactionSnapshot();
1165 : /* If told to, register the snapshot we're using and save in portal */
1166 10376 : if (setHoldSnapshot)
1167 : {
1168 1250 : snapshot = RegisterSnapshot(snapshot);
1169 1250 : portal->holdSnapshot = snapshot;
1170 : }
1171 10376 : PushActiveSnapshot(snapshot);
1172 : /* PushActiveSnapshot might have copied the snapshot */
1173 10376 : snapshot = GetActiveSnapshot();
1174 : }
1175 : else
1176 2052 : snapshot = NULL;
1177 :
1178 12428 : ProcessUtility(pstmt,
1179 : portal->sourceText,
1180 : isTopLevel ? PROCESS_UTILITY_TOPLEVEL : PROCESS_UTILITY_QUERY,
1181 : portal->portalParams,
1182 : portal->queryEnv,
1183 : dest,
1184 : completionTag);
1185 :
1186 : /* Some utility statements may change context on us */
1187 11175 : MemoryContextSwitchTo(PortalGetHeapMemory(portal));
1188 :
1189 : /*
1190 : * Some utility commands may pop the ActiveSnapshot stack from under us,
1191 : * so be careful to only pop the stack if our snapshot is still at the
1192 : * top.
1193 : */
1194 20272 : if (snapshot != NULL && ActiveSnapshotSet() &&
1195 9097 : snapshot == GetActiveSnapshot())
1196 9097 : PopActiveSnapshot();
1197 11175 : }
1198 :
1199 : /*
1200 : * PortalRunMulti
1201 : * Execute a portal's queries in the general case (multi queries
1202 : * or non-SELECT-like queries)
1203 : */
1204 : static void
1205 14826 : PortalRunMulti(Portal portal,
1206 : bool isTopLevel, bool setHoldSnapshot,
1207 : DestReceiver *dest, DestReceiver *altdest,
1208 : char *completionTag)
1209 : {
1210 14826 : bool active_snapshot_set = false;
1211 : ListCell *stmtlist_item;
1212 :
1213 : /*
1214 : * If the destination is DestRemoteExecute, change to DestNone. The
1215 : * reason is that the client won't be expecting any tuples, and indeed has
1216 : * no way to know what they are, since there is no provision for Describe
1217 : * to send a RowDescription message when this portal execution strategy is
1218 : * in effect. This presently will only affect SELECT commands added to
1219 : * non-SELECT queries by rewrite rules: such commands will be executed,
1220 : * but the results will be discarded unless you use "simple Query"
1221 : * protocol.
1222 : */
1223 14826 : if (dest->mydest == DestRemoteExecute)
1224 0 : dest = None_Receiver;
1225 14826 : if (altdest->mydest == DestRemoteExecute)
1226 0 : altdest = None_Receiver;
1227 :
1228 : /*
1229 : * Loop to handle the individual queries generated from a single parsetree
1230 : * by analysis and rewrite.
1231 : */
1232 28158 : foreach(stmtlist_item, portal->stmts)
1233 : {
1234 14912 : PlannedStmt *pstmt = lfirst_node(PlannedStmt, stmtlist_item);
1235 :
1236 : /*
1237 : * If we got a cancel signal in prior command, quit
1238 : */
1239 14912 : CHECK_FOR_INTERRUPTS();
1240 :
1241 14912 : if (pstmt->utilityStmt == NULL)
1242 : {
1243 : /*
1244 : * process a plannable query.
1245 : */
1246 : TRACE_POSTGRESQL_QUERY_EXECUTE_START();
1247 :
1248 3991 : if (log_executor_stats)
1249 0 : ResetUsage();
1250 :
1251 : /*
1252 : * Must always have a snapshot for plannable queries. First time
1253 : * through, take a new snapshot; for subsequent queries in the
1254 : * same portal, just update the snapshot's copy of the command
1255 : * counter.
1256 : */
1257 3991 : if (!active_snapshot_set)
1258 : {
1259 3905 : Snapshot snapshot = GetTransactionSnapshot();
1260 :
1261 : /* If told to, register the snapshot and save in portal */
1262 3905 : if (setHoldSnapshot)
1263 : {
1264 156 : snapshot = RegisterSnapshot(snapshot);
1265 156 : portal->holdSnapshot = snapshot;
1266 : }
1267 :
1268 : /*
1269 : * We can't have the holdSnapshot also be the active one,
1270 : * because UpdateActiveSnapshotCommandId would complain. So
1271 : * force an extra snapshot copy. Plain PushActiveSnapshot
1272 : * would have copied the transaction snapshot anyway, so this
1273 : * only adds a copy step when setHoldSnapshot is true. (It's
1274 : * okay for the command ID of the active snapshot to diverge
1275 : * from what holdSnapshot has.)
1276 : */
1277 3905 : PushCopiedSnapshot(snapshot);
1278 3905 : active_snapshot_set = true;
1279 : }
1280 : else
1281 86 : UpdateActiveSnapshotCommandId();
1282 :
1283 3991 : if (pstmt->canSetTag)
1284 : {
1285 : /* statement can set tag string */
1286 3903 : ProcessQuery(pstmt,
1287 : portal->sourceText,
1288 : portal->portalParams,
1289 : portal->queryEnv,
1290 : dest, completionTag);
1291 : }
1292 : else
1293 : {
1294 : /* stmt added by rewrite cannot set tag */
1295 88 : ProcessQuery(pstmt,
1296 : portal->sourceText,
1297 : portal->portalParams,
1298 : portal->queryEnv,
1299 : altdest, NULL);
1300 : }
1301 :
1302 3640 : if (log_executor_stats)
1303 0 : ShowUsage("EXECUTOR STATISTICS");
1304 :
1305 : TRACE_POSTGRESQL_QUERY_EXECUTE_DONE();
1306 : }
1307 : else
1308 : {
1309 : /*
1310 : * process utility functions (create, destroy, etc..)
1311 : *
1312 : * We must not set a snapshot here for utility commands (if one is
1313 : * needed, PortalRunUtility will do it). If a utility command is
1314 : * alone in a portal then everything's fine. The only case where
1315 : * a utility command can be part of a longer list is that rules
1316 : * are allowed to include NotifyStmt. NotifyStmt doesn't care
1317 : * whether it has a snapshot or not, so we just leave the current
1318 : * snapshot alone if we have one.
1319 : */
1320 10921 : if (pstmt->canSetTag)
1321 : {
1322 10921 : Assert(!active_snapshot_set);
1323 : /* statement can set tag string */
1324 10921 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1325 : dest, completionTag);
1326 : }
1327 : else
1328 : {
1329 0 : Assert(IsA(pstmt->utilityStmt, NotifyStmt));
1330 : /* stmt added by rewrite cannot set tag */
1331 0 : PortalRunUtility(portal, pstmt, isTopLevel, false,
1332 : altdest, NULL);
1333 : }
1334 : }
1335 :
1336 : /*
1337 : * Increment command counter between queries, but not after the last
1338 : * one.
1339 : */
1340 13332 : if (lnext(stmtlist_item) != NULL)
1341 86 : CommandCounterIncrement();
1342 :
1343 : /*
1344 : * Clear subsidiary contexts to recover temporary memory.
1345 : */
1346 13332 : Assert(PortalGetHeapMemory(portal) == CurrentMemoryContext);
1347 :
1348 13332 : MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
1349 : }
1350 :
1351 : /* Pop the snapshot if we pushed one. */
1352 13246 : if (active_snapshot_set)
1353 3554 : PopActiveSnapshot();
1354 :
1355 : /*
1356 : * If a command completion tag was supplied, use it. Otherwise use the
1357 : * portal's commandTag as the default completion tag.
1358 : *
1359 : * Exception: Clients expect INSERT/UPDATE/DELETE tags to have counts, so
1360 : * fake them with zeros. This can happen with DO INSTEAD rules if there
1361 : * is no replacement query of the same type as the original. We print "0
1362 : * 0" here because technically there is no query of the matching tag type,
1363 : * and printing a non-zero count for a different query type seems wrong,
1364 : * e.g. an INSERT that does an UPDATE instead should not print "0 1" if
1365 : * one row was updated. See QueryRewrite(), step 3, for details.
1366 : */
1367 13246 : if (completionTag && completionTag[0] == '\0')
1368 : {
1369 9413 : if (portal->commandTag)
1370 9413 : strcpy(completionTag, portal->commandTag);
1371 9413 : if (strcmp(completionTag, "SELECT") == 0)
1372 0 : sprintf(completionTag, "SELECT 0 0");
1373 9413 : else if (strcmp(completionTag, "INSERT") == 0)
1374 1 : strcpy(completionTag, "INSERT 0 0");
1375 9412 : else if (strcmp(completionTag, "UPDATE") == 0)
1376 0 : strcpy(completionTag, "UPDATE 0");
1377 9412 : else if (strcmp(completionTag, "DELETE") == 0)
1378 1 : strcpy(completionTag, "DELETE 0");
1379 : }
1380 13246 : }
1381 :
1382 : /*
1383 : * PortalRunFetch
1384 : * Variant form of PortalRun that supports SQL FETCH directions.
1385 : *
1386 : * Note: we presently assume that no callers of this want isTopLevel = true.
1387 : *
1388 : * count <= 0 is interpreted as a no-op: the destination gets started up
1389 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
1390 : * interpreted as "all rows". (cf FetchStmt.howMany)
1391 : *
1392 : * Returns number of rows processed (suitable for use in result tag)
1393 : */
1394 : uint64
1395 1745 : PortalRunFetch(Portal portal,
1396 : FetchDirection fdirection,
1397 : long count,
1398 : DestReceiver *dest)
1399 : {
1400 : uint64 result;
1401 : Portal saveActivePortal;
1402 : ResourceOwner saveResourceOwner;
1403 : MemoryContext savePortalContext;
1404 : MemoryContext oldContext;
1405 :
1406 1745 : AssertArg(PortalIsValid(portal));
1407 :
1408 : /*
1409 : * Check for improper portal use, and mark portal active.
1410 : */
1411 1745 : MarkPortalActive(portal);
1412 :
1413 : /* If supporting FETCH, portal can't be run-once. */
1414 1742 : Assert(!portal->run_once);
1415 :
1416 : /*
1417 : * Set up global portal context pointers.
1418 : */
1419 1742 : saveActivePortal = ActivePortal;
1420 1742 : saveResourceOwner = CurrentResourceOwner;
1421 1742 : savePortalContext = PortalContext;
1422 1742 : PG_TRY();
1423 : {
1424 1742 : ActivePortal = portal;
1425 1742 : if (portal->resowner)
1426 1735 : CurrentResourceOwner = portal->resowner;
1427 1742 : PortalContext = PortalGetHeapMemory(portal);
1428 :
1429 1742 : oldContext = MemoryContextSwitchTo(PortalContext);
1430 :
1431 1742 : switch (portal->strategy)
1432 : {
1433 : case PORTAL_ONE_SELECT:
1434 742 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1435 737 : break;
1436 :
1437 : case PORTAL_ONE_RETURNING:
1438 : case PORTAL_ONE_MOD_WITH:
1439 : case PORTAL_UTIL_SELECT:
1440 :
1441 : /*
1442 : * If we have not yet run the command, do so, storing its
1443 : * results in the portal's tuplestore.
1444 : */
1445 1000 : if (!portal->holdStore)
1446 500 : FillPortalStore(portal, false /* isTopLevel */ );
1447 :
1448 : /*
1449 : * Now fetch desired portion of results.
1450 : */
1451 1000 : result = DoPortalRunFetch(portal, fdirection, count, dest);
1452 1000 : break;
1453 :
1454 : default:
1455 0 : elog(ERROR, "unsupported portal strategy");
1456 : result = 0; /* keep compiler quiet */
1457 : break;
1458 : }
1459 : }
1460 5 : PG_CATCH();
1461 : {
1462 : /* Uncaught error while executing portal: mark it dead */
1463 5 : MarkPortalFailed(portal);
1464 :
1465 : /* Restore global vars and propagate error */
1466 5 : ActivePortal = saveActivePortal;
1467 5 : CurrentResourceOwner = saveResourceOwner;
1468 5 : PortalContext = savePortalContext;
1469 :
1470 5 : PG_RE_THROW();
1471 : }
1472 1737 : PG_END_TRY();
1473 :
1474 1737 : MemoryContextSwitchTo(oldContext);
1475 :
1476 : /* Mark portal not active */
1477 1737 : portal->status = PORTAL_READY;
1478 :
1479 1737 : ActivePortal = saveActivePortal;
1480 1737 : CurrentResourceOwner = saveResourceOwner;
1481 1737 : PortalContext = savePortalContext;
1482 :
1483 1737 : return result;
1484 : }
1485 :
1486 : /*
1487 : * DoPortalRunFetch
1488 : * Guts of PortalRunFetch --- the portal context is already set up
1489 : *
1490 : * count <= 0 is interpreted as a no-op: the destination gets started up
1491 : * and shut down, but nothing else happens. Also, count == FETCH_ALL is
1492 : * interpreted as "all rows". (cf FetchStmt.howMany)
1493 : *
1494 : * Returns number of rows processed (suitable for use in result tag)
1495 : */
1496 : static uint64
1497 1742 : DoPortalRunFetch(Portal portal,
1498 : FetchDirection fdirection,
1499 : long count,
1500 : DestReceiver *dest)
1501 : {
1502 : bool forward;
1503 :
1504 1742 : Assert(portal->strategy == PORTAL_ONE_SELECT ||
1505 : portal->strategy == PORTAL_ONE_RETURNING ||
1506 : portal->strategy == PORTAL_ONE_MOD_WITH ||
1507 : portal->strategy == PORTAL_UTIL_SELECT);
1508 :
1509 1742 : switch (fdirection)
1510 : {
1511 : case FETCH_FORWARD:
1512 1653 : if (count < 0)
1513 : {
1514 0 : fdirection = FETCH_BACKWARD;
1515 0 : count = -count;
1516 : }
1517 : /* fall out of switch to share code with FETCH_BACKWARD */
1518 1653 : break;
1519 : case FETCH_BACKWARD:
1520 63 : if (count < 0)
1521 : {
1522 0 : fdirection = FETCH_FORWARD;
1523 0 : count = -count;
1524 : }
1525 : /* fall out of switch to share code with FETCH_FORWARD */
1526 63 : break;
1527 : case FETCH_ABSOLUTE:
1528 12 : if (count > 0)
1529 : {
1530 : /*
1531 : * Definition: Rewind to start, advance count-1 rows, return
1532 : * next row (if any).
1533 : *
1534 : * In practice, if the goal is less than halfway back to the
1535 : * start, it's better to scan from where we are.
1536 : *
1537 : * Also, if current portalPos is outside the range of "long",
1538 : * do it the hard way to avoid possible overflow of the count
1539 : * argument to PortalRunSelect. We must exclude exactly
1540 : * LONG_MAX, as well, lest the count look like FETCH_ALL.
1541 : *
1542 : * In any case, we arrange to fetch the target row going
1543 : * forwards.
1544 : */
1545 6 : if ((uint64) (count - 1) <= portal->portalPos / 2 ||
1546 0 : portal->portalPos >= (uint64) LONG_MAX)
1547 : {
1548 6 : DoPortalRewind(portal);
1549 12 : if (count > 1)
1550 0 : PortalRunSelect(portal, true, count - 1,
1551 : None_Receiver);
1552 : }
1553 : else
1554 : {
1555 0 : long pos = (long) portal->portalPos;
1556 :
1557 0 : if (portal->atEnd)
1558 0 : pos++; /* need one extra fetch if off end */
1559 0 : if (count <= pos)
1560 0 : PortalRunSelect(portal, false, pos - count + 1,
1561 : None_Receiver);
1562 0 : else if (count > pos + 1)
1563 0 : PortalRunSelect(portal, true, count - pos - 1,
1564 : None_Receiver);
1565 : }
1566 6 : return PortalRunSelect(portal, true, 1L, dest);
1567 : }
1568 6 : else if (count < 0)
1569 : {
1570 : /*
1571 : * Definition: Advance to end, back up abs(count)-1 rows,
1572 : * return prior row (if any). We could optimize this if we
1573 : * knew in advance where the end was, but typically we won't.
1574 : * (Is it worth considering case where count > half of size of
1575 : * query? We could rewind once we know the size ...)
1576 : */
1577 6 : PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
1578 6 : if (count < -1)
1579 0 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1580 6 : return PortalRunSelect(portal, false, 1L, dest);
1581 : }
1582 : else
1583 : {
1584 : /* count == 0 */
1585 : /* Rewind to start, return zero rows */
1586 0 : DoPortalRewind(portal);
1587 0 : return PortalRunSelect(portal, true, 0L, dest);
1588 : }
1589 : break;
1590 : case FETCH_RELATIVE:
1591 14 : if (count > 0)
1592 : {
1593 : /*
1594 : * Definition: advance count-1 rows, return next row (if any).
1595 : */
1596 6 : if (count > 1)
1597 4 : PortalRunSelect(portal, true, count - 1, None_Receiver);
1598 6 : return PortalRunSelect(portal, true, 1L, dest);
1599 : }
1600 8 : else if (count < 0)
1601 : {
1602 : /*
1603 : * Definition: back up abs(count)-1 rows, return prior row (if
1604 : * any).
1605 : */
1606 5 : if (count < -1)
1607 3 : PortalRunSelect(portal, false, -count - 1, None_Receiver);
1608 5 : return PortalRunSelect(portal, false, 1L, dest);
1609 : }
1610 : else
1611 : {
1612 : /* count == 0 */
1613 : /* Same as FETCH FORWARD 0, so fall out of switch */
1614 3 : fdirection = FETCH_FORWARD;
1615 : }
1616 3 : break;
1617 : default:
1618 0 : elog(ERROR, "bogus direction");
1619 : break;
1620 : }
1621 :
1622 : /*
1623 : * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and count
1624 : * >= 0.
1625 : */
1626 1719 : forward = (fdirection == FETCH_FORWARD);
1627 :
1628 : /*
1629 : * Zero count means to re-fetch the current row, if any (per SQL)
1630 : */
1631 1719 : if (count == 0)
1632 : {
1633 : bool on_row;
1634 :
1635 : /* Are we sitting on a row? */
1636 3 : on_row = (!portal->atStart && !portal->atEnd);
1637 :
1638 3 : if (dest->mydest == DestNone)
1639 : {
1640 : /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
1641 0 : return on_row ? 1 : 0;
1642 : }
1643 : else
1644 : {
1645 : /*
1646 : * If we are sitting on a row, back up one so we can re-fetch it.
1647 : * If we are not sitting on a row, we still have to start up and
1648 : * shut down the executor so that the destination is initialized
1649 : * and shut down correctly; so keep going. To PortalRunSelect,
1650 : * count == 0 means we will retrieve no row.
1651 : */
1652 3 : if (on_row)
1653 : {
1654 3 : PortalRunSelect(portal, false, 1L, None_Receiver);
1655 : /* Set up to fetch one row forward */
1656 2 : count = 1;
1657 2 : forward = true;
1658 : }
1659 : }
1660 : }
1661 :
1662 : /*
1663 : * Optimize MOVE BACKWARD ALL into a Rewind.
1664 : */
1665 1718 : if (!forward && count == FETCH_ALL && dest->mydest == DestNone)
1666 : {
1667 4 : uint64 result = portal->portalPos;
1668 :
1669 4 : if (result > 0 && !portal->atEnd)
1670 0 : result--;
1671 4 : DoPortalRewind(portal);
1672 4 : return result;
1673 : }
1674 :
1675 1714 : return PortalRunSelect(portal, forward, count, dest);
1676 : }
1677 :
1678 : /*
1679 : * DoPortalRewind - rewind a Portal to starting point
1680 : */
1681 : static void
1682 10 : DoPortalRewind(Portal portal)
1683 : {
1684 : QueryDesc *queryDesc;
1685 :
1686 : /* Rewind holdStore, if we have one */
1687 10 : if (portal->holdStore)
1688 : {
1689 : MemoryContext oldcontext;
1690 :
1691 1 : oldcontext = MemoryContextSwitchTo(portal->holdContext);
1692 1 : tuplestore_rescan(portal->holdStore);
1693 1 : MemoryContextSwitchTo(oldcontext);
1694 : }
1695 :
1696 : /* Rewind executor, if active */
1697 10 : queryDesc = PortalGetQueryDesc(portal);
1698 10 : if (queryDesc)
1699 : {
1700 9 : PushActiveSnapshot(queryDesc->snapshot);
1701 9 : ExecutorRewind(queryDesc);
1702 9 : PopActiveSnapshot();
1703 : }
1704 :
1705 10 : portal->atStart = true;
1706 10 : portal->atEnd = false;
1707 10 : portal->portalPos = 0;
1708 10 : }
|