OSDN Git Service

RESET SESSION, plus related new DDL commands. Patch from Marko Kreen,
[pg-rex/syncrep.git] / src / backend / commands / portalcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * portalcmds.c
4  *        Utility commands affecting portals (that is, SQL cursor commands)
5  *
6  * Note: see also tcop/pquery.c, which implements portal operations for
7  * the FE/BE protocol.  This module uses pquery.c for some operations.
8  * And both modules depend on utils/mmgr/portalmem.c, which controls
9  * storage management for portals (but doesn't run any queries in them).
10  *
11  *
12  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  *
16  * IDENTIFICATION
17  *        $PostgreSQL: pgsql/src/backend/commands/portalcmds.c,v 1.63 2007/04/12 06:53:46 neilc Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include <limits.h>
25
26 #include "access/xact.h"
27 #include "commands/portalcmds.h"
28 #include "executor/executor.h"
29 #include "optimizer/planner.h"
30 #include "rewrite/rewriteHandler.h"
31 #include "tcop/pquery.h"
32 #include "tcop/tcopprot.h"
33 #include "utils/memutils.h"
34
35
36 /*
37  * PerformCursorOpen
38  *              Execute SQL DECLARE CURSOR command.
39  */
40 void
41 PerformCursorOpen(DeclareCursorStmt *stmt, ParamListInfo params,
42                                   const char *queryString, bool isTopLevel)
43 {
44         Oid                *param_types;
45         int                     num_params;
46         List       *rewritten;
47         Query      *query;
48         PlannedStmt *plan;
49         Portal          portal;
50         MemoryContext oldContext;
51
52         /*
53          * Disallow empty-string cursor name (conflicts with protocol-level
54          * unnamed portal).
55          */
56         if (!stmt->portalname || stmt->portalname[0] == '\0')
57                 ereport(ERROR,
58                                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
59                                  errmsg("invalid cursor name: must not be empty")));
60
61         /*
62          * If this is a non-holdable cursor, we require that this statement has
63          * been executed inside a transaction block (or else, it would have no
64          * user-visible effect).
65          */
66         if (!(stmt->options & CURSOR_OPT_HOLD))
67                 RequireTransactionChain(isTopLevel, "DECLARE CURSOR");
68
69         /*
70          * Don't allow both SCROLL and NO SCROLL to be specified
71          */
72         if ((stmt->options & CURSOR_OPT_SCROLL) &&
73                 (stmt->options & CURSOR_OPT_NO_SCROLL))
74                 ereport(ERROR,
75                                 (errcode(ERRCODE_INVALID_CURSOR_DEFINITION),
76                                  errmsg("cannot specify both SCROLL and NO SCROLL")));
77
78         /* Convert parameter type data to the form parser wants */
79         getParamListTypes(params, &param_types, &num_params);
80
81         /*
82          * Run parse analysis and rewrite.  Note this also acquires sufficient
83          * locks on the source table(s).
84          *
85          * Because the parser and planner tend to scribble on their input, we
86          * make a preliminary copy of the source querytree.  This prevents
87          * problems in the case that the DECLARE CURSOR is in a portal or plpgsql
88          * function and is executed repeatedly.  (See also the same hack in
89          * COPY and PREPARE.)  XXX FIXME someday.
90          */
91         rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query),
92                                                                            queryString, param_types, num_params);
93
94         /* We don't expect more or less than one result query */
95         if (list_length(rewritten) != 1 || !IsA(linitial(rewritten), Query))
96                 elog(ERROR, "unexpected rewrite result");
97         query = (Query *) linitial(rewritten);
98         if (query->commandType != CMD_SELECT)
99                 elog(ERROR, "unexpected rewrite result");
100
101         /* But we must explicitly disallow DECLARE CURSOR ... SELECT INTO */
102         if (query->into)
103                 ereport(ERROR,
104                                 (errcode(ERRCODE_INVALID_CURSOR_DEFINITION),
105                                  errmsg("DECLARE CURSOR cannot specify INTO")));
106
107         if (query->rowMarks != NIL)
108                 ereport(ERROR,
109                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
110                           errmsg("DECLARE CURSOR ... FOR UPDATE/SHARE is not supported"),
111                                  errdetail("Cursors must be READ ONLY.")));
112
113         /* plan the query */
114         plan = planner(query, true, stmt->options, params);
115
116         /*
117          * Create a portal and copy the plan into its memory context.
118          */
119         portal = CreatePortal(stmt->portalname, false, false);
120
121         oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
122
123         plan = copyObject(plan);
124
125         PortalDefineQuery(portal,
126                                           NULL,
127                                           queryString,
128                                           "SELECT", /* cursor's query is always a SELECT */
129                                           list_make1(plan),
130                                           NULL);
131
132         /*----------
133          * Also copy the outer portal's parameter list into the inner portal's
134          * memory context.      We want to pass down the parameter values in case we
135          * had a command like
136          *              DECLARE c CURSOR FOR SELECT ... WHERE foo = $1
137          * This will have been parsed using the outer parameter set and the
138          * parameter value needs to be preserved for use when the cursor is
139          * executed.
140          *----------
141          */
142         params = copyParamList(params);
143
144         MemoryContextSwitchTo(oldContext);
145
146         /*
147          * Set up options for portal.
148          *
149          * If the user didn't specify a SCROLL type, allow or disallow scrolling
150          * based on whether it would require any additional runtime overhead to do
151          * so.
152          */
153         portal->cursorOptions = stmt->options;
154         if (!(portal->cursorOptions & (CURSOR_OPT_SCROLL | CURSOR_OPT_NO_SCROLL)))
155         {
156                 if (ExecSupportsBackwardScan(plan->planTree))
157                         portal->cursorOptions |= CURSOR_OPT_SCROLL;
158                 else
159                         portal->cursorOptions |= CURSOR_OPT_NO_SCROLL;
160         }
161
162         /*
163          * Start execution, inserting parameters if any.
164          */
165         PortalStart(portal, params, ActiveSnapshot);
166
167         Assert(portal->strategy == PORTAL_ONE_SELECT);
168
169         /*
170          * We're done; the query won't actually be run until PerformPortalFetch is
171          * called.
172          */
173 }
174
175 /*
176  * PerformPortalFetch
177  *              Execute SQL FETCH or MOVE command.
178  *
179  *      stmt: parsetree node for command
180  *      dest: where to send results
181  *      completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
182  *              in which to store a command completion status string.
183  *
184  * completionTag may be NULL if caller doesn't want a status string.
185  */
186 void
187 PerformPortalFetch(FetchStmt *stmt,
188                                    DestReceiver *dest,
189                                    char *completionTag)
190 {
191         Portal          portal;
192         long            nprocessed;
193
194         /*
195          * Disallow empty-string cursor name (conflicts with protocol-level
196          * unnamed portal).
197          */
198         if (!stmt->portalname || stmt->portalname[0] == '\0')
199                 ereport(ERROR,
200                                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
201                                  errmsg("invalid cursor name: must not be empty")));
202
203         /* get the portal from the portal name */
204         portal = GetPortalByName(stmt->portalname);
205         if (!PortalIsValid(portal))
206         {
207                 ereport(ERROR,
208                                 (errcode(ERRCODE_UNDEFINED_CURSOR),
209                                  errmsg("cursor \"%s\" does not exist", stmt->portalname)));
210                 return;                                 /* keep compiler happy */
211         }
212
213         /* Adjust dest if needed.  MOVE wants destination DestNone */
214         if (stmt->ismove)
215                 dest = None_Receiver;
216
217         /* Do it */
218         nprocessed = PortalRunFetch(portal,
219                                                                 stmt->direction,
220                                                                 stmt->howMany,
221                                                                 dest);
222
223         /* Return command status if wanted */
224         if (completionTag)
225                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s %ld",
226                                  stmt->ismove ? "MOVE" : "FETCH",
227                                  nprocessed);
228 }
229
230 /*
231  * PerformPortalClose
232  *              Close a cursor.
233  */
234 void
235 PerformPortalClose(const char *name)
236 {
237         Portal          portal;
238
239         /* NULL means CLOSE ALL */
240         if (name == NULL)
241         {
242                 PortalHashTableDeleteAll();
243                 return;
244         }
245
246         /*
247          * Disallow empty-string cursor name (conflicts with protocol-level
248          * unnamed portal).
249          */
250         if (name[0] == '\0')
251                 ereport(ERROR,
252                                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
253                                  errmsg("invalid cursor name: must not be empty")));
254
255         /*
256          * get the portal from the portal name
257          */
258         portal = GetPortalByName(name);
259         if (!PortalIsValid(portal))
260         {
261                 ereport(ERROR,
262                                 (errcode(ERRCODE_UNDEFINED_CURSOR),
263                                  errmsg("cursor \"%s\" does not exist", name)));
264                 return;                                 /* keep compiler happy */
265         }
266
267         /*
268          * Note: PortalCleanup is called as a side-effect
269          */
270         PortalDrop(portal, false);
271 }
272
273 /*
274  * PortalCleanup
275  *
276  * Clean up a portal when it's dropped.  This is the standard cleanup hook
277  * for portals.
278  */
279 void
280 PortalCleanup(Portal portal)
281 {
282         QueryDesc  *queryDesc;
283
284         /*
285          * sanity checks
286          */
287         AssertArg(PortalIsValid(portal));
288         AssertArg(portal->cleanup == PortalCleanup);
289
290         /*
291          * Shut down executor, if still running.  We skip this during error abort,
292          * since other mechanisms will take care of releasing executor resources,
293          * and we can't be sure that ExecutorEnd itself wouldn't fail.
294          */
295         queryDesc = PortalGetQueryDesc(portal);
296         if (queryDesc)
297         {
298                 portal->queryDesc = NULL;
299                 if (portal->status != PORTAL_FAILED)
300                 {
301                         ResourceOwner saveResourceOwner;
302
303                         /* We must make the portal's resource owner current */
304                         saveResourceOwner = CurrentResourceOwner;
305                         PG_TRY();
306                         {
307                                 CurrentResourceOwner = portal->resowner;
308                                 /* we do not need AfterTriggerEndQuery() here */
309                                 ExecutorEnd(queryDesc);
310                         }
311                         PG_CATCH();
312                         {
313                                 /* Ensure CurrentResourceOwner is restored on error */
314                                 CurrentResourceOwner = saveResourceOwner;
315                                 PG_RE_THROW();
316                         }
317                         PG_END_TRY();
318                         CurrentResourceOwner = saveResourceOwner;
319                 }
320         }
321 }
322
323 /*
324  * PersistHoldablePortal
325  *
326  * Prepare the specified Portal for access outside of the current
327  * transaction. When this function returns, all future accesses to the
328  * portal must be done via the Tuplestore (not by invoking the
329  * executor).
330  */
331 void
332 PersistHoldablePortal(Portal portal)
333 {
334         QueryDesc  *queryDesc = PortalGetQueryDesc(portal);
335         Portal          saveActivePortal;
336         Snapshot        saveActiveSnapshot;
337         ResourceOwner saveResourceOwner;
338         MemoryContext savePortalContext;
339         MemoryContext oldcxt;
340
341         /*
342          * If we're preserving a holdable portal, we had better be inside the
343          * transaction that originally created it.
344          */
345         Assert(portal->createSubid != InvalidSubTransactionId);
346         Assert(queryDesc != NULL);
347
348         /*
349          * Caller must have created the tuplestore already.
350          */
351         Assert(portal->holdContext != NULL);
352         Assert(portal->holdStore != NULL);
353
354         /*
355          * Before closing down the executor, we must copy the tupdesc into
356          * long-term memory, since it was created in executor memory.
357          */
358         oldcxt = MemoryContextSwitchTo(portal->holdContext);
359
360         portal->tupDesc = CreateTupleDescCopy(portal->tupDesc);
361
362         MemoryContextSwitchTo(oldcxt);
363
364         /*
365          * Check for improper portal use, and mark portal active.
366          */
367         if (portal->status != PORTAL_READY)
368                 ereport(ERROR,
369                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
370                                  errmsg("portal \"%s\" cannot be run", portal->name)));
371         portal->status = PORTAL_ACTIVE;
372
373         /*
374          * Set up global portal context pointers.
375          */
376         saveActivePortal = ActivePortal;
377         saveActiveSnapshot = ActiveSnapshot;
378         saveResourceOwner = CurrentResourceOwner;
379         savePortalContext = PortalContext;
380         PG_TRY();
381         {
382                 ActivePortal = portal;
383                 ActiveSnapshot = queryDesc->snapshot;
384                 CurrentResourceOwner = portal->resowner;
385                 PortalContext = PortalGetHeapMemory(portal);
386
387                 MemoryContextSwitchTo(PortalContext);
388
389                 /*
390                  * Rewind the executor: we need to store the entire result set in the
391                  * tuplestore, so that subsequent backward FETCHs can be processed.
392                  */
393                 ExecutorRewind(queryDesc);
394
395                 /* Change the destination to output to the tuplestore */
396                 queryDesc->dest = CreateDestReceiver(DestTuplestore, portal);
397
398                 /* Fetch the result set into the tuplestore */
399                 ExecutorRun(queryDesc, ForwardScanDirection, 0L);
400
401                 (*queryDesc->dest->rDestroy) (queryDesc->dest);
402                 queryDesc->dest = NULL;
403
404                 /*
405                  * Now shut down the inner executor.
406                  */
407                 portal->queryDesc = NULL;               /* prevent double shutdown */
408                 /* we do not need AfterTriggerEndQuery() here */
409                 ExecutorEnd(queryDesc);
410
411                 /*
412                  * Set the position in the result set: ideally, this could be
413                  * implemented by just skipping straight to the tuple # that we need
414                  * to be at, but the tuplestore API doesn't support that. So we start
415                  * at the beginning of the tuplestore and iterate through it until we
416                  * reach where we need to be.  FIXME someday?  (Fortunately, the
417                  * typical case is that we're supposed to be at or near the start
418                  * of the result set, so this isn't as bad as it sounds.)
419                  */
420                 MemoryContextSwitchTo(portal->holdContext);
421
422                 if (portal->atEnd)
423                 {
424                         /* we can handle this case even if posOverflow */
425                         while (tuplestore_advance(portal->holdStore, true))
426                                 /* continue */ ;
427                 }
428                 else
429                 {
430                         long            store_pos;
431
432                         if (portal->posOverflow)        /* oops, cannot trust portalPos */
433                                 ereport(ERROR,
434                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
435                                                  errmsg("could not reposition held cursor")));
436
437                         tuplestore_rescan(portal->holdStore);
438
439                         for (store_pos = 0; store_pos < portal->portalPos; store_pos++)
440                         {
441                                 if (!tuplestore_advance(portal->holdStore, true))
442                                         elog(ERROR, "unexpected end of tuple stream");
443                         }
444                 }
445         }
446         PG_CATCH();
447         {
448                 /* Uncaught error while executing portal: mark it dead */
449                 portal->status = PORTAL_FAILED;
450
451                 /* Restore global vars and propagate error */
452                 ActivePortal = saveActivePortal;
453                 ActiveSnapshot = saveActiveSnapshot;
454                 CurrentResourceOwner = saveResourceOwner;
455                 PortalContext = savePortalContext;
456
457                 PG_RE_THROW();
458         }
459         PG_END_TRY();
460
461         MemoryContextSwitchTo(oldcxt);
462
463         /* Mark portal not active */
464         portal->status = PORTAL_READY;
465
466         ActivePortal = saveActivePortal;
467         ActiveSnapshot = saveActiveSnapshot;
468         CurrentResourceOwner = saveResourceOwner;
469         PortalContext = savePortalContext;
470
471         /*
472          * We can now release any subsidiary memory of the portal's heap context;
473          * we'll never use it again.  The executor already dropped its context,
474          * but this will clean up anything that glommed onto the portal's heap via
475          * PortalContext.
476          */
477         MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
478 }