OSDN Git Service

a8356e5dcf359c828769042e99e630a4f91a3b56
[pg-rex/syncrep.git] / src / backend / commands / portalcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * portalcmds.c
4  *        Utility commands affecting portals (that is, SQL cursor commands)
5  *
6  * Note: see also tcop/pquery.c, which implements portal operations for
7  * the FE/BE protocol.  This module uses pquery.c for some operations.
8  * And both modules depend on utils/mmgr/portalmem.c, which controls
9  * storage management for portals (but doesn't run any queries in them).
10  *
11  *
12  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  *
16  * IDENTIFICATION
17  *        $PostgreSQL: pgsql/src/backend/commands/portalcmds.c,v 1.32 2004/08/29 04:12:30 momjian Exp $
18  *
19  *-------------------------------------------------------------------------
20  */
21
22 #include "postgres.h"
23
24 #include <limits.h>
25
26 #include "commands/portalcmds.h"
27 #include "executor/executor.h"
28 #include "optimizer/planner.h"
29 #include "rewrite/rewriteHandler.h"
30 #include "tcop/pquery.h"
31 #include "utils/memutils.h"
32
33
34 /*
35  * PerformCursorOpen
36  *              Execute SQL DECLARE CURSOR command.
37  */
38 void
39 PerformCursorOpen(DeclareCursorStmt *stmt, ParamListInfo params)
40 {
41         List       *rewritten;
42         Query      *query;
43         Plan       *plan;
44         Portal          portal;
45         MemoryContext oldContext;
46
47         /*
48          * Disallow empty-string cursor name (conflicts with protocol-level
49          * unnamed portal).
50          */
51         if (!stmt->portalname || stmt->portalname[0] == '\0')
52                 ereport(ERROR,
53                                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
54                                  errmsg("invalid cursor name: must not be empty")));
55
56         /*
57          * If this is a non-holdable cursor, we require that this statement
58          * has been executed inside a transaction block (or else, it would
59          * have no user-visible effect).
60          */
61         if (!(stmt->options & CURSOR_OPT_HOLD))
62                 RequireTransactionChain((void *) stmt, "DECLARE CURSOR");
63
64         /*
65          * The query has been through parse analysis, but not rewriting or
66          * planning as yet.  Note that the grammar ensured we have a SELECT
67          * query, so we are not expecting rule rewriting to do anything
68          * strange.
69          */
70         rewritten = QueryRewrite((Query *) stmt->query);
71         if (list_length(rewritten) != 1 || !IsA(linitial(rewritten), Query))
72                 elog(ERROR, "unexpected rewrite result");
73         query = (Query *) linitial(rewritten);
74         if (query->commandType != CMD_SELECT)
75                 elog(ERROR, "unexpected rewrite result");
76
77         if (query->into)
78                 ereport(ERROR,
79                                 (errcode(ERRCODE_SYNTAX_ERROR),
80                                  errmsg("DECLARE CURSOR may not specify INTO")));
81         if (query->rowMarks != NIL)
82                 ereport(ERROR,
83                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
84                                  errmsg("DECLARE CURSOR ... FOR UPDATE is not supported"),
85                                  errdetail("Cursors must be READ ONLY.")));
86
87         plan = planner(query, true, stmt->options, NULL);
88
89         /*
90          * Create a portal and copy the query and plan into its memory
91          * context.
92          */
93         portal = CreatePortal(stmt->portalname, false, false);
94
95         oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
96
97         query = copyObject(query);
98         plan = copyObject(plan);
99
100         PortalDefineQuery(portal,
101                                           NULL,         /* unfortunately don't have sourceText */
102                                           "SELECT", /* cursor's query is always a SELECT */
103                                           list_make1(query),
104                                           list_make1(plan),
105                                           PortalGetHeapMemory(portal));
106
107         /*
108          * Also copy the outer portal's parameter list into the inner portal's
109          * memory context.  We want to pass down the parameter values in case
110          * we had a command like
111          *                      DECLARE c CURSOR FOR SELECT ... WHERE foo = $1
112          * This will have been parsed using the outer parameter set and the
113          * parameter value needs to be preserved for use when the cursor is
114          * executed.
115          */
116         params = copyParamList(params);
117
118         MemoryContextSwitchTo(oldContext);
119
120         /*
121          * Set up options for portal.
122          *
123          * If the user didn't specify a SCROLL type, allow or disallow scrolling
124          * based on whether it would require any additional runtime overhead
125          * to do so.
126          */
127         portal->cursorOptions = stmt->options;
128         if (!(portal->cursorOptions & (CURSOR_OPT_SCROLL | CURSOR_OPT_NO_SCROLL)))
129         {
130                 if (ExecSupportsBackwardScan(plan))
131                         portal->cursorOptions |= CURSOR_OPT_SCROLL;
132                 else
133                         portal->cursorOptions |= CURSOR_OPT_NO_SCROLL;
134         }
135
136         /*
137          * Start execution, inserting parameters if any.
138          */
139         PortalStart(portal, params);
140
141         Assert(portal->strategy == PORTAL_ONE_SELECT);
142
143         /*
144          * We're done; the query won't actually be run until
145          * PerformPortalFetch is called.
146          */
147 }
148
149 /*
150  * PerformPortalFetch
151  *              Execute SQL FETCH or MOVE command.
152  *
153  *      stmt: parsetree node for command
154  *      dest: where to send results
155  *      completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
156  *              in which to store a command completion status string.
157  *
158  * completionTag may be NULL if caller doesn't want a status string.
159  */
160 void
161 PerformPortalFetch(FetchStmt *stmt,
162                                    DestReceiver *dest,
163                                    char *completionTag)
164 {
165         Portal          portal;
166         long            nprocessed;
167
168         /*
169          * Disallow empty-string cursor name (conflicts with protocol-level
170          * unnamed portal).
171          */
172         if (!stmt->portalname || stmt->portalname[0] == '\0')
173                 ereport(ERROR,
174                                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
175                                  errmsg("invalid cursor name: must not be empty")));
176
177         /* get the portal from the portal name */
178         portal = GetPortalByName(stmt->portalname);
179         if (!PortalIsValid(portal))
180         {
181                 ereport(ERROR,
182                                 (errcode(ERRCODE_UNDEFINED_CURSOR),
183                                  errmsg("cursor \"%s\" does not exist", stmt->portalname)));
184                 return; /* keep compiler happy */
185         }
186
187         /* Adjust dest if needed.  MOVE wants destination None */
188         if (stmt->ismove)
189                 dest = None_Receiver;
190
191         /* Do it */
192         nprocessed = PortalRunFetch(portal,
193                                                                 stmt->direction,
194                                                                 stmt->howMany,
195                                                                 dest);
196
197         /* Return command status if wanted */
198         if (completionTag)
199                 snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s %ld",
200                                  stmt->ismove ? "MOVE" : "FETCH",
201                                  nprocessed);
202 }
203
204 /*
205  * PerformPortalClose
206  *              Close a cursor.
207  */
208 void
209 PerformPortalClose(const char *name)
210 {
211         Portal          portal;
212
213         /*
214          * Disallow empty-string cursor name (conflicts with protocol-level
215          * unnamed portal).
216          */
217         if (!name || name[0] == '\0')
218                 ereport(ERROR,
219                                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
220                                  errmsg("invalid cursor name: must not be empty")));
221
222         /*
223          * get the portal from the portal name
224          */
225         portal = GetPortalByName(name);
226         if (!PortalIsValid(portal))
227         {
228                 ereport(ERROR,
229                                 (errcode(ERRCODE_UNDEFINED_CURSOR),
230                                  errmsg("cursor \"%s\" does not exist", name)));
231                 return; /* keep compiler happy */
232         }
233
234         /*
235          * Note: PortalCleanup is called as a side-effect
236          */
237         PortalDrop(portal, false);
238 }
239
240 /*
241  * PortalCleanup
242  *
243  * Clean up a portal when it's dropped.  This is the standard cleanup hook
244  * for portals.
245  */
246 void
247 PortalCleanup(Portal portal)
248 {
249         QueryDesc  *queryDesc;
250
251         /*
252          * sanity checks
253          */
254         AssertArg(PortalIsValid(portal));
255         AssertArg(portal->cleanup == PortalCleanup);
256
257         /*
258          * Shut down executor, if still running.  We skip this during error
259          * abort, since other mechanisms will take care of releasing executor
260          * resources, and we can't be sure that ExecutorEnd itself wouldn't
261          * fail.
262          */
263         queryDesc = PortalGetQueryDesc(portal);
264         if (queryDesc)
265         {
266                 portal->queryDesc = NULL;
267                 if (portal->status != PORTAL_FAILED)
268                 {
269                         ResourceOwner saveResourceOwner;
270
271                         /* We must make the portal's resource owner current */
272                         saveResourceOwner = CurrentResourceOwner;
273                         PG_TRY();
274                         {
275                                 CurrentResourceOwner = portal->resowner;
276                                 ExecutorEnd(queryDesc);
277                         }
278                         PG_CATCH();
279                         {
280                                 /* Ensure CurrentResourceOwner is restored on error */
281                                 CurrentResourceOwner = saveResourceOwner;
282                                 PG_RE_THROW();
283                         }
284                         PG_END_TRY();
285                         CurrentResourceOwner = saveResourceOwner;
286                 }
287         }
288 }
289
290 /*
291  * PersistHoldablePortal
292  *
293  * Prepare the specified Portal for access outside of the current
294  * transaction. When this function returns, all future accesses to the
295  * portal must be done via the Tuplestore (not by invoking the
296  * executor).
297  */
298 void
299 PersistHoldablePortal(Portal portal)
300 {
301         QueryDesc  *queryDesc = PortalGetQueryDesc(portal);
302         Portal          saveActivePortal;
303         ResourceOwner saveResourceOwner;
304         MemoryContext savePortalContext;
305         MemoryContext saveQueryContext;
306         MemoryContext oldcxt;
307
308         /*
309          * If we're preserving a holdable portal, we had better be inside the
310          * transaction that originally created it.
311          */
312         Assert(portal->createXact == GetCurrentTransactionId());
313         Assert(queryDesc != NULL);
314
315         /*
316          * Caller must have created the tuplestore already.
317          */
318         Assert(portal->holdContext != NULL);
319         Assert(portal->holdStore != NULL);
320
321         /*
322          * Before closing down the executor, we must copy the tupdesc into
323          * long-term memory, since it was created in executor memory.
324          */
325         oldcxt = MemoryContextSwitchTo(portal->holdContext);
326
327         portal->tupDesc = CreateTupleDescCopy(portal->tupDesc);
328
329         MemoryContextSwitchTo(oldcxt);
330
331         /*
332          * Check for improper portal use, and mark portal active.
333          */
334         if (portal->status != PORTAL_READY)
335                 ereport(ERROR,
336                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
337                                  errmsg("portal \"%s\" cannot be run", portal->name)));
338         portal->status = PORTAL_ACTIVE;
339
340         /*
341          * Set up global portal context pointers.
342          */
343         saveActivePortal = ActivePortal;
344         saveResourceOwner = CurrentResourceOwner;
345         savePortalContext = PortalContext;
346         saveQueryContext = QueryContext;
347         PG_TRY();
348         {
349                 ActivePortal = portal;
350                 CurrentResourceOwner = portal->resowner;
351                 PortalContext = PortalGetHeapMemory(portal);
352                 QueryContext = portal->queryContext;
353
354                 MemoryContextSwitchTo(PortalContext);
355
356                 /*
357                  * Rewind the executor: we need to store the entire result set in the
358                  * tuplestore, so that subsequent backward FETCHs can be processed.
359                  */
360                 ExecutorRewind(queryDesc);
361
362                 /* Change the destination to output to the tuplestore */
363                 queryDesc->dest = CreateDestReceiver(Tuplestore, portal);
364
365                 /* Fetch the result set into the tuplestore */
366                 ExecutorRun(queryDesc, ForwardScanDirection, 0L);
367
368                 (*queryDesc->dest->rDestroy) (queryDesc->dest);
369                 queryDesc->dest = NULL;
370
371                 /*
372                  * Now shut down the inner executor.
373                  */
374                 portal->queryDesc = NULL;       /* prevent double shutdown */
375                 ExecutorEnd(queryDesc);
376
377                 /*
378                  * Reset the position in the result set: ideally, this could be
379                  * implemented by just skipping straight to the tuple # that we need
380                  * to be at, but the tuplestore API doesn't support that. So we start
381                  * at the beginning of the tuplestore and iterate through it until we
382                  * reach where we need to be.  FIXME someday?
383                  */
384                 MemoryContextSwitchTo(portal->holdContext);
385
386                 if (!portal->atEnd)
387                 {
388                         long            store_pos;
389
390                         if (portal->posOverflow)        /* oops, cannot trust portalPos */
391                                 ereport(ERROR,
392                                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
393                                                  errmsg("could not reposition held cursor")));
394
395                         tuplestore_rescan(portal->holdStore);
396
397                         for (store_pos = 0; store_pos < portal->portalPos; store_pos++)
398                         {
399                                 HeapTuple       tup;
400                                 bool            should_free;
401
402                                 tup = tuplestore_gettuple(portal->holdStore, true,
403                                                                                   &should_free);
404
405                                 if (tup == NULL)
406                                         elog(ERROR, "unexpected end of tuple stream");
407
408                                 if (should_free)
409                                         pfree(tup);
410                         }
411                 }
412         }
413         PG_CATCH();
414         {
415                 /* Uncaught error while executing portal: mark it dead */
416                 portal->status = PORTAL_FAILED;
417
418                 /* Restore global vars and propagate error */
419                 ActivePortal = saveActivePortal;
420                 CurrentResourceOwner = saveResourceOwner;
421                 PortalContext = savePortalContext;
422                 QueryContext = saveQueryContext;
423
424                 PG_RE_THROW();
425         }
426         PG_END_TRY();
427
428         MemoryContextSwitchTo(oldcxt);
429
430         /* Mark portal not active */
431         portal->status = PORTAL_READY;
432
433         ActivePortal = saveActivePortal;
434         CurrentResourceOwner = saveResourceOwner;
435         PortalContext = savePortalContext;
436         QueryContext = saveQueryContext;
437
438         /*
439          * We can now release any subsidiary memory of the portal's heap
440          * context; we'll never use it again.  The executor already dropped
441          * its context, but this will clean up anything that glommed onto the
442          * portal's heap via PortalContext.
443          */
444         MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
445 }