OSDN Git Service

0626ac5e8f4facbe51c7443c31e0f00ce81a91e1
[pg-rex/syncrep.git] / src / backend / tcop / pquery.c
1 /*-------------------------------------------------------------------------
2  *
3  * pquery.c
4  *        POSTGRES process query command code
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/tcop/pquery.c,v 1.69 2003/08/04 00:43:25 momjian Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "executor/executor.h"
18 #include "miscadmin.h"
19 #include "tcop/tcopprot.h"
20 #include "tcop/pquery.h"
21 #include "tcop/utility.h"
22 #include "utils/guc.h"
23 #include "utils/memutils.h"
24
25
26 static uint32 RunFromStore(Portal portal, ScanDirection direction, long count,
27                          DestReceiver *dest);
28 static long PortalRunSelect(Portal portal, bool forward, long count,
29                                 DestReceiver *dest);
30 static void PortalRunUtility(Portal portal, Query *query,
31                                  DestReceiver *dest, char *completionTag);
32 static void PortalRunMulti(Portal portal,
33                            DestReceiver *dest, DestReceiver *altdest,
34                            char *completionTag);
35 static long DoPortalRunFetch(Portal portal,
36                                  FetchDirection fdirection,
37                                  long count,
38                                  DestReceiver *dest);
39 static void DoPortalRewind(Portal portal);
40
41
42 /*
43  * CreateQueryDesc
44  */
45 QueryDesc *
46 CreateQueryDesc(Query *parsetree,
47                                 Plan *plantree,
48                                 DestReceiver *dest,
49                                 ParamListInfo params,
50                                 bool doInstrument)
51 {
52         QueryDesc  *qd = (QueryDesc *) palloc(sizeof(QueryDesc));
53
54         qd->operation = parsetree->commandType;         /* operation */
55         qd->parsetree = parsetree;      /* parse tree */
56         qd->plantree = plantree;        /* plan */
57         qd->dest = dest;                        /* output dest */
58         qd->params = params;            /* parameter values passed into query */
59         qd->doInstrument = doInstrument;        /* instrumentation wanted? */
60
61         /* null these fields until set by ExecutorStart */
62         qd->tupDesc = NULL;
63         qd->estate = NULL;
64         qd->planstate = NULL;
65
66         return qd;
67 }
68
69 /*
70  * FreeQueryDesc
71  */
72 void
73 FreeQueryDesc(QueryDesc *qdesc)
74 {
75         /* Can't be a live query */
76         Assert(qdesc->estate == NULL);
77         /* Only the QueryDesc itself need be freed */
78         pfree(qdesc);
79 }
80
81
82 /*
83  * ProcessQuery
84  *              Execute a single query
85  *
86  *      parsetree: the query tree
87  *      plan: the plan tree for the query
88  *      params: any parameters needed
89  *      dest: where to send results
90  *      completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
91  *              in which to store a command completion status string.
92  *
93  * completionTag may be NULL if caller doesn't want a status string.
94  *
95  * Must be called in a memory context that will be reset or deleted on
96  * error; otherwise the executor's memory usage will be leaked.
97  */
98 void
99 ProcessQuery(Query *parsetree,
100                          Plan *plan,
101                          ParamListInfo params,
102                          DestReceiver *dest,
103                          char *completionTag)
104 {
105         int                     operation = parsetree->commandType;
106         QueryDesc  *queryDesc;
107
108         /*
109          * Check for special-case destinations
110          */
111         if (operation == CMD_SELECT)
112         {
113                 if (parsetree->into != NULL)
114                 {
115                         /*
116                          * SELECT INTO table (a/k/a CREATE AS ... SELECT).
117                          *
118                          * Override the normal communication destination; execMain.c
119                          * special-cases this case.  (Perhaps would be cleaner to have
120                          * an additional destination type?)
121                          */
122                         dest = None_Receiver;
123                 }
124         }
125
126         /*
127          * Create the QueryDesc object
128          */
129         queryDesc = CreateQueryDesc(parsetree, plan, dest, params, false);
130
131         /*
132          * Call ExecStart to prepare the plan for execution
133          */
134         ExecutorStart(queryDesc, false);
135
136         /*
137          * Run the plan to completion.
138          */
139         ExecutorRun(queryDesc, ForwardScanDirection, 0L);
140
141         /*
142          * Build command completion status string, if caller wants one.
143          */
144         if (completionTag)
145         {
146                 Oid                     lastOid;
147
148                 switch (operation)
149                 {
150                         case CMD_SELECT:
151                                 strcpy(completionTag, "SELECT");
152                                 break;
153                         case CMD_INSERT:
154                                 if (queryDesc->estate->es_processed == 1)
155                                         lastOid = queryDesc->estate->es_lastoid;
156                                 else
157                                         lastOid = InvalidOid;
158                                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
159                                 "INSERT %u %u", lastOid, queryDesc->estate->es_processed);
160                                 break;
161                         case CMD_UPDATE:
162                                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
163                                                  "UPDATE %u", queryDesc->estate->es_processed);
164                                 break;
165                         case CMD_DELETE:
166                                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
167                                                  "DELETE %u", queryDesc->estate->es_processed);
168                                 break;
169                         default:
170                                 strcpy(completionTag, "???");
171                                 break;
172                 }
173         }
174
175         /*
176          * Now, we close down all the scans and free allocated resources.
177          */
178         ExecutorEnd(queryDesc);
179
180         FreeQueryDesc(queryDesc);
181 }
182
183 /*
184  * ChoosePortalStrategy
185  *              Select portal execution strategy given the intended query list.
186  *
187  * See the comments in portal.h.
188  */
189 PortalStrategy
190 ChoosePortalStrategy(List *parseTrees)
191 {
192         PortalStrategy strategy;
193
194         strategy = PORTAL_MULTI_QUERY;          /* default assumption */
195
196         if (length(parseTrees) == 1)
197         {
198                 Query      *query = (Query *) lfirst(parseTrees);
199
200                 if (query->commandType == CMD_SELECT &&
201                         query->canSetTag &&
202                         query->into == NULL)
203                         strategy = PORTAL_ONE_SELECT;
204                 else if (query->commandType == CMD_UTILITY &&
205                                  query->canSetTag &&
206                                  query->utilityStmt != NULL)
207                 {
208                         if (UtilityReturnsTuples(query->utilityStmt))
209                                 strategy = PORTAL_UTIL_SELECT;
210                 }
211         }
212         return strategy;
213 }
214
215 /*
216  * PortalStart
217  *              Prepare a portal for execution.
218  *
219  * Caller must already have created the portal, done PortalDefineQuery(),
220  * and adjusted portal options if needed.  If parameters are needed by
221  * the query, they must be passed in here (caller is responsible for
222  * giving them appropriate lifetime).
223  *
224  * On return, portal is ready to accept PortalRun() calls, and the result
225  * tupdesc (if any) is known.
226  */
227 void
228 PortalStart(Portal portal, ParamListInfo params)
229 {
230         MemoryContext oldContext;
231         QueryDesc  *queryDesc;
232
233         AssertArg(PortalIsValid(portal));
234         AssertState(portal->queryContext != NULL);      /* query defined? */
235         AssertState(!portal->portalReady);      /* else extra PortalStart */
236
237         oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
238
239         /* Must remember portal param list, if any */
240         portal->portalParams = params;
241
242         /*
243          * Determine the portal execution strategy
244          */
245         portal->strategy = ChoosePortalStrategy(portal->parseTrees);
246
247         /*
248          * Fire her up according to the strategy
249          */
250         switch (portal->strategy)
251         {
252                 case PORTAL_ONE_SELECT:
253
254                         /*
255                          * Must set query snapshot before starting executor.
256                          */
257                         SetQuerySnapshot();
258
259                         /*
260                          * Create QueryDesc in portal's context; for the moment, set
261                          * the destination to None.
262                          */
263                         queryDesc = CreateQueryDesc((Query *) lfirst(portal->parseTrees),
264                                                                           (Plan *) lfirst(portal->planTrees),
265                                                                                 None_Receiver,
266                                                                                 params,
267                                                                                 false);
268
269                         /*
270                          * Call ExecStart to prepare the plan for execution
271                          */
272                         ExecutorStart(queryDesc, false);
273
274                         /*
275                          * This tells PortalCleanup to shut down the executor
276                          */
277                         portal->queryDesc = queryDesc;
278
279                         /*
280                          * Remember tuple descriptor (computed by ExecutorStart)
281                          */
282                         portal->tupDesc = queryDesc->tupDesc;
283
284                         /*
285                          * Reset cursor position data to "start of query"
286                          */
287                         portal->atStart = true;
288                         portal->atEnd = false;          /* allow fetches */
289                         portal->portalPos = 0;
290                         portal->posOverflow = false;
291                         break;
292
293                 case PORTAL_UTIL_SELECT:
294
295                         /*
296                          * We don't set query snapshot here, because PortalRunUtility
297                          * will take care of it.
298                          */
299                         portal->tupDesc =
300                                 UtilityTupleDescriptor(((Query *) lfirst(portal->parseTrees))->utilityStmt);
301
302                         /*
303                          * Reset cursor position data to "start of query"
304                          */
305                         portal->atStart = true;
306                         portal->atEnd = false;          /* allow fetches */
307                         portal->portalPos = 0;
308                         portal->posOverflow = false;
309                         break;
310
311                 case PORTAL_MULTI_QUERY:
312                         /* Need do nothing now */
313                         portal->tupDesc = NULL;
314                         break;
315         }
316
317         MemoryContextSwitchTo(oldContext);
318
319         portal->portalReady = true;
320 }
321
322 /*
323  * PortalSetResultFormat
324  *              Select the format codes for a portal's output.
325  *
326  * This must be run after PortalStart for a portal that will be read by
327  * a Remote or RemoteExecute destination.  It is not presently needed for
328  * other destination types.
329  *
330  * formats[] is the client format request, as per Bind message conventions.
331  */
332 void
333 PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
334 {
335         int                     natts;
336         int                     i;
337
338         /* Do nothing if portal won't return tuples */
339         if (portal->tupDesc == NULL)
340                 return;
341         natts = portal->tupDesc->natts;
342         /* +1 avoids palloc(0) if no columns */
343         portal->formats = (int16 *)
344                 MemoryContextAlloc(PortalGetHeapMemory(portal),
345                                                    (natts + 1) * sizeof(int16));
346         if (nFormats > 1)
347         {
348                 /* format specified for each column */
349                 if (nFormats != natts)
350                         ereport(ERROR,
351                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
352                                          errmsg("bind message has %d result formats but query has %d columns",
353                                                         nFormats, natts)));
354                 memcpy(portal->formats, formats, natts * sizeof(int16));
355         }
356         else if (nFormats > 0)
357         {
358                 /* single format specified, use for all columns */
359                 int16           format1 = formats[0];
360
361                 for (i = 0; i < natts; i++)
362                         portal->formats[i] = format1;
363         }
364         else
365         {
366                 /* use default format for all columns */
367                 for (i = 0; i < natts; i++)
368                         portal->formats[i] = 0;
369         }
370 }
371
372 /*
373  * PortalRun
374  *              Run a portal's query or queries.
375  *
376  * count <= 0 is interpreted as a no-op: the destination gets started up
377  * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
378  * interpreted as "all rows".  Note that count is ignored in multi-query
379  * situations, where we always run the portal to completion.
380  *
381  * dest: where to send output of primary (canSetTag) query
382  *
383  * altdest: where to send output of non-primary queries
384  *
385  * completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
386  *              in which to store a command completion status string.
387  *              May be NULL if caller doesn't want a status string.
388  *
389  * Returns TRUE if the portal's execution is complete, FALSE if it was
390  * suspended due to exhaustion of the count parameter.
391  */
392 bool
393 PortalRun(Portal portal, long count,
394                   DestReceiver *dest, DestReceiver *altdest,
395                   char *completionTag)
396 {
397         bool            result;
398         MemoryContext savePortalContext;
399         MemoryContext saveQueryContext;
400         MemoryContext oldContext;
401
402         AssertArg(PortalIsValid(portal));
403         AssertState(portal->portalReady);       /* else no PortalStart */
404
405         /* Initialize completion tag to empty string */
406         if (completionTag)
407                 completionTag[0] = '\0';
408
409         /*
410          * Check for improper portal use, and mark portal active.
411          */
412         if (portal->portalDone)
413                 ereport(ERROR,
414                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
415                    errmsg("portal \"%s\" cannot be run anymore", portal->name)));
416         if (portal->portalActive)
417                 ereport(ERROR,
418                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
419                                  errmsg("portal \"%s\" already active", portal->name)));
420         portal->portalActive = true;
421
422         /*
423          * Set global portal context pointers.
424          */
425         savePortalContext = PortalContext;
426         PortalContext = PortalGetHeapMemory(portal);
427         saveQueryContext = QueryContext;
428         QueryContext = portal->queryContext;
429
430         oldContext = MemoryContextSwitchTo(PortalContext);
431
432         switch (portal->strategy)
433         {
434                 case PORTAL_ONE_SELECT:
435                         (void) PortalRunSelect(portal, true, count, dest);
436                         /* we know the query is supposed to set the tag */
437                         if (completionTag && portal->commandTag)
438                                 strcpy(completionTag, portal->commandTag);
439
440                         /*
441                          * Since it's a forward fetch, say DONE iff atEnd is now true.
442                          */
443                         result = portal->atEnd;
444                         break;
445
446                 case PORTAL_UTIL_SELECT:
447
448                         /*
449                          * If we have not yet run the utility statement, do so,
450                          * storing its results in the portal's tuplestore.
451                          */
452                         if (!portal->portalUtilReady)
453                         {
454                                 DestReceiver *treceiver;
455
456                                 PortalCreateHoldStore(portal);
457                                 treceiver = CreateDestReceiver(Tuplestore, portal);
458                                 PortalRunUtility(portal, lfirst(portal->parseTrees),
459                                                                  treceiver, NULL);
460                                 (*treceiver->destroy) (treceiver);
461                                 portal->portalUtilReady = true;
462                         }
463
464                         /*
465                          * Now fetch desired portion of results.
466                          */
467                         (void) PortalRunSelect(portal, true, count, dest);
468
469                         /*
470                          * We know the query is supposed to set the tag; we assume
471                          * only the default tag is needed.
472                          */
473                         if (completionTag && portal->commandTag)
474                                 strcpy(completionTag, portal->commandTag);
475
476                         /*
477                          * Since it's a forward fetch, say DONE iff atEnd is now true.
478                          */
479                         result = portal->atEnd;
480                         break;
481
482                 case PORTAL_MULTI_QUERY:
483                         PortalRunMulti(portal, dest, altdest, completionTag);
484                         /* Always complete at end of RunMulti */
485                         result = true;
486                         break;
487
488                 default:
489                         elog(ERROR, "unrecognized portal strategy: %d",
490                                  (int) portal->strategy);
491                         result = false;         /* keep compiler quiet */
492                         break;
493         }
494
495         MemoryContextSwitchTo(oldContext);
496
497         /* Mark portal not active */
498         portal->portalActive = false;
499
500         PortalContext = savePortalContext;
501         QueryContext = saveQueryContext;
502
503         return result;
504 }
505
506 /*
507  * PortalRunSelect
508  *              Execute a portal's query in SELECT cases (also UTIL_SELECT).
509  *
510  * This handles simple N-rows-forward-or-backward cases.  For more complex
511  * nonsequential access to a portal, see PortalRunFetch.
512  *
513  * count <= 0 is interpreted as a no-op: the destination gets started up
514  * and shut down, but nothing else happens.  Also, count == FETCH_ALL is
515  * interpreted as "all rows".
516  *
517  * Caller must already have validated the Portal and done appropriate
518  * setup (cf. PortalRun).
519  *
520  * Returns number of rows processed (suitable for use in result tag)
521  */
522 static long
523 PortalRunSelect(Portal portal,
524                                 bool forward,
525                                 long count,
526                                 DestReceiver *dest)
527 {
528         QueryDesc  *queryDesc;
529         ScanDirection direction;
530         uint32          nprocessed;
531
532         /*
533          * NB: queryDesc will be NULL if we are fetching from a held cursor or
534          * a completed utility query; can't use it in that path.
535          */
536         queryDesc = PortalGetQueryDesc(portal);
537
538         /* Caller messed up if we have neither a ready query nor held data. */
539         Assert(queryDesc || portal->holdStore);
540
541         /*
542          * Force the queryDesc destination to the right thing.  This supports
543          * MOVE, for example, which will pass in dest = None.  This is okay to
544          * change as long as we do it on every fetch.  (The Executor must not
545          * assume that dest never changes.)
546          */
547         if (queryDesc)
548                 queryDesc->dest = dest;
549
550         /*
551          * Determine which direction to go in, and check to see if we're
552          * already at the end of the available tuples in that direction.  If
553          * so, set the direction to NoMovement to avoid trying to fetch any
554          * tuples.      (This check exists because not all plan node types are
555          * robust about being called again if they've already returned NULL
556          * once.)  Then call the executor (we must not skip this, because the
557          * destination needs to see a setup and shutdown even if no tuples are
558          * available).  Finally, update the portal position state depending on
559          * the number of tuples that were retrieved.
560          */
561         if (forward)
562         {
563                 if (portal->atEnd || count <= 0)
564                         direction = NoMovementScanDirection;
565                 else
566                         direction = ForwardScanDirection;
567
568                 /* In the executor, zero count processes all rows */
569                 if (count == FETCH_ALL)
570                         count = 0;
571
572                 if (portal->holdStore)
573                         nprocessed = RunFromStore(portal, direction, count, dest);
574                 else
575                 {
576                         ExecutorRun(queryDesc, direction, count);
577                         nprocessed = queryDesc->estate->es_processed;
578                 }
579
580                 if (direction != NoMovementScanDirection)
581                 {
582                         long            oldPos;
583
584                         if (nprocessed > 0)
585                                 portal->atStart = false;                /* OK to go backward now */
586                         if (count == 0 ||
587                                 (unsigned long) nprocessed < (unsigned long) count)
588                                 portal->atEnd = true;   /* we retrieved 'em all */
589                         oldPos = portal->portalPos;
590                         portal->portalPos += nprocessed;
591                         /* portalPos doesn't advance when we fall off the end */
592                         if (portal->portalPos < oldPos)
593                                 portal->posOverflow = true;
594                 }
595         }
596         else
597         {
598                 if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
599                         ereport(ERROR,
600                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
601                                          errmsg("cursor can only scan forward"),
602                                          errhint("Declare it with SCROLL option to enable backward scan.")));
603
604                 if (portal->atStart || count <= 0)
605                         direction = NoMovementScanDirection;
606                 else
607                         direction = BackwardScanDirection;
608
609                 /* In the executor, zero count processes all rows */
610                 if (count == FETCH_ALL)
611                         count = 0;
612
613                 if (portal->holdStore)
614                         nprocessed = RunFromStore(portal, direction, count, dest);
615                 else
616                 {
617                         ExecutorRun(queryDesc, direction, count);
618                         nprocessed = queryDesc->estate->es_processed;
619                 }
620
621                 if (direction != NoMovementScanDirection)
622                 {
623                         if (nprocessed > 0 && portal->atEnd)
624                         {
625                                 portal->atEnd = false;  /* OK to go forward now */
626                                 portal->portalPos++;    /* adjust for endpoint case */
627                         }
628                         if (count == 0 ||
629                                 (unsigned long) nprocessed < (unsigned long) count)
630                         {
631                                 portal->atStart = true; /* we retrieved 'em all */
632                                 portal->portalPos = 0;
633                                 portal->posOverflow = false;
634                         }
635                         else
636                         {
637                                 long            oldPos;
638
639                                 oldPos = portal->portalPos;
640                                 portal->portalPos -= nprocessed;
641                                 if (portal->portalPos > oldPos ||
642                                         portal->portalPos <= 0)
643                                         portal->posOverflow = true;
644                         }
645                 }
646         }
647
648         return nprocessed;
649 }
650
651 /*
652  * RunFromStore
653  *              Fetch tuples from the portal's tuple store.
654  *
655  * Calling conventions are similar to ExecutorRun, except that we
656  * do not depend on having a queryDesc or estate.  Therefore we return the
657  * number of tuples processed as the result, not in estate->es_processed.
658  *
659  * One difference from ExecutorRun is that the destination receiver functions
660  * are run in the caller's memory context (since we have no estate).  Watch
661  * out for memory leaks.
662  */
663 static uint32
664 RunFromStore(Portal portal, ScanDirection direction, long count,
665                          DestReceiver *dest)
666 {
667         long            current_tuple_count = 0;
668
669         (*dest->startup) (dest, CMD_SELECT, portal->tupDesc);
670
671         if (direction == NoMovementScanDirection)
672         {
673                 /* do nothing except start/stop the destination */
674         }
675         else
676         {
677                 bool            forward = (direction == ForwardScanDirection);
678
679                 for (;;)
680                 {
681                         MemoryContext oldcontext;
682                         HeapTuple       tup;
683                         bool            should_free;
684
685                         oldcontext = MemoryContextSwitchTo(portal->holdContext);
686
687                         tup = tuplestore_getheaptuple(portal->holdStore, forward,
688                                                                                   &should_free);
689
690                         MemoryContextSwitchTo(oldcontext);
691
692                         if (tup == NULL)
693                                 break;
694
695                         (*dest->receiveTuple) (tup, portal->tupDesc, dest);
696
697                         if (should_free)
698                                 pfree(tup);
699
700                         /*
701                          * check our tuple count.. if we've processed the proper
702                          * number then quit, else loop again and process more tuples.
703                          * Zero count means no limit.
704                          */
705                         current_tuple_count++;
706                         if (count && count == current_tuple_count)
707                                 break;
708                 }
709         }
710
711         (*dest->shutdown) (dest);
712
713         return (uint32) current_tuple_count;
714 }
715
716 /*
717  * PortalRunUtility
718  *              Execute a utility statement inside a portal.
719  */
720 static void
721 PortalRunUtility(Portal portal, Query *query,
722                                  DestReceiver *dest, char *completionTag)
723 {
724         Node       *utilityStmt = query->utilityStmt;
725
726         elog(DEBUG3, "ProcessUtility");
727
728         /*
729          * Set snapshot if utility stmt needs one.      Most reliable way to do
730          * this seems to be to enumerate those that do not need one; this is a
731          * short list.  Transaction control, LOCK, and SET must *not* set a
732          * snapshot since they need to be executable at the start of a
733          * serializable transaction without freezing a snapshot.  By extension
734          * we allow SHOW not to set a snapshot.  The other stmts listed are
735          * just efficiency hacks.  Beware of listing anything that can modify
736          * the database --- if, say, it has to update an index with
737          * expressions that invoke user-defined functions, then it had better
738          * have a snapshot.
739          */
740         if (!(IsA(utilityStmt, TransactionStmt) ||
741                   IsA(utilityStmt, LockStmt) ||
742                   IsA(utilityStmt, VariableSetStmt) ||
743                   IsA(utilityStmt, VariableShowStmt) ||
744                   IsA(utilityStmt, VariableResetStmt) ||
745                   IsA(utilityStmt, ConstraintsSetStmt) ||
746         /* efficiency hacks from here down */
747                   IsA(utilityStmt, FetchStmt) ||
748                   IsA(utilityStmt, ListenStmt) ||
749                   IsA(utilityStmt, NotifyStmt) ||
750                   IsA(utilityStmt, UnlistenStmt) ||
751                   IsA(utilityStmt, CheckPointStmt)))
752                 SetQuerySnapshot();
753
754         if (query->canSetTag)
755         {
756                 /* utility statement can override default tag string */
757                 ProcessUtility(utilityStmt, dest, completionTag);
758                 if (completionTag && completionTag[0] == '\0' && portal->commandTag)
759                         strcpy(completionTag, portal->commandTag);      /* use the default */
760         }
761         else
762         {
763                 /* utility added by rewrite cannot set tag */
764                 ProcessUtility(utilityStmt, dest, NULL);
765         }
766
767         /* Some utility statements may change context on us */
768         MemoryContextSwitchTo(PortalGetHeapMemory(portal));
769 }
770
771 /*
772  * PortalRunMulti
773  *              Execute a portal's queries in the general case (multi queries).
774  */
775 static void
776 PortalRunMulti(Portal portal,
777                            DestReceiver *dest, DestReceiver *altdest,
778                            char *completionTag)
779 {
780         List       *plantree_list = portal->planTrees;
781         List       *querylist_item;
782
783         /*
784          * If the destination is RemoteExecute, change to None.  The reason is
785          * that the client won't be expecting any tuples, and indeed has no
786          * way to know what they are, since there is no provision for Describe
787          * to send a RowDescription message when this portal execution
788          * strategy is in effect.  This presently will only affect SELECT
789          * commands added to non-SELECT queries by rewrite rules: such
790          * commands will be executed, but the results will be discarded unless
791          * you use "simple Query" protocol.
792          */
793         if (dest->mydest == RemoteExecute)
794                 dest = None_Receiver;
795         if (altdest->mydest == RemoteExecute)
796                 altdest = None_Receiver;
797
798         /*
799          * Loop to handle the individual queries generated from a single
800          * parsetree by analysis and rewrite.
801          */
802         foreach(querylist_item, portal->parseTrees)
803         {
804                 Query      *query = (Query *) lfirst(querylist_item);
805                 Plan       *plan = (Plan *) lfirst(plantree_list);
806
807                 plantree_list = lnext(plantree_list);
808
809                 /*
810                  * If we got a cancel signal in prior command, quit
811                  */
812                 CHECK_FOR_INTERRUPTS();
813
814                 if (query->commandType == CMD_UTILITY)
815                 {
816                         /*
817                          * process utility functions (create, destroy, etc..)
818                          */
819                         Assert(plan == NULL);
820
821                         PortalRunUtility(portal, query,
822                                                          query->canSetTag ? dest : altdest,
823                                                          completionTag);
824                 }
825                 else
826                 {
827                         /*
828                          * process a plannable query.
829                          */
830                         elog(DEBUG3, "ProcessQuery");
831
832                         /* Must always set snapshot for plannable queries */
833                         SetQuerySnapshot();
834
835                         /*
836                          * execute the plan
837                          */
838                         if (log_executor_stats)
839                                 ResetUsage();
840
841                         if (query->canSetTag)
842                         {
843                                 /* statement can set tag string */
844                                 ProcessQuery(query, plan,
845                                                          portal->portalParams,
846                                                          dest, completionTag);
847                         }
848                         else
849                         {
850                                 /* stmt added by rewrite cannot set tag */
851                                 ProcessQuery(query, plan,
852                                                          portal->portalParams,
853                                                          altdest, NULL);
854                         }
855
856                         if (log_executor_stats)
857                                 ShowUsage("EXECUTOR STATISTICS");
858                 }
859
860                 /*
861                  * Increment command counter between queries, but not after the
862                  * last one.
863                  */
864                 if (plantree_list != NIL)
865                         CommandCounterIncrement();
866
867                 /*
868                  * Clear subsidiary contexts to recover temporary memory.
869                  */
870                 Assert(PortalGetHeapMemory(portal) == CurrentMemoryContext);
871
872                 MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
873         }
874
875         /*
876          * If a command completion tag was supplied, use it.  Otherwise use
877          * the portal's commandTag as the default completion tag.
878          *
879          * Exception: clients will expect INSERT/UPDATE/DELETE tags to have
880          * counts, so fake something up if necessary.  (This could happen if
881          * the original query was replaced by a DO INSTEAD rule.)
882          */
883         if (completionTag && completionTag[0] == '\0')
884         {
885                 if (portal->commandTag)
886                         strcpy(completionTag, portal->commandTag);
887                 if (strcmp(completionTag, "INSERT") == 0)
888                         strcpy(completionTag, "INSERT 0 0");
889                 else if (strcmp(completionTag, "UPDATE") == 0)
890                         strcpy(completionTag, "UPDATE 0");
891                 else if (strcmp(completionTag, "DELETE") == 0)
892                         strcpy(completionTag, "DELETE 0");
893         }
894
895         /* Prevent portal's commands from being re-executed */
896         portal->portalDone = true;
897 }
898
899 /*
900  * PortalRunFetch
901  *              Variant form of PortalRun that supports SQL FETCH directions.
902  *
903  * Returns number of rows processed (suitable for use in result tag)
904  */
905 long
906 PortalRunFetch(Portal portal,
907                            FetchDirection fdirection,
908                            long count,
909                            DestReceiver *dest)
910 {
911         long            result;
912         MemoryContext savePortalContext;
913         MemoryContext saveQueryContext;
914         MemoryContext oldContext;
915
916         AssertArg(PortalIsValid(portal));
917         AssertState(portal->portalReady);       /* else no PortalStart */
918
919         /*
920          * Check for improper portal use, and mark portal active.
921          */
922         if (portal->portalDone)
923                 ereport(ERROR,
924                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
925                    errmsg("portal \"%s\" cannot be run anymore", portal->name)));
926         if (portal->portalActive)
927                 ereport(ERROR,
928                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
929                                  errmsg("portal \"%s\" already active", portal->name)));
930         portal->portalActive = true;
931
932         /*
933          * Set global portal context pointers.
934          */
935         savePortalContext = PortalContext;
936         PortalContext = PortalGetHeapMemory(portal);
937         saveQueryContext = QueryContext;
938         QueryContext = portal->queryContext;
939
940         oldContext = MemoryContextSwitchTo(PortalContext);
941
942         switch (portal->strategy)
943         {
944                 case PORTAL_ONE_SELECT:
945                         result = DoPortalRunFetch(portal, fdirection, count, dest);
946                         break;
947
948                 default:
949                         elog(ERROR, "unsupported portal strategy");
950                         result = 0;                     /* keep compiler quiet */
951                         break;
952         }
953
954         MemoryContextSwitchTo(oldContext);
955
956         /* Mark portal not active */
957         portal->portalActive = false;
958
959         PortalContext = savePortalContext;
960         QueryContext = saveQueryContext;
961
962         return result;
963 }
964
965 /*
966  * DoPortalRunFetch
967  *              Guts of PortalRunFetch --- the portal context is already set up
968  *
969  * Returns number of rows processed (suitable for use in result tag)
970  */
971 static long
972 DoPortalRunFetch(Portal portal,
973                                  FetchDirection fdirection,
974                                  long count,
975                                  DestReceiver *dest)
976 {
977         bool            forward;
978
979         Assert(portal->strategy == PORTAL_ONE_SELECT);
980
981         switch (fdirection)
982         {
983                 case FETCH_FORWARD:
984                         if (count < 0)
985                         {
986                                 fdirection = FETCH_BACKWARD;
987                                 count = -count;
988                         }
989                         /* fall out of switch to share code with FETCH_BACKWARD */
990                         break;
991                 case FETCH_BACKWARD:
992                         if (count < 0)
993                         {
994                                 fdirection = FETCH_FORWARD;
995                                 count = -count;
996                         }
997                         /* fall out of switch to share code with FETCH_FORWARD */
998                         break;
999                 case FETCH_ABSOLUTE:
1000                         if (count > 0)
1001                         {
1002                                 /*
1003                                  * Definition: Rewind to start, advance count-1 rows,
1004                                  * return next row (if any).  In practice, if the goal is
1005                                  * less than halfway back to the start, it's better to
1006                                  * scan from where we are.      In any case, we arrange to
1007                                  * fetch the target row going forwards.
1008                                  */
1009                                 if (portal->posOverflow || portal->portalPos == LONG_MAX ||
1010                                         count - 1 <= portal->portalPos / 2)
1011                                 {
1012                                         DoPortalRewind(portal);
1013                                         if (count > 1)
1014                                                 PortalRunSelect(portal, true, count - 1,
1015                                                                                 None_Receiver);
1016                                 }
1017                                 else
1018                                 {
1019                                         long            pos = portal->portalPos;
1020
1021                                         if (portal->atEnd)
1022                                                 pos++;  /* need one extra fetch if off end */
1023                                         if (count <= pos)
1024                                                 PortalRunSelect(portal, false, pos - count + 1,
1025                                                                                 None_Receiver);
1026                                         else if (count > pos + 1)
1027                                                 PortalRunSelect(portal, true, count - pos - 1,
1028                                                                                 None_Receiver);
1029                                 }
1030                                 return PortalRunSelect(portal, true, 1L, dest);
1031                         }
1032                         else if (count < 0)
1033                         {
1034                                 /*
1035                                  * Definition: Advance to end, back up abs(count)-1 rows,
1036                                  * return prior row (if any).  We could optimize this if
1037                                  * we knew in advance where the end was, but typically we
1038                                  * won't. (Is it worth considering case where count > half
1039                                  * of size of query?  We could rewind once we know the
1040                                  * size ...)
1041                                  */
1042                                 PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
1043                                 if (count < -1)
1044                                         PortalRunSelect(portal, false, -count - 1, None_Receiver);
1045                                 return PortalRunSelect(portal, false, 1L, dest);
1046                         }
1047                         else
1048 /* count == 0 */
1049                         {
1050                                 /* Rewind to start, return zero rows */
1051                                 DoPortalRewind(portal);
1052                                 return PortalRunSelect(portal, true, 0L, dest);
1053                         }
1054                         break;
1055                 case FETCH_RELATIVE:
1056                         if (count > 0)
1057                         {
1058                                 /*
1059                                  * Definition: advance count-1 rows, return next row (if
1060                                  * any).
1061                                  */
1062                                 if (count > 1)
1063                                         PortalRunSelect(portal, true, count - 1, None_Receiver);
1064                                 return PortalRunSelect(portal, true, 1L, dest);
1065                         }
1066                         else if (count < 0)
1067                         {
1068                                 /*
1069                                  * Definition: back up abs(count)-1 rows, return prior row
1070                                  * (if any).
1071                                  */
1072                                 if (count < -1)
1073                                         PortalRunSelect(portal, false, -count - 1, None_Receiver);
1074                                 return PortalRunSelect(portal, false, 1L, dest);
1075                         }
1076                         else
1077 /* count == 0 */
1078                         {
1079                                 /* Same as FETCH FORWARD 0, so fall out of switch */
1080                                 fdirection = FETCH_FORWARD;
1081                         }
1082                         break;
1083                 default:
1084                         elog(ERROR, "bogus direction");
1085                         break;
1086         }
1087
1088         /*
1089          * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, and
1090          * count >= 0.
1091          */
1092         forward = (fdirection == FETCH_FORWARD);
1093
1094         /*
1095          * Zero count means to re-fetch the current row, if any (per SQL92)
1096          */
1097         if (count == 0)
1098         {
1099                 bool            on_row;
1100
1101                 /* Are we sitting on a row? */
1102                 on_row = (!portal->atStart && !portal->atEnd);
1103
1104                 if (dest->mydest == None)
1105                 {
1106                         /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */
1107                         return on_row ? 1L : 0L;
1108                 }
1109                 else
1110                 {
1111                         /*
1112                          * If we are sitting on a row, back up one so we can re-fetch
1113                          * it. If we are not sitting on a row, we still have to start
1114                          * up and shut down the executor so that the destination is
1115                          * initialized and shut down correctly; so keep going.  To
1116                          * PortalRunSelect, count == 0 means we will retrieve no row.
1117                          */
1118                         if (on_row)
1119                         {
1120                                 PortalRunSelect(portal, false, 1L, None_Receiver);
1121                                 /* Set up to fetch one row forward */
1122                                 count = 1;
1123                                 forward = true;
1124                         }
1125                 }
1126         }
1127
1128         /*
1129          * Optimize MOVE BACKWARD ALL into a Rewind.
1130          */
1131         if (!forward && count == FETCH_ALL && dest->mydest == None)
1132         {
1133                 long            result = portal->portalPos;
1134
1135                 if (result > 0 && !portal->atEnd)
1136                         result--;
1137                 DoPortalRewind(portal);
1138                 /* result is bogus if pos had overflowed, but it's best we can do */
1139                 return result;
1140         }
1141
1142         return PortalRunSelect(portal, forward, count, dest);
1143 }
1144
1145 /*
1146  * DoPortalRewind - rewind a Portal to starting point
1147  */
1148 static void
1149 DoPortalRewind(Portal portal)
1150 {
1151         if (portal->holdStore)
1152         {
1153                 MemoryContext oldcontext;
1154
1155                 oldcontext = MemoryContextSwitchTo(portal->holdContext);
1156                 tuplestore_rescan(portal->holdStore);
1157                 MemoryContextSwitchTo(oldcontext);
1158         }
1159         if (PortalGetQueryDesc(portal))
1160                 ExecutorRewind(PortalGetQueryDesc(portal));
1161
1162         portal->atStart = true;
1163         portal->atEnd = false;
1164         portal->portalPos = 0;
1165         portal->posOverflow = false;
1166 }