OSDN Git Service

8c6420ae4ccfab712bf16a600a54f6ba7c5f5d53
[pg-rex/syncrep.git] / src / backend / executor / spi.c
1 /*-------------------------------------------------------------------------
2  *
3  * spi.c
4  *                              Server Programming Interface
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/executor/spi.c,v 1.57 2001/08/02 18:08:43 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/printtup.h"
18 #include "commands/command.h"
19 #include "executor/spi_priv.h"
20
21
22 uint32          SPI_processed = 0;
23 Oid                     SPI_lastoid = InvalidOid;
24 SPITupleTable *SPI_tuptable = NULL;
25 int                     SPI_result;
26
27 static _SPI_connection *_SPI_stack = NULL;
28 static _SPI_connection *_SPI_current = NULL;
29 static int      _SPI_connected = -1;
30 static int      _SPI_curid = -1;
31
32 static int      _SPI_execute(char *src, int tcount, _SPI_plan *plan);
33 static int      _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount);
34
35 static int _SPI_execute_plan(_SPI_plan *plan,
36                                   Datum *Values, char *Nulls, int tcount);
37
38 static void _SPI_cursor_operation(Portal portal, bool forward, int count,
39                                         CommandDest dest);
40
41 static _SPI_plan *_SPI_copy_plan(_SPI_plan *plan, int location);
42
43 static int      _SPI_begin_call(bool execmem);
44 static int      _SPI_end_call(bool procmem);
45 static MemoryContext _SPI_execmem(void);
46 static MemoryContext _SPI_procmem(void);
47 static bool _SPI_checktuples(void);
48
49 #ifdef SPI_EXECUTOR_STATS
50 extern int      ShowExecutorStats;
51 extern void ResetUsage(void);
52 extern void ShowUsage(void);
53
54 #endif
55
56 /* =================== interface functions =================== */
57
58 int
59 SPI_connect(void)
60 {
61         _SPI_connection *new_SPI_stack;
62
63         /*
64          * When procedure called by Executor _SPI_curid expected to be equal
65          * to _SPI_connected
66          */
67         if (_SPI_curid != _SPI_connected)
68                 return SPI_ERROR_CONNECT;
69
70         if (_SPI_stack == NULL)
71         {
72                 if (_SPI_connected != -1)
73                         elog(FATAL, "SPI_connect: no connection(s) expected");
74                 new_SPI_stack = (_SPI_connection *) malloc(sizeof(_SPI_connection));
75         }
76         else
77         {
78                 if (_SPI_connected <= -1)
79                         elog(FATAL, "SPI_connect: some connection(s) expected");
80                 new_SPI_stack = (_SPI_connection *) realloc(_SPI_stack,
81                                                  (_SPI_connected + 2) * sizeof(_SPI_connection));
82         }
83
84         if (new_SPI_stack == NULL)
85                 elog(ERROR, "Memory exhausted in SPI_connect");
86
87         /*
88          * We' returning to procedure where _SPI_curid == _SPI_connected - 1
89          */
90         _SPI_stack = new_SPI_stack;
91         _SPI_connected++;
92
93         _SPI_current = &(_SPI_stack[_SPI_connected]);
94         _SPI_current->qtlist = NULL;
95         _SPI_current->processed = 0;
96         _SPI_current->tuptable = NULL;
97
98         /* Create memory contexts for this procedure */
99         _SPI_current->procCxt = AllocSetContextCreate(TopTransactionContext,
100                                                                                                   "SPI Proc",
101                                                                                                 ALLOCSET_DEFAULT_MINSIZE,
102                                                                                            ALLOCSET_DEFAULT_INITSIZE,
103                                                                                            ALLOCSET_DEFAULT_MAXSIZE);
104         _SPI_current->execCxt = AllocSetContextCreate(TopTransactionContext,
105                                                                                                   "SPI Exec",
106                                                                                                 ALLOCSET_DEFAULT_MINSIZE,
107                                                                                            ALLOCSET_DEFAULT_INITSIZE,
108                                                                                            ALLOCSET_DEFAULT_MAXSIZE);
109         /* ... and switch to procedure's context */
110         _SPI_current->savedcxt = MemoryContextSwitchTo(_SPI_current->procCxt);
111
112         _SPI_current->savedId = GetScanCommandId();
113         SetScanCommandId(GetCurrentCommandId());
114
115         return SPI_OK_CONNECT;
116
117 }
118
119 int
120 SPI_finish(void)
121 {
122         int                     res;
123
124         res = _SPI_begin_call(false);           /* live in procedure memory */
125         if (res < 0)
126                 return res;
127
128         /* Restore memory context as it was before procedure call */
129         MemoryContextSwitchTo(_SPI_current->savedcxt);
130
131         /* Release memory used in procedure call */
132         MemoryContextDelete(_SPI_current->execCxt);
133         MemoryContextDelete(_SPI_current->procCxt);
134
135         SetScanCommandId(_SPI_current->savedId);
136
137         /*
138          * After _SPI_begin_call _SPI_connected == _SPI_curid. Now we are
139          * closing connection to SPI and returning to upper Executor and so
140          * _SPI_connected must be equal to _SPI_curid.
141          */
142         _SPI_connected--;
143         _SPI_curid--;
144         if (_SPI_connected == -1)
145         {
146                 free(_SPI_stack);
147                 _SPI_stack = NULL;
148                 _SPI_current = NULL;
149         }
150         else
151         {
152                 _SPI_connection *new_SPI_stack;
153
154                 new_SPI_stack = (_SPI_connection *) realloc(_SPI_stack,
155                                                  (_SPI_connected + 1) * sizeof(_SPI_connection));
156                 /* This could only fail with a pretty stupid malloc package ... */
157                 if (new_SPI_stack == NULL)
158                         elog(ERROR, "Memory exhausted in SPI_finish");
159                 _SPI_stack = new_SPI_stack;
160                 _SPI_current = &(_SPI_stack[_SPI_connected]);
161         }
162
163         return SPI_OK_FINISH;
164
165 }
166
167 /*
168  * Clean up SPI state at transaction commit or abort (we don't care which).
169  */
170 void
171 AtEOXact_SPI(void)
172 {
173         /*
174          * Note that memory contexts belonging to SPI stack entries will be
175          * freed automatically, so we can ignore them here.  We just need to
176          * restore our static variables to initial state.
177          */
178         if (_SPI_stack != NULL)         /* there was abort */
179                 free(_SPI_stack);
180         _SPI_current = _SPI_stack = NULL;
181         _SPI_connected = _SPI_curid = -1;
182         SPI_processed = 0;
183         SPI_lastoid = InvalidOid;
184         SPI_tuptable = NULL;
185 }
186
187 void
188 SPI_push(void)
189 {
190         _SPI_curid++;
191 }
192
193 void
194 SPI_pop(void)
195 {
196         _SPI_curid--;
197 }
198
199 int
200 SPI_exec(char *src, int tcount)
201 {
202         int                     res;
203
204         if (src == NULL || tcount < 0)
205                 return SPI_ERROR_ARGUMENT;
206
207         res = _SPI_begin_call(true);
208         if (res < 0)
209                 return res;
210
211         res = _SPI_execute(src, tcount, NULL);
212
213         _SPI_end_call(true);
214         return res;
215 }
216
217 int
218 SPI_execp(void *plan, Datum *Values, char *Nulls, int tcount)
219 {
220         int                     res;
221
222         if (plan == NULL || tcount < 0)
223                 return SPI_ERROR_ARGUMENT;
224
225         if (((_SPI_plan *) plan)->nargs > 0 && Values == NULL)
226                 return SPI_ERROR_PARAM;
227
228         res = _SPI_begin_call(true);
229         if (res < 0)
230                 return res;
231
232         /* copy plan to current (executor) context */
233         plan = (void *) _SPI_copy_plan(plan, _SPI_CPLAN_CURCXT);
234
235         res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls, tcount);
236
237         _SPI_end_call(true);
238         return res;
239 }
240
241 void *
242 SPI_prepare(char *src, int nargs, Oid *argtypes)
243 {
244         _SPI_plan  *plan;
245
246         if (src == NULL || nargs < 0 || (nargs > 0 && argtypes == NULL))
247         {
248                 SPI_result = SPI_ERROR_ARGUMENT;
249                 return NULL;
250         }
251
252         SPI_result = _SPI_begin_call(true);
253         if (SPI_result < 0)
254                 return NULL;
255
256         plan = (_SPI_plan *) palloc(sizeof(_SPI_plan));         /* Executor context */
257         plan->argtypes = argtypes;
258         plan->nargs = nargs;
259
260         SPI_result = _SPI_execute(src, 0, plan);
261
262         if (SPI_result >= 0)            /* copy plan to procedure context */
263                 plan = _SPI_copy_plan(plan, _SPI_CPLAN_PROCXT);
264         else
265                 plan = NULL;
266
267         _SPI_end_call(true);
268
269         return (void *) plan;
270
271 }
272
273 void *
274 SPI_saveplan(void *plan)
275 {
276         _SPI_plan  *newplan;
277
278         if (plan == NULL)
279         {
280                 SPI_result = SPI_ERROR_ARGUMENT;
281                 return NULL;
282         }
283
284         SPI_result = _SPI_begin_call(false);            /* don't change context */
285         if (SPI_result < 0)
286                 return NULL;
287
288         newplan = _SPI_copy_plan((_SPI_plan *) plan, _SPI_CPLAN_TOPCXT);
289
290         _SPI_curid--;
291         SPI_result = 0;
292
293         return (void *) newplan;
294
295 }
296
297 int
298 SPI_freeplan(void *plan)
299 {
300         _SPI_plan  *spiplan = (_SPI_plan *)plan;
301
302         if (plan == NULL)
303                 return SPI_ERROR_ARGUMENT;
304
305         MemoryContextDelete(spiplan->plancxt);
306         return 0;
307 }
308
309 HeapTuple
310 SPI_copytuple(HeapTuple tuple)
311 {
312         MemoryContext oldcxt = NULL;
313         HeapTuple       ctuple;
314
315         if (tuple == NULL)
316         {
317                 SPI_result = SPI_ERROR_ARGUMENT;
318                 return NULL;
319         }
320
321         if (_SPI_curid + 1 == _SPI_connected)           /* connected */
322         {
323                 if (_SPI_current != &(_SPI_stack[_SPI_curid + 1]))
324                         elog(FATAL, "SPI: stack corrupted");
325                 oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
326         }
327
328         ctuple = heap_copytuple(tuple);
329
330         if (oldcxt)
331                 MemoryContextSwitchTo(oldcxt);
332
333         return ctuple;
334 }
335
336 TupleDesc
337 SPI_copytupledesc(TupleDesc tupdesc)
338 {
339         MemoryContext oldcxt = NULL;
340         TupleDesc       ctupdesc;
341
342         if (tupdesc == NULL)
343         {
344                 SPI_result = SPI_ERROR_ARGUMENT;
345                 return NULL;
346         }
347
348         if (_SPI_curid + 1 == _SPI_connected)           /* connected */
349         {
350                 if (_SPI_current != &(_SPI_stack[_SPI_curid + 1]))
351                         elog(FATAL, "SPI: stack corrupted");
352                 oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
353         }
354
355         ctupdesc = CreateTupleDescCopy(tupdesc);
356
357         if (oldcxt)
358                 MemoryContextSwitchTo(oldcxt);
359
360         return ctupdesc;
361 }
362
363 HeapTuple
364 SPI_modifytuple(Relation rel, HeapTuple tuple, int natts, int *attnum,
365                                 Datum *Values, char *Nulls)
366 {
367         MemoryContext oldcxt = NULL;
368         HeapTuple       mtuple;
369         int                     numberOfAttributes;
370         uint8           infomask;
371         Datum      *v;
372         char       *n;
373         bool            isnull;
374         int                     i;
375
376         if (rel == NULL || tuple == NULL || natts <= 0 || attnum == NULL || Values == NULL)
377         {
378                 SPI_result = SPI_ERROR_ARGUMENT;
379                 return NULL;
380         }
381
382         if (_SPI_curid + 1 == _SPI_connected)           /* connected */
383         {
384                 if (_SPI_current != &(_SPI_stack[_SPI_curid + 1]))
385                         elog(FATAL, "SPI: stack corrupted");
386                 oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
387         }
388         SPI_result = 0;
389         numberOfAttributes = rel->rd_att->natts;
390         v = (Datum *) palloc(numberOfAttributes * sizeof(Datum));
391         n = (char *) palloc(numberOfAttributes * sizeof(char));
392
393         /* fetch old values and nulls */
394         for (i = 0; i < numberOfAttributes; i++)
395         {
396                 v[i] = heap_getattr(tuple, i + 1, rel->rd_att, &isnull);
397                 n[i] = (isnull) ? 'n' : ' ';
398         }
399
400         /* replace values and nulls */
401         for (i = 0; i < natts; i++)
402         {
403                 if (attnum[i] <= 0 || attnum[i] > numberOfAttributes)
404                         break;
405                 v[attnum[i] - 1] = Values[i];
406                 n[attnum[i] - 1] = (Nulls && Nulls[i] == 'n') ? 'n' : ' ';
407         }
408
409         if (i == natts)                         /* no errors in *attnum */
410         {
411                 mtuple = heap_formtuple(rel->rd_att, v, n);
412                 infomask = mtuple->t_data->t_infomask;
413                 memmove(&(mtuple->t_data->t_oid), &(tuple->t_data->t_oid),
414                                 ((char *) &(tuple->t_data->t_hoff) -
415                                  (char *) &(tuple->t_data->t_oid)));
416                 mtuple->t_data->t_infomask = infomask;
417                 mtuple->t_data->t_natts = numberOfAttributes;
418         }
419         else
420         {
421                 mtuple = NULL;
422                 SPI_result = SPI_ERROR_NOATTRIBUTE;
423         }
424
425         pfree(v);
426         pfree(n);
427
428         if (oldcxt)
429                 MemoryContextSwitchTo(oldcxt);
430
431         return mtuple;
432 }
433
434 int
435 SPI_fnumber(TupleDesc tupdesc, char *fname)
436 {
437         int                     res;
438
439         for (res = 0; res < tupdesc->natts; res++)
440         {
441                 if (strcasecmp(NameStr(tupdesc->attrs[res]->attname), fname) == 0)
442                         return res + 1;
443         }
444
445         return SPI_ERROR_NOATTRIBUTE;
446 }
447
448 char *
449 SPI_fname(TupleDesc tupdesc, int fnumber)
450 {
451
452         SPI_result = 0;
453         if (tupdesc->natts < fnumber || fnumber <= 0)
454         {
455                 SPI_result = SPI_ERROR_NOATTRIBUTE;
456                 return NULL;
457         }
458
459         return pstrdup(NameStr(tupdesc->attrs[fnumber - 1]->attname));
460 }
461
462 char *
463 SPI_getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber)
464 {
465         Datum           origval,
466                                 val,
467                                 result;
468         bool            isnull;
469         Oid                     foutoid,
470                                 typelem;
471         bool            typisvarlena;
472
473         SPI_result = 0;
474         if (tuple->t_data->t_natts < fnumber || fnumber <= 0)
475         {
476                 SPI_result = SPI_ERROR_NOATTRIBUTE;
477                 return NULL;
478         }
479
480         origval = heap_getattr(tuple, fnumber, tupdesc, &isnull);
481         if (isnull)
482                 return NULL;
483         if (!getTypeOutputInfo(tupdesc->attrs[fnumber - 1]->atttypid,
484                                                    &foutoid, &typelem, &typisvarlena))
485         {
486                 SPI_result = SPI_ERROR_NOOUTFUNC;
487                 return NULL;
488         }
489
490         /*
491          * If we have a toasted datum, forcibly detoast it here to avoid
492          * memory leakage inside the type's output routine.
493          */
494         if (typisvarlena)
495                 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
496         else
497                 val = origval;
498
499         result = OidFunctionCall3(foutoid,
500                                                           val,
501                                                           ObjectIdGetDatum(typelem),
502                                   Int32GetDatum(tupdesc->attrs[fnumber - 1]->atttypmod));
503
504         /* Clean up detoasted copy, if any */
505         if (val != origval)
506                 pfree(DatumGetPointer(val));
507
508         return DatumGetCString(result);
509 }
510
511 Datum
512 SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
513 {
514         Datum           val;
515
516         *isnull = true;
517         SPI_result = 0;
518         if (tuple->t_data->t_natts < fnumber || fnumber <= 0)
519         {
520                 SPI_result = SPI_ERROR_NOATTRIBUTE;
521                 return (Datum) NULL;
522         }
523
524         val = heap_getattr(tuple, fnumber, tupdesc, isnull);
525
526         return val;
527 }
528
529 char *
530 SPI_gettype(TupleDesc tupdesc, int fnumber)
531 {
532         HeapTuple       typeTuple;
533         char       *result;
534
535         SPI_result = 0;
536         if (tupdesc->natts < fnumber || fnumber <= 0)
537         {
538                 SPI_result = SPI_ERROR_NOATTRIBUTE;
539                 return NULL;
540         }
541
542         typeTuple = SearchSysCache(TYPEOID,
543                                  ObjectIdGetDatum(tupdesc->attrs[fnumber - 1]->atttypid),
544                                                            0, 0, 0);
545
546         if (!HeapTupleIsValid(typeTuple))
547         {
548                 SPI_result = SPI_ERROR_TYPUNKNOWN;
549                 return NULL;
550         }
551
552         result = pstrdup(NameStr(((Form_pg_type) GETSTRUCT(typeTuple))->typname));
553         ReleaseSysCache(typeTuple);
554         return result;
555 }
556
557 Oid
558 SPI_gettypeid(TupleDesc tupdesc, int fnumber)
559 {
560
561         SPI_result = 0;
562         if (tupdesc->natts < fnumber || fnumber <= 0)
563         {
564                 SPI_result = SPI_ERROR_NOATTRIBUTE;
565                 return InvalidOid;
566         }
567
568         return tupdesc->attrs[fnumber - 1]->atttypid;
569 }
570
571 char *
572 SPI_getrelname(Relation rel)
573 {
574         return pstrdup(RelationGetRelationName(rel));
575 }
576
577 void *
578 SPI_palloc(Size size)
579 {
580         MemoryContext oldcxt = NULL;
581         void       *pointer;
582
583         if (_SPI_curid + 1 == _SPI_connected)           /* connected */
584         {
585                 if (_SPI_current != &(_SPI_stack[_SPI_curid + 1]))
586                         elog(FATAL, "SPI: stack corrupted");
587                 oldcxt = MemoryContextSwitchTo(_SPI_current->savedcxt);
588         }
589
590         pointer = palloc(size);
591
592         if (oldcxt)
593                 MemoryContextSwitchTo(oldcxt);
594
595         return pointer;
596 }
597
598 void *
599 SPI_repalloc(void *pointer, Size size)
600 {
601         /* No longer need to worry which context chunk was in... */
602         return repalloc(pointer, size);
603 }
604
605 void
606 SPI_pfree(void *pointer)
607 {
608         /* No longer need to worry which context chunk was in... */
609         pfree(pointer);
610 }
611
612 void
613 SPI_freetuple(HeapTuple tuple)
614 {
615         /* No longer need to worry which context tuple was in... */
616         heap_freetuple(tuple);
617 }
618
619 void
620 SPI_freetuptable(SPITupleTable *tuptable)
621 {
622         if (tuptable != NULL)
623                 MemoryContextDelete(tuptable->tuptabcxt);
624 }
625
626
627
628 /*
629  * SPI_cursor_open()
630  *
631  *      Open a prepared SPI plan as a portal
632  */
633 Portal
634 SPI_cursor_open(char *name, void *plan, Datum *Values, char *Nulls)
635 {
636         static int                      unnamed_portal_count = 0;
637
638         _SPI_plan                  *spiplan = (_SPI_plan *)plan;
639         List                       *qtlist = spiplan->qtlist;
640         List                       *ptlist = spiplan->ptlist;
641         Query                      *queryTree;
642         Plan                       *planTree;
643         QueryDesc                  *queryDesc;
644         EState                     *eState;
645         TupleDesc                       attinfo;
646         MemoryContext           oldcontext;
647         Portal                          portal;
648         char                            portalname[64];
649         int                                     k;
650
651         /* Ensure that the plan contains only one regular SELECT query */
652         if (length(ptlist) != 1)
653                 elog(ERROR, "cannot open multi-query plan as cursor");
654         queryTree = (Query *)lfirst(qtlist);
655         planTree  = (Plan *)lfirst(ptlist);
656
657         if (queryTree->commandType != CMD_SELECT)
658                 elog(ERROR, "plan in SPI_cursor_open() is not a SELECT");
659         if (queryTree->isPortal)
660                 elog(ERROR, "plan in SPI_cursor_open() must NOT be a DECLARE already");
661         else if (queryTree->into != NULL)
662                 elog(ERROR, "plan in SPI_cursor_open() must NOT be a SELECT INTO");
663
664         /* Reset SPI result */
665         SPI_processed = 0;
666         SPI_tuptable = NULL;
667         _SPI_current->processed = 0;
668         _SPI_current->tuptable = NULL;
669
670         /* Make up a portal name if none given */
671         if (name == NULL)
672         {
673                 for (;;)
674                 {
675                     unnamed_portal_count++;
676                         if (unnamed_portal_count < 0)
677                                 unnamed_portal_count = 0;
678                         sprintf(portalname, "<unnamed cursor %d>", unnamed_portal_count);
679                         if (GetPortalByName(portalname) == NULL)
680                                 break;
681                 }
682
683                 name = portalname;
684         }
685
686         /* Ensure the portal doesn't exist already */
687         portal = GetPortalByName(name);
688         if (portal != NULL)
689                 elog(ERROR, "cursor \"%s\" already in use", name);
690
691         /* Create the portal */
692         portal = CreatePortal(name);
693         if (portal == NULL)
694                 elog(ERROR, "failed to create portal \"%s\"", name);
695
696         /* Switch to portals memory and copy the parsetree and plan to there */
697         oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
698         queryTree  = copyObject(queryTree);
699         planTree   = copyObject(planTree);
700
701         /* Modify the parsetree to be a cursor */
702         queryTree->isPortal = true;
703         queryTree->into     = pstrdup(name);
704         queryTree->isBinary = false;
705         
706         /* Create the QueryDesc object and the executor state */
707         queryDesc = CreateQueryDesc(queryTree, planTree, SPI);
708         eState    = CreateExecutorState();
709
710         /* If the plan has parameters, put them into the executor state */
711         if (spiplan->nargs > 0)
712         {
713                 ParamListInfo   paramLI = (ParamListInfo) palloc((spiplan->nargs + 1) *
714                                                                         sizeof(ParamListInfoData));
715                 eState->es_param_list_info = paramLI;
716                 for (k = 0; k < spiplan->nargs; paramLI++, k++)
717                 {
718                         paramLI->kind   = PARAM_NUM;
719                         paramLI->id             = k + 1;
720                         paramLI->isnull = (Nulls && Nulls[k] == 'n');
721                         paramLI->value  = Values[k];
722                 }
723                 paramLI->kind = PARAM_INVALID;
724         }
725         else
726                 eState->es_param_list_info = NULL;
727
728         /* Start the executor */
729         attinfo = ExecutorStart(queryDesc, eState);
730
731         /* Put all the objects into the portal */
732         PortalSetQuery(portal, queryDesc, attinfo, eState, PortalCleanup);
733
734         /* Switch back to the callers memory context */
735         MemoryContextSwitchTo(oldcontext);
736
737         /* Return the created portal */
738         return portal;
739 }
740
741
742 /*
743  * SPI_cursor_find()
744  *
745  *      Find the portal of an existing open cursor
746  */
747 Portal
748 SPI_cursor_find(char *name)
749 {
750         return GetPortalByName(name);
751 }
752
753
754 /*
755  * SPI_cursor_fetch()
756  *
757  *      Fetch rows in a cursor
758  */
759 void
760 SPI_cursor_fetch(Portal portal, bool forward, int count)
761 {
762         _SPI_cursor_operation(portal, forward, count, SPI);
763 }
764
765
766 /*
767  * SPI_cursor_move()
768  *
769  *      Move in a cursor
770  */
771 void
772 SPI_cursor_move(Portal portal, bool forward, int count)
773 {
774         _SPI_cursor_operation(portal, forward, count, None);
775 }
776
777
778 /*
779  * SPI_cursor_close()
780  *
781  *      Close a cursor
782  */
783 void
784 SPI_cursor_close(Portal portal)
785 {
786         Portal  my_portal = portal;
787
788         if (!PortalIsValid(my_portal))
789                 elog(ERROR, "invalid portal in SPI cursor operation");
790
791         PortalDrop(&my_portal);
792 }
793
794 /* =================== private functions =================== */
795
796 /*
797  * spi_printtup
798  *              store tuple retrieved by Executor into SPITupleTable
799  *              of current SPI procedure
800  *
801  */
802 void
803 spi_printtup(HeapTuple tuple, TupleDesc tupdesc, DestReceiver *self)
804 {
805         SPITupleTable *tuptable;
806         MemoryContext oldcxt;
807         MemoryContext tuptabcxt;
808
809         /*
810          * When called by Executor _SPI_curid expected to be equal to
811          * _SPI_connected
812          */
813         if (_SPI_curid != _SPI_connected || _SPI_connected < 0)
814                 elog(FATAL, "SPI: improper call to spi_printtup");
815         if (_SPI_current != &(_SPI_stack[_SPI_curid]))
816                 elog(FATAL, "SPI: stack corrupted in spi_printtup");
817
818         oldcxt = _SPI_procmem();        /* switch to procedure memory context */
819
820         tuptable = _SPI_current->tuptable;
821         if (tuptable == NULL)
822         {
823                 tuptabcxt = AllocSetContextCreate(CurrentMemoryContext,
824                                                                                   "SPI TupTable",
825                                                                                   ALLOCSET_DEFAULT_MINSIZE,
826                                                                                   ALLOCSET_DEFAULT_INITSIZE,
827                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
828                 MemoryContextSwitchTo(tuptabcxt);
829
830                 _SPI_current->tuptable = tuptable = (SPITupleTable *)
831                         palloc(sizeof(SPITupleTable));
832                 tuptable->tuptabcxt = tuptabcxt;
833                 tuptable->alloced = tuptable->free = 128;
834                 tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple));
835                 tuptable->tupdesc = CreateTupleDescCopy(tupdesc);
836         }
837         else 
838         {
839                 MemoryContextSwitchTo(tuptable->tuptabcxt);
840
841                 if (tuptable->free == 0)
842                 {
843                         tuptable->free = 256;
844                         tuptable->alloced += tuptable->free;
845                         tuptable->vals = (HeapTuple *) repalloc(tuptable->vals,
846                                                                           tuptable->alloced * sizeof(HeapTuple));
847                 }
848         }
849
850         tuptable->vals[tuptable->alloced - tuptable->free] = heap_copytuple(tuple);
851         (tuptable->free)--;
852
853         MemoryContextSwitchTo(oldcxt);
854         return;
855 }
856
857 /*
858  * Static functions
859  */
860
861 static int
862 _SPI_execute(char *src, int tcount, _SPI_plan *plan)
863 {
864         List       *queryTree_list;
865         List       *planTree_list;
866         List       *queryTree_list_item;
867         QueryDesc  *qdesc;
868         Query      *queryTree;
869         Plan       *planTree;
870         EState     *state;
871         int                     nargs = 0;
872         Oid                *argtypes = NULL;
873         int                     res = 0;
874         bool            islastquery;
875
876         /* Increment CommandCounter to see changes made by now */
877         CommandCounterIncrement();
878
879         SPI_processed = 0;
880         SPI_lastoid = InvalidOid;
881         SPI_tuptable = NULL;
882         _SPI_current->tuptable = NULL;
883         _SPI_current->qtlist = NULL;
884
885         if (plan)
886         {
887                 nargs = plan->nargs;
888                 argtypes = plan->argtypes;
889         }
890
891         queryTree_list = pg_parse_and_rewrite(src, argtypes, nargs);
892
893         _SPI_current->qtlist = queryTree_list;
894
895         planTree_list = NIL;
896
897         foreach(queryTree_list_item, queryTree_list)
898         {
899                 queryTree = (Query *) lfirst(queryTree_list_item);
900                 islastquery = (lnext(queryTree_list_item) == NIL);
901
902                 planTree = pg_plan_query(queryTree);
903                 planTree_list = lappend(planTree_list, planTree);
904
905                 if (queryTree->commandType == CMD_UTILITY)
906                 {
907                         if (nodeTag(queryTree->utilityStmt) == T_CopyStmt)
908                         {
909                                 CopyStmt   *stmt = (CopyStmt *) (queryTree->utilityStmt);
910
911                                 if (stmt->filename == NULL)
912                                         return SPI_ERROR_COPY;
913                         }
914                         else if (nodeTag(queryTree->utilityStmt) == T_ClosePortalStmt ||
915                                          nodeTag(queryTree->utilityStmt) == T_FetchStmt)
916                                 return SPI_ERROR_CURSOR;
917                         else if (nodeTag(queryTree->utilityStmt) == T_TransactionStmt)
918                                 return SPI_ERROR_TRANSACTION;
919                         res = SPI_OK_UTILITY;
920                         if (plan == NULL)
921                         {
922                                 ProcessUtility(queryTree->utilityStmt, None);
923                                 if (!islastquery)
924                                         CommandCounterIncrement();
925                                 else
926                                         return res;
927                         }
928                         else if (islastquery)
929                                 break;
930                 }
931                 else if (plan == NULL)
932                 {
933                         qdesc = CreateQueryDesc(queryTree, planTree,
934                                                                         islastquery ? SPI : None);
935                         state = CreateExecutorState();
936                         res = _SPI_pquery(qdesc, state, islastquery ? tcount : 0);
937                         if (res < 0 || islastquery)
938                                 return res;
939                         CommandCounterIncrement();
940                 }
941                 else
942                 {
943                         qdesc = CreateQueryDesc(queryTree, planTree,
944                                                                         islastquery ? SPI : None);
945                         res = _SPI_pquery(qdesc, NULL, islastquery ? tcount : 0);
946                         if (res < 0)
947                                 return res;
948                         if (islastquery)
949                                 break;
950                 }
951         }
952
953         if (plan)
954         {
955                 plan->qtlist = queryTree_list;
956                 plan->ptlist = planTree_list;
957         }
958
959         return res;
960 }
961
962 static int
963 _SPI_execute_plan(_SPI_plan *plan, Datum *Values, char *Nulls, int tcount)
964 {
965         List       *queryTree_list = plan->qtlist;
966         List       *planTree_list = plan->ptlist;
967         List       *queryTree_list_item;
968         QueryDesc  *qdesc;
969         Query      *queryTree;
970         Plan       *planTree;
971         EState     *state;
972         int                     nargs = plan->nargs;
973         int                     res = 0;
974         bool            islastquery;
975         int                     k;
976
977         /* Increment CommandCounter to see changes made by now */
978         CommandCounterIncrement();
979
980         SPI_processed = 0;
981         SPI_lastoid = InvalidOid;
982         SPI_tuptable = NULL;
983         _SPI_current->tuptable = NULL;
984         _SPI_current->qtlist = NULL;
985
986         foreach(queryTree_list_item, queryTree_list)
987         {
988                 queryTree = (Query *) lfirst(queryTree_list_item);
989                 planTree = lfirst(planTree_list);
990                 planTree_list = lnext(planTree_list);
991                 islastquery = (planTree_list == NIL);   /* assume lists are same
992                                                                                                  * len */
993
994                 if (queryTree->commandType == CMD_UTILITY)
995                 {
996                         ProcessUtility(queryTree->utilityStmt, None);
997                         if (!islastquery)
998                                 CommandCounterIncrement();
999                         else
1000                                 return SPI_OK_UTILITY;
1001                 }
1002                 else
1003                 {
1004                         qdesc = CreateQueryDesc(queryTree, planTree,
1005                                                                         islastquery ? SPI : None);
1006                         state = CreateExecutorState();
1007                         if (nargs > 0)
1008                         {
1009                                 ParamListInfo paramLI = (ParamListInfo) palloc((nargs + 1) *
1010                                                                                           sizeof(ParamListInfoData));
1011
1012                                 state->es_param_list_info = paramLI;
1013                                 for (k = 0; k < plan->nargs; paramLI++, k++)
1014                                 {
1015                                         paramLI->kind = PARAM_NUM;
1016                                         paramLI->id = k + 1;
1017                                         paramLI->isnull = (Nulls && Nulls[k] == 'n');
1018                                         paramLI->value = Values[k];
1019                                 }
1020                                 paramLI->kind = PARAM_INVALID;
1021                         }
1022                         else
1023                                 state->es_param_list_info = NULL;
1024                         res = _SPI_pquery(qdesc, state, islastquery ? tcount : 0);
1025                         if (res < 0 || islastquery)
1026                                 return res;
1027                         CommandCounterIncrement();
1028                 }
1029         }
1030
1031         return res;
1032 }
1033
1034 static int
1035 _SPI_pquery(QueryDesc *queryDesc, EState *state, int tcount)
1036 {
1037         Query      *parseTree = queryDesc->parsetree;
1038         int                     operation = queryDesc->operation;
1039         CommandDest dest = queryDesc->dest;
1040         TupleDesc       tupdesc;
1041         bool            isRetrieveIntoPortal = false;
1042         bool            isRetrieveIntoRelation = false;
1043         char       *intoName = NULL;
1044         int                     res;
1045         Oid                     save_lastoid;
1046
1047         switch (operation)
1048         {
1049                 case CMD_SELECT:
1050                         res = SPI_OK_SELECT;
1051                         if (parseTree->isPortal)
1052                         {
1053                                 isRetrieveIntoPortal = true;
1054                                 intoName = parseTree->into;
1055                                 parseTree->isBinary = false;    /* */
1056
1057                                 return SPI_ERROR_CURSOR;
1058
1059                         }
1060                         else if (parseTree->into != NULL)       /* select into table */
1061                         {
1062                                 res = SPI_OK_SELINTO;
1063                                 isRetrieveIntoRelation = true;
1064                                 queryDesc->dest = None; /* */
1065                         }
1066                         break;
1067                 case CMD_INSERT:
1068                         res = SPI_OK_INSERT;
1069                         break;
1070                 case CMD_DELETE:
1071                         res = SPI_OK_DELETE;
1072                         break;
1073                 case CMD_UPDATE:
1074                         res = SPI_OK_UPDATE;
1075                         break;
1076                 default:
1077                         return SPI_ERROR_OPUNKNOWN;
1078         }
1079
1080         if (state == NULL)                      /* plan preparation */
1081                 return res;
1082 #ifdef SPI_EXECUTOR_STATS
1083         if (ShowExecutorStats)
1084                 ResetUsage();
1085 #endif
1086         tupdesc = ExecutorStart(queryDesc, state);
1087
1088         /*
1089          * Don't work currently --- need to rearrange callers so that we
1090          * prepare the portal before doing CreateExecutorState() etc. See
1091          * pquery.c for the correct order of operations.
1092          */
1093         if (isRetrieveIntoPortal)
1094                 elog(FATAL, "SPI_select: retrieve into portal not implemented");
1095
1096         ExecutorRun(queryDesc, state, EXEC_FOR, (long) tcount);
1097
1098         _SPI_current->processed = state->es_processed;
1099         save_lastoid = state->es_lastoid;
1100
1101         if (operation == CMD_SELECT && queryDesc->dest == SPI)
1102         {
1103                 if (_SPI_checktuples())
1104                         elog(FATAL, "SPI_select: # of processed tuples check failed");
1105         }
1106
1107         ExecutorEnd(queryDesc, state);
1108
1109 #ifdef SPI_EXECUTOR_STATS
1110         if (ShowExecutorStats)
1111         {
1112                 fprintf(stderr, "! Executor Stats:\n");
1113                 ShowUsage();
1114         }
1115 #endif
1116
1117         if (dest == SPI)
1118         {
1119                 SPI_processed = _SPI_current->processed;
1120                 SPI_lastoid = save_lastoid;
1121                 SPI_tuptable = _SPI_current->tuptable;
1122         }
1123         queryDesc->dest = dest;
1124
1125         return res;
1126
1127 }
1128
1129 /*
1130  * _SPI_cursor_operation()
1131  *
1132  *      Do a FETCH or MOVE in a cursor
1133  */
1134 static void
1135 _SPI_cursor_operation(Portal portal, bool forward, int count,
1136                                         CommandDest dest)
1137 {
1138     QueryDesc      *querydesc;
1139         EState             *estate;
1140         MemoryContext   oldcontext;
1141         CommandDest             olddest;
1142
1143         /* Check that the portal is valid */
1144         if (!PortalIsValid(portal))
1145                 elog(ERROR, "invalid portal in SPI cursor operation");
1146
1147         /* Push the SPI stack */
1148         _SPI_begin_call(true);
1149
1150         /* Reset the SPI result */
1151         SPI_processed = 0;
1152         SPI_tuptable = NULL;
1153         _SPI_current->processed = 0;
1154         _SPI_current->tuptable = NULL;
1155
1156         /* Switch to the portals memory context */
1157         oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
1158         querydesc  = PortalGetQueryDesc(portal);
1159         estate     = PortalGetState(portal);
1160
1161         /* Save the queries command destination and set it to SPI (for fetch) */
1162         /* or None (for move) */
1163         olddest = querydesc->dest;
1164         querydesc->dest = dest;
1165
1166         /* Run the executor like PerformPortalFetch and remember states */
1167         if (forward)
1168         {
1169                 if (!portal->atEnd)
1170                 {
1171                         ExecutorRun(querydesc, estate, EXEC_FOR, (long)count);
1172                         _SPI_current->processed = estate->es_processed;
1173                         if (estate->es_processed > 0)
1174                                 portal->atStart = false;
1175                         if (count <= 0 || (int) estate->es_processed < count)
1176                                 portal->atEnd = true;
1177                 }
1178         }
1179         else
1180         {
1181                 if (!portal->atStart)
1182                 {
1183                         ExecutorRun(querydesc, estate, EXEC_BACK, (long) count);
1184                         _SPI_current->processed = estate->es_processed;
1185                         if (estate->es_processed > 0)
1186                                 portal->atEnd = false;
1187                         if (count <= 0 || estate->es_processed < count)
1188                                 portal->atStart = true;
1189                 }
1190         }
1191
1192         /* Restore the old command destination and switch back to callers */
1193         /* memory context */
1194         querydesc->dest = olddest;
1195         MemoryContextSwitchTo(oldcontext);
1196
1197         if (dest == SPI && _SPI_checktuples())
1198                 elog(FATAL, "SPI_fetch: # of processed tuples check failed");
1199
1200         /* Put the result into place for access by caller */
1201         SPI_processed = _SPI_current->processed;
1202         SPI_tuptable  = _SPI_current->tuptable;
1203
1204         /* Pop the SPI stack */
1205         _SPI_end_call(true);
1206 }
1207
1208
1209 static MemoryContext
1210 _SPI_execmem()
1211 {
1212         return MemoryContextSwitchTo(_SPI_current->execCxt);
1213 }
1214
1215 static MemoryContext
1216 _SPI_procmem()
1217 {
1218         return MemoryContextSwitchTo(_SPI_current->procCxt);
1219 }
1220
1221 /*
1222  * _SPI_begin_call
1223  *
1224  */
1225 static int
1226 _SPI_begin_call(bool execmem)
1227 {
1228         if (_SPI_curid + 1 != _SPI_connected)
1229                 return SPI_ERROR_UNCONNECTED;
1230         _SPI_curid++;
1231         if (_SPI_current != &(_SPI_stack[_SPI_curid]))
1232                 elog(FATAL, "SPI: stack corrupted");
1233
1234         if (execmem)                            /* switch to the Executor memory context */
1235                 _SPI_execmem();
1236
1237         return 0;
1238 }
1239
1240 static int
1241 _SPI_end_call(bool procmem)
1242 {
1243
1244         /*
1245          * We' returning to procedure where _SPI_curid == _SPI_connected - 1
1246          */
1247         _SPI_curid--;
1248
1249         _SPI_current->qtlist = NULL;
1250
1251         if (procmem)                            /* switch to the procedure memory context */
1252         {
1253                 _SPI_procmem();
1254                 /* and free Executor memory */
1255                 MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
1256         }
1257
1258         return 0;
1259 }
1260
1261 static bool
1262 _SPI_checktuples(void)
1263 {
1264         uint32          processed = _SPI_current->processed;
1265         SPITupleTable *tuptable = _SPI_current->tuptable;
1266         bool            failed = false;
1267
1268         if (processed == 0)
1269         {
1270                 if (tuptable != NULL)
1271                         failed = true;
1272         }
1273         else
1274         {
1275                 /* some tuples were processed */
1276                 if (tuptable == NULL)   /* spi_printtup was not called */
1277                         failed = true;
1278                 else if (processed != (tuptable->alloced - tuptable->free))
1279                         failed = true;
1280         }
1281
1282         return failed;
1283 }
1284
1285 static _SPI_plan *
1286 _SPI_copy_plan(_SPI_plan *plan, int location)
1287 {
1288         _SPI_plan  *newplan;
1289         MemoryContext oldcxt;
1290         MemoryContext plancxt;
1291         MemoryContext parentcxt;
1292
1293         /* Determine correct parent for the plan's memory context */
1294         if (location == _SPI_CPLAN_PROCXT)
1295                 parentcxt = _SPI_current->procCxt;
1296         else if (location == _SPI_CPLAN_TOPCXT)
1297                 parentcxt = TopMemoryContext;
1298         else
1299                 parentcxt = CurrentMemoryContext;
1300
1301         /*
1302          * Create a memory context for the plan.  We don't expect the plan to
1303          * be very large, so use smaller-than-default alloc parameters.
1304          */
1305         plancxt = AllocSetContextCreate(parentcxt,
1306                                                                         "SPI Plan",
1307                                                                         1024,
1308                                                                         1024,
1309                                                                         ALLOCSET_DEFAULT_MAXSIZE);
1310         oldcxt = MemoryContextSwitchTo(plancxt);
1311
1312         /* Copy the SPI plan into its own context */
1313         newplan = (_SPI_plan *) palloc(sizeof(_SPI_plan));
1314         newplan->plancxt = plancxt;
1315         newplan->qtlist = (List *) copyObject(plan->qtlist);
1316         newplan->ptlist = (List *) copyObject(plan->ptlist);
1317         newplan->nargs = plan->nargs;
1318         if (plan->nargs > 0)
1319         {
1320                 newplan->argtypes = (Oid *) palloc(plan->nargs * sizeof(Oid));
1321                 memcpy(newplan->argtypes, plan->argtypes, plan->nargs * sizeof(Oid));
1322         }
1323         else
1324                 newplan->argtypes = NULL;
1325
1326         MemoryContextSwitchTo(oldcxt);
1327
1328         return newplan;
1329 }