OSDN Git Service

7f67f37b00cfadd4a37b6969a2f24407c99c6a0d
[pg-rex/syncrep.git] / contrib / tablefunc / tablefunc.c
1 /*
2  * tablefunc
3  *
4  * Sample to demonstrate C functions which return setof scalar
5  * and setof composite.
6  * Joe Conway <mail@joeconway.com>
7  * And contributors:
8  * Nabil Sayegh <postgresql@e-trolley.de>
9  *
10  * Copyright (c) 2002-2005, PostgreSQL Global Development Group
11  *
12  * Permission to use, copy, modify, and distribute this software and its
13  * documentation for any purpose, without fee, and without a written agreement
14  * is hereby granted, provided that the above copyright notice and this
15  * paragraph and the following two paragraphs appear in all copies.
16  *
17  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
18  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
19  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
20  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
21  * POSSIBILITY OF SUCH DAMAGE.
22  *
23  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
24  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
25  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
26  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
27  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
28  *
29  */
30 #include "postgres.h"
31
32 #include <math.h>
33
34 #include "fmgr.h"
35 #include "funcapi.h"
36 #include "executor/spi.h"
37 #include "lib/stringinfo.h"
38 #include "miscadmin.h"
39 #include "utils/builtins.h"
40 #include "utils/guc.h"
41 #include "utils/lsyscache.h"
42
43 #include "tablefunc.h"
44
45 static int      load_categories_hash(char *cats_sql, MemoryContext per_query_ctx);
46 static Tuplestorestate *get_crosstab_tuplestore(char *sql,
47                                                 int num_categories,
48                                                 TupleDesc tupdesc,
49                                                 MemoryContext per_query_ctx);
50 static void validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial);
51 static bool compatCrosstabTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
52 static bool compatConnectbyTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
53 static void get_normal_pair(float8 *x1, float8 *x2);
54 static Tuplestorestate *connectby(char *relname,
55                   char *key_fld,
56                   char *parent_key_fld,
57                   char *orderby_fld,
58                   char *branch_delim,
59                   char *start_with,
60                   int max_depth,
61                   bool show_branch,
62                   bool show_serial,
63                   MemoryContext per_query_ctx,
64                   AttInMetadata *attinmeta);
65 static Tuplestorestate *build_tuplestore_recursively(char *key_fld,
66                                                          char *parent_key_fld,
67                                                          char *relname,
68                                                          char *orderby_fld,
69                                                          char *branch_delim,
70                                                          char *start_with,
71                                                          char *branch,
72                                                          int level,
73                                                          int *serial,
74                                                          int max_depth,
75                                                          bool show_branch,
76                                                          bool show_serial,
77                                                          MemoryContext per_query_ctx,
78                                                          AttInMetadata *attinmeta,
79                                                          Tuplestorestate *tupstore);
80 static char *quote_literal_cstr(char *rawstr);
81
82 typedef struct
83 {
84         float8          mean;                   /* mean of the distribution */
85         float8          stddev;                 /* stddev of the distribution */
86         float8          carry_val;              /* hold second generated value */
87         bool            use_carry;              /* use second generated value */
88 }       normal_rand_fctx;
89
90 typedef struct
91 {
92         SPITupleTable *spi_tuptable;    /* sql results from user query */
93         char       *lastrowid;          /* rowid of the last tuple sent */
94 }       crosstab_fctx;
95
96 #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
97 #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
98 #define xpfree(var_) \
99         do { \
100                 if (var_ != NULL) \
101                 { \
102                         pfree(var_); \
103                         var_ = NULL; \
104                 } \
105         } while (0)
106
107 /* sign, 10 digits, '\0' */
108 #define INT32_STRLEN    12
109
110 /* hash table support */
111 static HTAB *crosstab_HashTable;
112
113 /* The information we cache about loaded procedures */
114 typedef struct crosstab_cat_desc
115 {
116         char       *catname;
117         int                     attidx;                 /* zero based */
118 }       crosstab_cat_desc;
119
120 #define MAX_CATNAME_LEN                 NAMEDATALEN
121 #define INIT_CATS                               64
122
123 #define crosstab_HashTableLookup(CATNAME, CATDESC) \
124 do { \
125         crosstab_HashEnt *hentry; char key[MAX_CATNAME_LEN]; \
126         \
127         MemSet(key, 0, MAX_CATNAME_LEN); \
128         snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATNAME); \
129         hentry = (crosstab_HashEnt*) hash_search(crosstab_HashTable, \
130                                                                                  key, HASH_FIND, NULL); \
131         if (hentry) \
132                 CATDESC = hentry->catdesc; \
133         else \
134                 CATDESC = NULL; \
135 } while(0)
136
137 #define crosstab_HashTableInsert(CATDESC) \
138 do { \
139         crosstab_HashEnt *hentry; bool found; char key[MAX_CATNAME_LEN]; \
140         \
141         MemSet(key, 0, MAX_CATNAME_LEN); \
142         snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATDESC->catname); \
143         hentry = (crosstab_HashEnt*) hash_search(crosstab_HashTable, \
144                                                                                  key, HASH_ENTER, &found); \
145         if (found) \
146                 ereport(ERROR, \
147                                 (errcode(ERRCODE_DUPLICATE_OBJECT), \
148                                  errmsg("duplicate category name"))); \
149         hentry->catdesc = CATDESC; \
150 } while(0)
151
152 /* hash table */
153 typedef struct crosstab_hashent
154 {
155         char            internal_catname[MAX_CATNAME_LEN];
156         crosstab_cat_desc *catdesc;
157 }       crosstab_HashEnt;
158
159 /*
160  * normal_rand - return requested number of random values
161  * with a Gaussian (Normal) distribution.
162  *
163  * inputs are int numvals, float8 mean, and float8 stddev
164  * returns setof float8
165  */
166 PG_FUNCTION_INFO_V1(normal_rand);
167 Datum
168 normal_rand(PG_FUNCTION_ARGS)
169 {
170         FuncCallContext *funcctx;
171         int                     call_cntr;
172         int                     max_calls;
173         normal_rand_fctx *fctx;
174         float8          mean;
175         float8          stddev;
176         float8          carry_val;
177         bool            use_carry;
178         MemoryContext oldcontext;
179
180         /* stuff done only on the first call of the function */
181         if (SRF_IS_FIRSTCALL())
182         {
183                 /* create a function context for cross-call persistence */
184                 funcctx = SRF_FIRSTCALL_INIT();
185
186                 /*
187                  * switch to memory context appropriate for multiple function calls
188                  */
189                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
190
191                 /* total number of tuples to be returned */
192                 funcctx->max_calls = PG_GETARG_UINT32(0);
193
194                 /* allocate memory for user context */
195                 fctx = (normal_rand_fctx *) palloc(sizeof(normal_rand_fctx));
196
197                 /*
198                  * Use fctx to keep track of upper and lower bounds from call to call.
199                  * It will also be used to carry over the spare value we get from the
200                  * Box-Muller algorithm so that we only actually calculate a new value
201                  * every other call.
202                  */
203                 fctx->mean = PG_GETARG_FLOAT8(1);
204                 fctx->stddev = PG_GETARG_FLOAT8(2);
205                 fctx->carry_val = 0;
206                 fctx->use_carry = false;
207
208                 funcctx->user_fctx = fctx;
209
210                 MemoryContextSwitchTo(oldcontext);
211         }
212
213         /* stuff done on every call of the function */
214         funcctx = SRF_PERCALL_SETUP();
215
216         call_cntr = funcctx->call_cntr;
217         max_calls = funcctx->max_calls;
218         fctx = funcctx->user_fctx;
219         mean = fctx->mean;
220         stddev = fctx->stddev;
221         carry_val = fctx->carry_val;
222         use_carry = fctx->use_carry;
223
224         if (call_cntr < max_calls)      /* do when there is more left to send */
225         {
226                 float8          result;
227
228                 if (use_carry)
229                 {
230                         /*
231                          * reset use_carry and use second value obtained on last pass
232                          */
233                         fctx->use_carry = false;
234                         result = carry_val;
235                 }
236                 else
237                 {
238                         float8          normval_1;
239                         float8          normval_2;
240
241                         /* Get the next two normal values */
242                         get_normal_pair(&normval_1, &normval_2);
243
244                         /* use the first */
245                         result = mean + (stddev * normval_1);
246
247                         /* and save the second */
248                         fctx->carry_val = mean + (stddev * normval_2);
249                         fctx->use_carry = true;
250                 }
251
252                 /* send the result */
253                 SRF_RETURN_NEXT(funcctx, Float8GetDatum(result));
254         }
255         else
256                 /* do when there is no more left */
257                 SRF_RETURN_DONE(funcctx);
258 }
259
260 /*
261  * get_normal_pair()
262  * Assigns normally distributed (Gaussian) values to a pair of provided
263  * parameters, with mean 0, standard deviation 1.
264  *
265  * This routine implements Algorithm P (Polar method for normal deviates)
266  * from Knuth's _The_Art_of_Computer_Programming_, Volume 2, 3rd ed., pages
267  * 122-126. Knuth cites his source as "The polar method", G. E. P. Box, M. E.
268  * Muller, and G. Marsaglia, _Annals_Math,_Stat._ 29 (1958), 610-611.
269  *
270  */
271 static void
272 get_normal_pair(float8 *x1, float8 *x2)
273 {
274         float8          u1,
275                                 u2,
276                                 v1,
277                                 v2,
278                                 s;
279
280         do
281         {
282                 u1 = (float8) random() / (float8) MAX_RANDOM_VALUE;
283                 u2 = (float8) random() / (float8) MAX_RANDOM_VALUE;
284
285                 v1 = (2.0 * u1) - 1.0;
286                 v2 = (2.0 * u2) - 1.0;
287
288                 s = v1 * v1 + v2 * v2;
289         } while (s >= 1.0);
290
291         if (s == 0)
292         {
293                 *x1 = 0;
294                 *x2 = 0;
295         }
296         else
297         {
298                 s = sqrt((-2.0 * log(s)) / s);
299                 *x1 = v1 * s;
300                 *x2 = v2 * s;
301         }
302 }
303
304 /*
305  * crosstab - create a crosstab of rowids and values columns from a
306  * SQL statement returning one rowid column, one category column,
307  * and one value column.
308  *
309  * e.g. given sql which produces:
310  *
311  *                      rowid   cat             value
312  *                      ------+-------+-------
313  *                      row1    cat1    val1
314  *                      row1    cat2    val2
315  *                      row1    cat3    val3
316  *                      row1    cat4    val4
317  *                      row2    cat1    val5
318  *                      row2    cat2    val6
319  *                      row2    cat3    val7
320  *                      row2    cat4    val8
321  *
322  * crosstab returns:
323  *                                      <===== values columns =====>
324  *                      rowid   cat1    cat2    cat3    cat4
325  *                      ------+-------+-------+-------+-------
326  *                      row1    val1    val2    val3    val4
327  *                      row2    val5    val6    val7    val8
328  *
329  * NOTES:
330  * 1. SQL result must be ordered by 1,2.
331  * 2. The number of values columns depends on the tuple description
332  *        of the function's declared return type.  The return type's columns
333  *        must match the datatypes of the SQL query's result.  The datatype
334  *        of the category column can be anything, however.
335  * 3. Missing values (i.e. not enough adjacent rows of same rowid to
336  *        fill the number of result values columns) are filled in with nulls.
337  * 4. Extra values (i.e. too many adjacent rows of same rowid to fill
338  *        the number of result values columns) are skipped.
339  * 5. Rows with all nulls in the values columns are skipped.
340  */
341 PG_FUNCTION_INFO_V1(crosstab);
342 Datum
343 crosstab(PG_FUNCTION_ARGS)
344 {
345         FuncCallContext *funcctx;
346         TupleDesc       ret_tupdesc;
347         int                     call_cntr;
348         int                     max_calls;
349         AttInMetadata *attinmeta;
350         SPITupleTable *spi_tuptable = NULL;
351         TupleDesc       spi_tupdesc;
352         char       *lastrowid = NULL;
353         crosstab_fctx *fctx;
354         int                     i;
355         int                     num_categories;
356         MemoryContext oldcontext;
357
358         /* stuff done only on the first call of the function */
359         if (SRF_IS_FIRSTCALL())
360         {
361                 char       *sql = GET_STR(PG_GETARG_TEXT_P(0));
362                 TupleDesc       tupdesc;
363                 int                     ret;
364                 int                     proc;
365
366                 /* create a function context for cross-call persistence */
367                 funcctx = SRF_FIRSTCALL_INIT();
368
369                 /*
370                  * switch to memory context appropriate for multiple function calls
371                  */
372                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
373
374                 /* Connect to SPI manager */
375                 if ((ret = SPI_connect()) < 0)
376                         /* internal error */
377                         elog(ERROR, "crosstab: SPI_connect returned %d", ret);
378
379                 /* Retrieve the desired rows */
380                 ret = SPI_execute(sql, true, 0);
381                 proc = SPI_processed;
382
383                 /* Check for qualifying tuples */
384                 if ((ret == SPI_OK_SELECT) && (proc > 0))
385                 {
386                         spi_tuptable = SPI_tuptable;
387                         spi_tupdesc = spi_tuptable->tupdesc;
388
389                         /*----------
390                          * The provided SQL query must always return three columns.
391                          *
392                          * 1. rowname
393                          *      the label or identifier for each row in the final result
394                          * 2. category
395                          *      the label or identifier for each column in the final result
396                          * 3. values
397                          *      the value for each column in the final result
398                          *----------
399                          */
400                         if (spi_tupdesc->natts != 3)
401                                 ereport(ERROR,
402                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
403                                                  errmsg("invalid source data SQL statement"),
404                                                  errdetail("The provided SQL must return 3 "
405                                                                    "columns: rowid, category, and values.")));
406                 }
407                 else
408                 {
409                         /* no qualifying tuples */
410                         SPI_finish();
411                         SRF_RETURN_DONE(funcctx);
412                 }
413
414                 /* SPI switches context on us, so reset it */
415                 MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
416
417                 /* get a tuple descriptor for our result type */
418                 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
419                 {
420                         case TYPEFUNC_COMPOSITE:
421                                 /* success */
422                                 break;
423                         case TYPEFUNC_RECORD:
424                                 /* failed to determine actual type of RECORD */
425                                 ereport(ERROR,
426                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
427                                                  errmsg("function returning record called in context "
428                                                                 "that cannot accept type record")));
429                                 break;
430                         default:
431                                 /* result type isn't composite */
432                                 elog(ERROR, "return type must be a row type");
433                                 break;
434                 }
435
436                 /* make sure we have a persistent copy of the tupdesc */
437                 tupdesc = CreateTupleDescCopy(tupdesc);
438
439                 /*
440                  * Check that return tupdesc is compatible with the data we got from
441                  * SPI, at least based on number and type of attributes
442                  */
443                 if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
444                         ereport(ERROR,
445                                         (errcode(ERRCODE_SYNTAX_ERROR),
446                                          errmsg("return and sql tuple descriptions are " \
447                                                         "incompatible")));
448
449                 /*
450                  * Generate attribute metadata needed later to produce tuples from raw
451                  * C strings
452                  */
453                 attinmeta = TupleDescGetAttInMetadata(tupdesc);
454                 funcctx->attinmeta = attinmeta;
455
456                 /* allocate memory for user context */
457                 fctx = (crosstab_fctx *) palloc(sizeof(crosstab_fctx));
458
459                 /*
460                  * Save spi data for use across calls
461                  */
462                 fctx->spi_tuptable = spi_tuptable;
463                 fctx->lastrowid = NULL;
464                 funcctx->user_fctx = fctx;
465
466                 /* total number of tuples to be returned */
467                 funcctx->max_calls = proc;
468
469                 MemoryContextSwitchTo(oldcontext);
470         }
471
472         /* stuff done on every call of the function */
473         funcctx = SRF_PERCALL_SETUP();
474
475         /*
476          * initialize per-call variables
477          */
478         call_cntr = funcctx->call_cntr;
479         max_calls = funcctx->max_calls;
480
481         /* user context info */
482         fctx = (crosstab_fctx *) funcctx->user_fctx;
483         lastrowid = fctx->lastrowid;
484         spi_tuptable = fctx->spi_tuptable;
485
486         /* the sql tuple */
487         spi_tupdesc = spi_tuptable->tupdesc;
488
489         /* attribute return type and return tuple description */
490         attinmeta = funcctx->attinmeta;
491         ret_tupdesc = attinmeta->tupdesc;
492
493         /* the return tuple always must have 1 rowid + num_categories columns */
494         num_categories = ret_tupdesc->natts - 1;
495
496         if (call_cntr < max_calls)      /* do when there is more left to send */
497         {
498                 HeapTuple       tuple;
499                 Datum           result;
500                 char      **values;
501                 bool            allnulls = true;
502
503                 while (true)
504                 {
505                         /* allocate space */
506                         values = (char **) palloc((1 + num_categories) * sizeof(char *));
507
508                         /* and make sure it's clear */
509                         memset(values, '\0', (1 + num_categories) * sizeof(char *));
510
511                         /*
512                          * now loop through the sql results and assign each value in
513                          * sequence to the next category
514                          */
515                         for (i = 0; i < num_categories; i++)
516                         {
517                                 HeapTuple       spi_tuple;
518                                 char       *rowid = NULL;
519
520                                 /* see if we've gone too far already */
521                                 if (call_cntr >= max_calls)
522                                         break;
523
524                                 /* get the next sql result tuple */
525                                 spi_tuple = spi_tuptable->vals[call_cntr];
526
527                                 /* get the rowid from the current sql result tuple */
528                                 rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
529
530                                 /*
531                                  * If this is the first pass through the values for this rowid
532                                  * set it, otherwise make sure it hasn't changed on us. Also
533                                  * check to see if the rowid is the same as that of the last
534                                  * tuple sent -- if so, skip this tuple entirely
535                                  */
536                                 if (i == 0)
537                                         values[0] = pstrdup(rowid);
538
539                                 if ((rowid != NULL) && (strcmp(rowid, values[0]) == 0))
540                                 {
541                                         if ((lastrowid != NULL) && (strcmp(rowid, lastrowid) == 0))
542                                                 break;
543                                         else if (allnulls == true)
544                                                 allnulls = false;
545
546                                         /*
547                                          * Get the next category item value, which is alway
548                                          * attribute number three.
549                                          *
550                                          * Be careful to sssign the value to the array index based on
551                                          * which category we are presently processing.
552                                          */
553                                         values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
554
555                                         /*
556                                          * increment the counter since we consume a row for each
557                                          * category, but not for last pass because the API will do
558                                          * that for us
559                                          */
560                                         if (i < (num_categories - 1))
561                                                 call_cntr = ++funcctx->call_cntr;
562                                 }
563                                 else
564                                 {
565                                         /*
566                                          * We'll fill in NULLs for the missing values, but we need
567                                          * to decrement the counter since this sql result row
568                                          * doesn't belong to the current output tuple.
569                                          */
570                                         call_cntr = --funcctx->call_cntr;
571                                         break;
572                                 }
573
574                                 if (rowid != NULL)
575                                         xpfree(rowid);
576                         }
577
578                         xpfree(fctx->lastrowid);
579
580                         if (values[0] != NULL)
581                         {
582                                 /*
583                                  * switch to memory context appropriate for multiple function
584                                  * calls
585                                  */
586                                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
587
588                                 lastrowid = fctx->lastrowid = pstrdup(values[0]);
589                                 MemoryContextSwitchTo(oldcontext);
590                         }
591
592                         if (!allnulls)
593                         {
594                                 /* build the tuple */
595                                 tuple = BuildTupleFromCStrings(attinmeta, values);
596
597                                 /* make the tuple into a datum */
598                                 result = HeapTupleGetDatum(tuple);
599
600                                 /* Clean up */
601                                 for (i = 0; i < num_categories + 1; i++)
602                                         if (values[i] != NULL)
603                                                 xpfree(values[i]);
604                                 xpfree(values);
605
606                                 SRF_RETURN_NEXT(funcctx, result);
607                         }
608                         else
609                         {
610                                 /*
611                                  * Skipping this tuple entirely, but we need to advance the
612                                  * counter like the API would if we had returned one.
613                                  */
614                                 call_cntr = ++funcctx->call_cntr;
615
616                                 /* we'll start over at the top */
617                                 xpfree(values);
618
619                                 /* see if we've gone too far already */
620                                 if (call_cntr >= max_calls)
621                                 {
622                                         /* release SPI related resources */
623                                         SPI_finish();
624                                         SRF_RETURN_DONE(funcctx);
625                                 }
626                         }
627                 }
628         }
629         else
630                 /* do when there is no more left */
631         {
632                 /* release SPI related resources */
633                 SPI_finish();
634                 SRF_RETURN_DONE(funcctx);
635         }
636 }
637
638 /*
639  * crosstab_hash - reimplement crosstab as materialized function and
640  * properly deal with missing values (i.e. don't pack remaining
641  * values to the left)
642  *
643  * crosstab - create a crosstab of rowids and values columns from a
644  * SQL statement returning one rowid column, one category column,
645  * and one value column.
646  *
647  * e.g. given sql which produces:
648  *
649  *                      rowid   cat             value
650  *                      ------+-------+-------
651  *                      row1    cat1    val1
652  *                      row1    cat2    val2
653  *                      row1    cat4    val4
654  *                      row2    cat1    val5
655  *                      row2    cat2    val6
656  *                      row2    cat3    val7
657  *                      row2    cat4    val8
658  *
659  * crosstab returns:
660  *                                      <===== values columns =====>
661  *                      rowid   cat1    cat2    cat3    cat4
662  *                      ------+-------+-------+-------+-------
663  *                      row1    val1    val2    null    val4
664  *                      row2    val5    val6    val7    val8
665  *
666  * NOTES:
667  * 1. SQL result must be ordered by 1.
668  * 2. The number of values columns depends on the tuple description
669  *        of the function's declared return type.
670  * 3. Missing values (i.e. missing category) are filled in with nulls.
671  * 4. Extra values (i.e. not in category results) are skipped.
672  */
673 PG_FUNCTION_INFO_V1(crosstab_hash);
674 Datum
675 crosstab_hash(PG_FUNCTION_ARGS)
676 {
677         char       *sql = GET_STR(PG_GETARG_TEXT_P(0));
678         char       *cats_sql = GET_STR(PG_GETARG_TEXT_P(1));
679         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
680         TupleDesc       tupdesc;
681         MemoryContext per_query_ctx;
682         MemoryContext oldcontext;
683         int                     num_categories;
684
685         /* check to see if caller supports us returning a tuplestore */
686         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
687                 ereport(ERROR,
688                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
689                                  errmsg("set-valued function called in context that cannot accept a set")));
690         if (!(rsinfo->allowedModes & SFRM_Materialize))
691                 ereport(ERROR,
692                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
693                                  errmsg("materialize mode required, but it is not " \
694                                                 "allowed in this context")));
695
696         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
697         oldcontext = MemoryContextSwitchTo(per_query_ctx);
698
699         /* get the requested return tuple description */
700         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
701
702         /*
703          * Check to make sure we have a reasonable tuple descriptor
704          *
705          * Note we will attempt to coerce the values into whatever the return
706          * attribute type is and depend on the "in" function to complain if
707          * needed.
708          */
709         if (tupdesc->natts < 2)
710                 ereport(ERROR,
711                                 (errcode(ERRCODE_SYNTAX_ERROR),
712                                  errmsg("query-specified return tuple and " \
713                                                 "crosstab function are not compatible")));
714
715         /* load up the categories hash table */
716         num_categories = load_categories_hash(cats_sql, per_query_ctx);
717
718         /* let the caller know we're sending back a tuplestore */
719         rsinfo->returnMode = SFRM_Materialize;
720
721         /* now go build it */
722         rsinfo->setResult = get_crosstab_tuplestore(sql,
723                                                                                                 num_categories,
724                                                                                                 tupdesc,
725                                                                                                 per_query_ctx);
726
727         /*
728          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
729          * tuples are in our tuplestore and passed back through rsinfo->setResult.
730          * rsinfo->setDesc is set to the tuple description that we actually used
731          * to build our tuples with, so the caller can verify we did what it was
732          * expecting.
733          */
734         rsinfo->setDesc = tupdesc;
735         MemoryContextSwitchTo(oldcontext);
736
737         return (Datum) 0;
738 }
739
740 /*
741  * load up the categories hash table
742  */
743 static int
744 load_categories_hash(char *cats_sql, MemoryContext per_query_ctx)
745 {
746         HASHCTL         ctl;
747         int                     ret;
748         int                     proc;
749         MemoryContext SPIcontext;
750         int                     num_categories = 0;
751
752         /* initialize the category hash table */
753         ctl.keysize = MAX_CATNAME_LEN;
754         ctl.entrysize = sizeof(crosstab_HashEnt);
755
756         /*
757          * use INIT_CATS, defined above as a guess of how many hash table entries
758          * to create, initially
759          */
760         crosstab_HashTable = hash_create("crosstab hash", INIT_CATS, &ctl, HASH_ELEM);
761
762         /* Connect to SPI manager */
763         if ((ret = SPI_connect()) < 0)
764                 /* internal error */
765                 elog(ERROR, "load_categories_hash: SPI_connect returned %d", ret);
766
767         /* Retrieve the category name rows */
768         ret = SPI_execute(cats_sql, true, 0);
769         num_categories = proc = SPI_processed;
770
771         /* Check for qualifying tuples */
772         if ((ret == SPI_OK_SELECT) && (proc > 0))
773         {
774                 SPITupleTable *spi_tuptable = SPI_tuptable;
775                 TupleDesc       spi_tupdesc = spi_tuptable->tupdesc;
776                 int                     i;
777
778                 /*
779                  * The provided categories SQL query must always return one column:
780                  * category - the label or identifier for each column
781                  */
782                 if (spi_tupdesc->natts != 1)
783                         ereport(ERROR,
784                                         (errcode(ERRCODE_SYNTAX_ERROR),
785                                          errmsg("provided \"categories\" SQL must " \
786                                                         "return 1 column of at least one row")));
787
788                 for (i = 0; i < proc; i++)
789                 {
790                         crosstab_cat_desc *catdesc;
791                         char       *catname;
792                         HeapTuple       spi_tuple;
793
794                         /* get the next sql result tuple */
795                         spi_tuple = spi_tuptable->vals[i];
796
797                         /* get the category from the current sql result tuple */
798                         catname = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
799
800                         SPIcontext = MemoryContextSwitchTo(per_query_ctx);
801
802                         catdesc = (crosstab_cat_desc *) palloc(sizeof(crosstab_cat_desc));
803                         catdesc->catname = catname;
804                         catdesc->attidx = i;
805
806                         /* Add the proc description block to the hashtable */
807                         crosstab_HashTableInsert(catdesc);
808
809                         MemoryContextSwitchTo(SPIcontext);
810                 }
811         }
812
813         if (SPI_finish() != SPI_OK_FINISH)
814                 /* internal error */
815                 elog(ERROR, "load_categories_hash: SPI_finish() failed");
816
817         return num_categories;
818 }
819
820 /*
821  * create and populate the crosstab tuplestore using the provided source query
822  */
823 static Tuplestorestate *
824 get_crosstab_tuplestore(char *sql,
825                                                 int num_categories,
826                                                 TupleDesc tupdesc,
827                                                 MemoryContext per_query_ctx)
828 {
829         Tuplestorestate *tupstore;
830         AttInMetadata *attinmeta = TupleDescGetAttInMetadata(tupdesc);
831         char      **values;
832         HeapTuple       tuple;
833         int                     ret;
834         int                     proc;
835         MemoryContext SPIcontext;
836
837         /* initialize our tuplestore */
838         tupstore = tuplestore_begin_heap(true, false, work_mem);
839
840         /* Connect to SPI manager */
841         if ((ret = SPI_connect()) < 0)
842                 /* internal error */
843                 elog(ERROR, "get_crosstab_tuplestore: SPI_connect returned %d", ret);
844
845         /* Now retrieve the crosstab source rows */
846         ret = SPI_execute(sql, true, 0);
847         proc = SPI_processed;
848
849         /* Check for qualifying tuples */
850         if ((ret == SPI_OK_SELECT) && (proc > 0))
851         {
852                 SPITupleTable *spi_tuptable = SPI_tuptable;
853                 TupleDesc       spi_tupdesc = spi_tuptable->tupdesc;
854                 int                     ncols = spi_tupdesc->natts;
855                 char       *rowid;
856                 char       *lastrowid = NULL;
857                 int                     i,
858                                         j;
859                 int                     result_ncols;
860
861                 if (num_categories == 0)
862                 {
863                         /* no qualifying category tuples */
864                         ereport(ERROR,
865                                         (errcode(ERRCODE_SYNTAX_ERROR),
866                                          errmsg("provided \"categories\" SQL must " \
867                                                         "return 1 column of at least one row")));
868                 }
869
870                 /*
871                  * The provided SQL query must always return at least three columns:
872                  *
873                  * 1. rowname   the label for each row - column 1 in the final result 2.
874                  * category  the label for each value-column in the final result 3.
875                  * value         the values used to populate the value-columns
876                  *
877                  * If there are more than three columns, the last two are taken as
878                  * "category" and "values". The first column is taken as "rowname".
879                  * Additional columns (2 thru N-2) are assumed the same for the same
880                  * "rowname", and are copied into the result tuple from the first time
881                  * we encounter a particular rowname.
882                  */
883                 if (ncols < 3)
884                         ereport(ERROR,
885                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
886                                          errmsg("invalid source data SQL statement"),
887                                          errdetail("The provided SQL must return 3 " \
888                                                            " columns; rowid, category, and values.")));
889
890                 result_ncols = (ncols - 2) + num_categories;
891
892                 /* Recheck to make sure we tuple descriptor still looks reasonable */
893                 if (tupdesc->natts != result_ncols)
894                         ereport(ERROR,
895                                         (errcode(ERRCODE_SYNTAX_ERROR),
896                                          errmsg("invalid return type"),
897                                          errdetail("query-specified return " \
898                                                            "tuple has %d columns but crosstab " \
899                                                            "returns %d", tupdesc->natts, result_ncols)));
900
901                 /* allocate space */
902                 values = (char **) palloc(result_ncols * sizeof(char *));
903
904                 /* and make sure it's clear */
905                 memset(values, '\0', result_ncols * sizeof(char *));
906
907                 for (i = 0; i < proc; i++)
908                 {
909                         HeapTuple       spi_tuple;
910                         crosstab_cat_desc *catdesc;
911                         char       *catname;
912
913                         /* get the next sql result tuple */
914                         spi_tuple = spi_tuptable->vals[i];
915
916                         /* get the rowid from the current sql result tuple */
917                         rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
918
919                         /* if rowid is null, skip this tuple entirely */
920                         if (rowid == NULL)
921                                 continue;
922
923                         /*
924                          * if we're on a new output row, grab the column values up to
925                          * column N-2 now
926                          */
927                         if ((lastrowid == NULL) || (strcmp(rowid, lastrowid) != 0))
928                         {
929                                 /*
930                                  * a new row means we need to flush the old one first, unless
931                                  * we're on the very first row
932                                  */
933                                 if (lastrowid != NULL)
934                                 {
935                                         /*
936                                          * switch to appropriate context while storing the tuple
937                                          */
938                                         SPIcontext = MemoryContextSwitchTo(per_query_ctx);
939
940                                         /* rowid changed, flush the previous output row */
941                                         tuple = BuildTupleFromCStrings(attinmeta, values);
942                                         tuplestore_puttuple(tupstore, tuple);
943                                         for (j = 0; j < result_ncols; j++)
944                                                 xpfree(values[j]);
945
946                                         /* now reset the context */
947                                         MemoryContextSwitchTo(SPIcontext);
948                                 }
949
950                                 values[0] = rowid;
951                                 for (j = 1; j < ncols - 2; j++)
952                                         values[j] = SPI_getvalue(spi_tuple, spi_tupdesc, j + 1);
953                         }
954
955                         /* look up the category and fill in the appropriate column */
956                         catname = SPI_getvalue(spi_tuple, spi_tupdesc, ncols - 1);
957
958                         if (catname != NULL)
959                         {
960                                 crosstab_HashTableLookup(catname, catdesc);
961
962                                 if (catdesc)
963                                         values[catdesc->attidx + ncols - 2] =
964                                                 SPI_getvalue(spi_tuple, spi_tupdesc, ncols);
965                         }
966
967                         xpfree(lastrowid);
968                         lastrowid = pstrdup(rowid);
969                 }
970
971                 /* switch to appropriate context while storing the tuple */
972                 SPIcontext = MemoryContextSwitchTo(per_query_ctx);
973
974                 /* flush the last output row */
975                 tuple = BuildTupleFromCStrings(attinmeta, values);
976                 tuplestore_puttuple(tupstore, tuple);
977
978                 /* now reset the context */
979                 MemoryContextSwitchTo(SPIcontext);
980
981         }
982
983         if (SPI_finish() != SPI_OK_FINISH)
984                 /* internal error */
985                 elog(ERROR, "get_crosstab_tuplestore: SPI_finish() failed");
986
987         tuplestore_donestoring(tupstore);
988
989         return tupstore;
990 }
991
992 /*
993  * connectby_text - produce a result set from a hierarchical (parent/child)
994  * table.
995  *
996  * e.g. given table foo:
997  *
998  *                      keyid   parent_keyid pos
999  *                      ------+------------+--
1000  *                      row1    NULL             0
1001  *                      row2    row1             0
1002  *                      row3    row1             0
1003  *                      row4    row2             1
1004  *                      row5    row2             0
1005  *                      row6    row4             0
1006  *                      row7    row3             0
1007  *                      row8    row6             0
1008  *                      row9    row5             0
1009  *
1010  *
1011  * connectby(text relname, text keyid_fld, text parent_keyid_fld
1012  *                        [, text orderby_fld], text start_with, int max_depth
1013  *                        [, text branch_delim])
1014  * connectby('foo', 'keyid', 'parent_keyid', 'pos', 'row2', 0, '~') returns:
1015  *
1016  *              keyid   parent_id       level    branch                         serial
1017  *              ------+-----------+--------+-----------------------
1018  *              row2    NULL              0               row2                            1
1019  *              row5    row2              1               row2~row5                       2
1020  *              row9    row5              2               row2~row5~row9          3
1021  *              row4    row2              1               row2~row4                       4
1022  *              row6    row4              2               row2~row4~row6          5
1023  *              row8    row6              3               row2~row4~row6~row8 6
1024  *
1025  */
1026 PG_FUNCTION_INFO_V1(connectby_text);
1027
1028 #define CONNECTBY_NCOLS                                 4
1029 #define CONNECTBY_NCOLS_NOBRANCH                3
1030
1031 Datum
1032 connectby_text(PG_FUNCTION_ARGS)
1033 {
1034         char       *relname = GET_STR(PG_GETARG_TEXT_P(0));
1035         char       *key_fld = GET_STR(PG_GETARG_TEXT_P(1));
1036         char       *parent_key_fld = GET_STR(PG_GETARG_TEXT_P(2));
1037         char       *start_with = GET_STR(PG_GETARG_TEXT_P(3));
1038         int                     max_depth = PG_GETARG_INT32(4);
1039         char       *branch_delim = NULL;
1040         bool            show_branch = false;
1041         bool            show_serial = false;
1042         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1043         TupleDesc       tupdesc;
1044         AttInMetadata *attinmeta;
1045         MemoryContext per_query_ctx;
1046         MemoryContext oldcontext;
1047
1048         /* check to see if caller supports us returning a tuplestore */
1049         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1050                 ereport(ERROR,
1051                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1052                                  errmsg("set-valued function called in context that cannot accept a set")));
1053         if (!(rsinfo->allowedModes & SFRM_Materialize))
1054                 ereport(ERROR,
1055                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1056                                  errmsg("materialize mode required, but it is not " \
1057                                                 "allowed in this context")));
1058
1059         if (fcinfo->nargs == 6)
1060         {
1061                 branch_delim = GET_STR(PG_GETARG_TEXT_P(5));
1062                 show_branch = true;
1063         }
1064         else
1065                 /* default is no show, tilde for the delimiter */
1066                 branch_delim = pstrdup("~");
1067
1068         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1069         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1070
1071         /* get the requested return tuple description */
1072         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
1073
1074         /* does it meet our needs */
1075         validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
1076
1077         /* OK, use it then */
1078         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1079
1080         /* OK, go to work */
1081         rsinfo->returnMode = SFRM_Materialize;
1082         rsinfo->setResult = connectby(relname,
1083                                                                   key_fld,
1084                                                                   parent_key_fld,
1085                                                                   NULL,
1086                                                                   branch_delim,
1087                                                                   start_with,
1088                                                                   max_depth,
1089                                                                   show_branch,
1090                                                                   show_serial,
1091                                                                   per_query_ctx,
1092                                                                   attinmeta);
1093         rsinfo->setDesc = tupdesc;
1094
1095         MemoryContextSwitchTo(oldcontext);
1096
1097         /*
1098          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
1099          * tuples are in our tuplestore and passed back through rsinfo->setResult.
1100          * rsinfo->setDesc is set to the tuple description that we actually used
1101          * to build our tuples with, so the caller can verify we did what it was
1102          * expecting.
1103          */
1104         return (Datum) 0;
1105 }
1106
1107 PG_FUNCTION_INFO_V1(connectby_text_serial);
1108 Datum
1109 connectby_text_serial(PG_FUNCTION_ARGS)
1110 {
1111         char       *relname = GET_STR(PG_GETARG_TEXT_P(0));
1112         char       *key_fld = GET_STR(PG_GETARG_TEXT_P(1));
1113         char       *parent_key_fld = GET_STR(PG_GETARG_TEXT_P(2));
1114         char       *orderby_fld = GET_STR(PG_GETARG_TEXT_P(3));
1115         char       *start_with = GET_STR(PG_GETARG_TEXT_P(4));
1116         int                     max_depth = PG_GETARG_INT32(5);
1117         char       *branch_delim = NULL;
1118         bool            show_branch = false;
1119         bool            show_serial = true;
1120
1121         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1122         TupleDesc       tupdesc;
1123         AttInMetadata *attinmeta;
1124         MemoryContext per_query_ctx;
1125         MemoryContext oldcontext;
1126
1127         /* check to see if caller supports us returning a tuplestore */
1128         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1129                 ereport(ERROR,
1130                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1131                                  errmsg("set-valued function called in context that cannot accept a set")));
1132         if (!(rsinfo->allowedModes & SFRM_Materialize))
1133                 ereport(ERROR,
1134                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1135                                  errmsg("materialize mode required, but it is not " \
1136                                                 "allowed in this context")));
1137
1138         if (fcinfo->nargs == 7)
1139         {
1140                 branch_delim = GET_STR(PG_GETARG_TEXT_P(6));
1141                 show_branch = true;
1142         }
1143         else
1144                 /* default is no show, tilde for the delimiter */
1145                 branch_delim = pstrdup("~");
1146
1147         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1148         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1149
1150         /* get the requested return tuple description */
1151         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
1152
1153         /* does it meet our needs */
1154         validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
1155
1156         /* OK, use it then */
1157         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1158
1159         /* OK, go to work */
1160         rsinfo->returnMode = SFRM_Materialize;
1161         rsinfo->setResult = connectby(relname,
1162                                                                   key_fld,
1163                                                                   parent_key_fld,
1164                                                                   orderby_fld,
1165                                                                   branch_delim,
1166                                                                   start_with,
1167                                                                   max_depth,
1168                                                                   show_branch,
1169                                                                   show_serial,
1170                                                                   per_query_ctx,
1171                                                                   attinmeta);
1172         rsinfo->setDesc = tupdesc;
1173
1174         MemoryContextSwitchTo(oldcontext);
1175
1176         /*
1177          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
1178          * tuples are in our tuplestore and passed back through rsinfo->setResult.
1179          * rsinfo->setDesc is set to the tuple description that we actually used
1180          * to build our tuples with, so the caller can verify we did what it was
1181          * expecting.
1182          */
1183         return (Datum) 0;
1184 }
1185
1186
1187 /*
1188  * connectby - does the real work for connectby_text()
1189  */
1190 static Tuplestorestate *
1191 connectby(char *relname,
1192                   char *key_fld,
1193                   char *parent_key_fld,
1194                   char *orderby_fld,
1195                   char *branch_delim,
1196                   char *start_with,
1197                   int max_depth,
1198                   bool show_branch,
1199                   bool show_serial,
1200                   MemoryContext per_query_ctx,
1201                   AttInMetadata *attinmeta)
1202 {
1203         Tuplestorestate *tupstore = NULL;
1204         int                     ret;
1205         MemoryContext oldcontext;
1206
1207         int                     serial = 1;
1208
1209         /* Connect to SPI manager */
1210         if ((ret = SPI_connect()) < 0)
1211                 /* internal error */
1212                 elog(ERROR, "connectby: SPI_connect returned %d", ret);
1213
1214         /* switch to longer term context to create the tuple store */
1215         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1216
1217         /* initialize our tuplestore */
1218         tupstore = tuplestore_begin_heap(true, false, work_mem);
1219
1220         MemoryContextSwitchTo(oldcontext);
1221
1222         /* now go get the whole tree */
1223         tupstore = build_tuplestore_recursively(key_fld,
1224                                                                                         parent_key_fld,
1225                                                                                         relname,
1226                                                                                         orderby_fld,
1227                                                                                         branch_delim,
1228                                                                                         start_with,
1229                                                                                         start_with, /* current_branch */
1230                                                                                         0,      /* initial level is 0 */
1231                                                                                         &serial,        /* initial serial is 1 */
1232                                                                                         max_depth,
1233                                                                                         show_branch,
1234                                                                                         show_serial,
1235                                                                                         per_query_ctx,
1236                                                                                         attinmeta,
1237                                                                                         tupstore);
1238
1239         SPI_finish();
1240
1241         return tupstore;
1242 }
1243
1244 static Tuplestorestate *
1245 build_tuplestore_recursively(char *key_fld,
1246                                                          char *parent_key_fld,
1247                                                          char *relname,
1248                                                          char *orderby_fld,
1249                                                          char *branch_delim,
1250                                                          char *start_with,
1251                                                          char *branch,
1252                                                          int level,
1253                                                          int *serial,
1254                                                          int max_depth,
1255                                                          bool show_branch,
1256                                                          bool show_serial,
1257                                                          MemoryContext per_query_ctx,
1258                                                          AttInMetadata *attinmeta,
1259                                                          Tuplestorestate *tupstore)
1260 {
1261         TupleDesc       tupdesc = attinmeta->tupdesc;
1262         MemoryContext oldcontext;
1263         StringInfo      sql = makeStringInfo();
1264         int                     ret;
1265         int                     proc;
1266         int                     serial_column;
1267         StringInfo      branchstr = NULL;
1268         StringInfo      chk_branchstr = NULL;
1269         StringInfo      chk_current_key = NULL;
1270         char      **values;
1271         char       *current_key;
1272         char       *current_key_parent;
1273         char            current_level[INT32_STRLEN];
1274         char            serial_str[INT32_STRLEN];
1275         char       *current_branch;
1276         HeapTuple       tuple;
1277
1278         if (max_depth > 0 && level > max_depth)
1279                 return tupstore;
1280
1281         /* start a new branch */
1282         branchstr = makeStringInfo();
1283
1284         /* need these to check for recursion */
1285         chk_branchstr = makeStringInfo();
1286         chk_current_key = makeStringInfo();
1287
1288         /* Build initial sql statement */
1289         if (!show_serial)
1290         {
1291                 appendStringInfo(sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s",
1292                                                  key_fld,
1293                                                  parent_key_fld,
1294                                                  relname,
1295                                                  parent_key_fld,
1296                                                  quote_literal_cstr(start_with),
1297                                                  key_fld, key_fld, parent_key_fld);
1298                 serial_column = 0;
1299         }
1300         else
1301         {
1302                 appendStringInfo(sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s ORDER BY %s",
1303                                                  key_fld,
1304                                                  parent_key_fld,
1305                                                  relname,
1306                                                  parent_key_fld,
1307                                                  quote_literal_cstr(start_with),
1308                                                  key_fld, key_fld, parent_key_fld,
1309                                                  orderby_fld);
1310                 serial_column = 1;
1311         }
1312
1313         if (show_branch)
1314                 values = (char **) palloc((CONNECTBY_NCOLS + serial_column) * sizeof(char *));
1315         else
1316                 values = (char **) palloc((CONNECTBY_NCOLS_NOBRANCH + serial_column) * sizeof(char *));
1317
1318         /* First time through, do a little setup */
1319         if (level == 0)
1320         {
1321                 /* root value is the one we initially start with */
1322                 values[0] = start_with;
1323
1324                 /* root value has no parent */
1325                 values[1] = NULL;
1326
1327                 /* root level is 0 */
1328                 sprintf(current_level, "%d", level);
1329                 values[2] = current_level;
1330
1331                 /* root branch is just starting root value */
1332                 if (show_branch)
1333                         values[3] = start_with;
1334
1335                 /* root starts the serial with 1 */
1336                 if (show_serial)
1337                 {
1338                         sprintf(serial_str, "%d", (*serial)++);
1339                         if (show_branch)
1340                                 values[4] = serial_str;
1341                         else
1342                                 values[3] = serial_str;
1343                 }
1344
1345                 /* construct the tuple */
1346                 tuple = BuildTupleFromCStrings(attinmeta, values);
1347
1348                 /* switch to long lived context while storing the tuple */
1349                 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1350
1351                 /* now store it */
1352                 tuplestore_puttuple(tupstore, tuple);
1353
1354                 /* now reset the context */
1355                 MemoryContextSwitchTo(oldcontext);
1356
1357                 /* increment level */
1358                 level++;
1359         }
1360
1361         /* Retrieve the desired rows */
1362         ret = SPI_execute(sql->data, true, 0);
1363         proc = SPI_processed;
1364
1365         /* Check for qualifying tuples */
1366         if ((ret == SPI_OK_SELECT) && (proc > 0))
1367         {
1368                 HeapTuple       spi_tuple;
1369                 SPITupleTable *tuptable = SPI_tuptable;
1370                 TupleDesc       spi_tupdesc = tuptable->tupdesc;
1371                 int                     i;
1372
1373                 /* First time through, do a little more setup */
1374                 if (level == 0)
1375                 {
1376                         /*
1377                          * Check that return tupdesc is compatible with the one we got
1378                          * from the query, but only at level 0 -- no need to check more
1379                          * than once
1380                          */
1381
1382                         if (!compatConnectbyTupleDescs(tupdesc, spi_tupdesc))
1383                                 ereport(ERROR,
1384                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1385                                                  errmsg("invalid return type"),
1386                                                  errdetail("Return and SQL tuple descriptions are " \
1387                                                                    "incompatible.")));
1388                 }
1389
1390                 for (i = 0; i < proc; i++)
1391                 {
1392                         /* initialize branch for this pass */
1393                         appendStringInfo(branchstr, "%s", branch);
1394                         appendStringInfo(chk_branchstr, "%s%s%s", branch_delim, branch, branch_delim);
1395
1396                         /* get the next sql result tuple */
1397                         spi_tuple = tuptable->vals[i];
1398
1399                         /* get the current key and parent */
1400                         current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
1401                         appendStringInfo(chk_current_key, "%s%s%s", branch_delim, current_key, branch_delim);
1402                         current_key_parent = pstrdup(SPI_getvalue(spi_tuple, spi_tupdesc, 2));
1403
1404                         /* get the current level */
1405                         sprintf(current_level, "%d", level);
1406
1407                         /* check to see if this key is also an ancestor */
1408                         if (strstr(chk_branchstr->data, chk_current_key->data))
1409                                 elog(ERROR, "infinite recursion detected");
1410
1411                         /* OK, extend the branch */
1412                         appendStringInfo(branchstr, "%s%s", branch_delim, current_key);
1413                         current_branch = branchstr->data;
1414
1415                         /* build a tuple */
1416                         values[0] = pstrdup(current_key);
1417                         values[1] = current_key_parent;
1418                         values[2] = current_level;
1419                         if (show_branch)
1420                                 values[3] = current_branch;
1421                         if (show_serial)
1422                         {
1423                                 sprintf(serial_str, "%d", (*serial)++);
1424                                 if (show_branch)
1425                                         values[4] = serial_str;
1426                                 else
1427                                         values[3] = serial_str;
1428                         }
1429
1430                         tuple = BuildTupleFromCStrings(attinmeta, values);
1431
1432                         xpfree(current_key);
1433                         xpfree(current_key_parent);
1434
1435                         /* switch to long lived context while storing the tuple */
1436                         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1437
1438                         /* store the tuple for later use */
1439                         tuplestore_puttuple(tupstore, tuple);
1440
1441                         /* now reset the context */
1442                         MemoryContextSwitchTo(oldcontext);
1443
1444                         heap_freetuple(tuple);
1445
1446                         /* recurse using current_key_parent as the new start_with */
1447                         tupstore = build_tuplestore_recursively(key_fld,
1448                                                                                                         parent_key_fld,
1449                                                                                                         relname,
1450                                                                                                         orderby_fld,
1451                                                                                                         branch_delim,
1452                                                                                                         values[0],
1453                                                                                                         current_branch,
1454                                                                                                         level + 1,
1455                                                                                                         serial,
1456                                                                                                         max_depth,
1457                                                                                                         show_branch,
1458                                                                                                         show_serial,
1459                                                                                                         per_query_ctx,
1460                                                                                                         attinmeta,
1461                                                                                                         tupstore);
1462
1463                         /* reset branch for next pass */
1464                         xpfree(branchstr->data);
1465                         initStringInfo(branchstr);
1466
1467                         xpfree(chk_branchstr->data);
1468                         initStringInfo(chk_branchstr);
1469
1470                         xpfree(chk_current_key->data);
1471                         initStringInfo(chk_current_key);
1472                 }
1473         }
1474
1475         return tupstore;
1476 }
1477
1478 /*
1479  * Check expected (query runtime) tupdesc suitable for Connectby
1480  */
1481 static void
1482 validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial)
1483 {
1484         int                     serial_column = 0;
1485
1486         if (show_serial)
1487                 serial_column = 1;
1488
1489         /* are there the correct number of columns */
1490         if (show_branch)
1491         {
1492                 if (tupdesc->natts != (CONNECTBY_NCOLS + serial_column))
1493                         ereport(ERROR,
1494                                         (errcode(ERRCODE_SYNTAX_ERROR),
1495                                          errmsg("invalid return type"),
1496                                          errdetail("Query-specified return tuple has " \
1497                                                            "wrong number of columns.")));
1498         }
1499         else
1500         {
1501                 if (tupdesc->natts != CONNECTBY_NCOLS_NOBRANCH + serial_column)
1502                         ereport(ERROR,
1503                                         (errcode(ERRCODE_SYNTAX_ERROR),
1504                                          errmsg("invalid return type"),
1505                                          errdetail("Query-specified return tuple has " \
1506                                                            "wrong number of columns.")));
1507         }
1508
1509         /* check that the types of the first two columns match */
1510         if (tupdesc->attrs[0]->atttypid != tupdesc->attrs[1]->atttypid)
1511                 ereport(ERROR,
1512                                 (errcode(ERRCODE_SYNTAX_ERROR),
1513                                  errmsg("invalid return type"),
1514                                  errdetail("First two columns must be the same type.")));
1515
1516         /* check that the type of the third column is INT4 */
1517         if (tupdesc->attrs[2]->atttypid != INT4OID)
1518                 ereport(ERROR,
1519                                 (errcode(ERRCODE_SYNTAX_ERROR),
1520                                  errmsg("invalid return type"),
1521                                  errdetail("Third column must be type %s.",
1522                                                    format_type_be(INT4OID))));
1523
1524         /* check that the type of the fourth column is TEXT if applicable */
1525         if (show_branch && tupdesc->attrs[3]->atttypid != TEXTOID)
1526                 ereport(ERROR,
1527                                 (errcode(ERRCODE_SYNTAX_ERROR),
1528                                  errmsg("invalid return type"),
1529                                  errdetail("Fourth column must be type %s.",
1530                                                    format_type_be(TEXTOID))));
1531
1532         /* check that the type of the fifth column is INT4 */
1533         if (show_branch && show_serial && tupdesc->attrs[4]->atttypid != INT4OID)
1534                 elog(ERROR, "Query-specified return tuple not valid for Connectby: "
1535                          "fifth column must be type %s", format_type_be(INT4OID));
1536
1537         /* check that the type of the fifth column is INT4 */
1538         if (!show_branch && show_serial && tupdesc->attrs[3]->atttypid != INT4OID)
1539                 elog(ERROR, "Query-specified return tuple not valid for Connectby: "
1540                          "fourth column must be type %s", format_type_be(INT4OID));
1541
1542         /* OK, the tupdesc is valid for our purposes */
1543 }
1544
1545 /*
1546  * Check if spi sql tupdesc and return tupdesc are compatible
1547  */
1548 static bool
1549 compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
1550 {
1551         Oid                     ret_atttypid;
1552         Oid                     sql_atttypid;
1553
1554         /* check the key_fld types match */
1555         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1556         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1557         if (ret_atttypid != sql_atttypid)
1558                 ereport(ERROR,
1559                                 (errcode(ERRCODE_SYNTAX_ERROR),
1560                                  errmsg("invalid return type"),
1561                                  errdetail("SQL key field datatype does " \
1562                                                    "not match return key field datatype.")));
1563
1564         /* check the parent_key_fld types match */
1565         ret_atttypid = ret_tupdesc->attrs[1]->atttypid;
1566         sql_atttypid = sql_tupdesc->attrs[1]->atttypid;
1567         if (ret_atttypid != sql_atttypid)
1568                 ereport(ERROR,
1569                                 (errcode(ERRCODE_SYNTAX_ERROR),
1570                                  errmsg("invalid return type"),
1571                                  errdetail("SQL parent key field datatype does " \
1572                                                    "not match return parent key field datatype.")));
1573
1574         /* OK, the two tupdescs are compatible for our purposes */
1575         return true;
1576 }
1577
1578 /*
1579  * Check if two tupdescs match in type of attributes
1580  */
1581 static bool
1582 compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
1583 {
1584         int                     i;
1585         Form_pg_attribute ret_attr;
1586         Oid                     ret_atttypid;
1587         Form_pg_attribute sql_attr;
1588         Oid                     sql_atttypid;
1589
1590         /* check the rowid types match */
1591         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1592         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1593         if (ret_atttypid != sql_atttypid)
1594                 ereport(ERROR,
1595                                 (errcode(ERRCODE_SYNTAX_ERROR),
1596                                  errmsg("invalid return type"),
1597                                  errdetail("SQL rowid datatype does not match " \
1598                                                    "return rowid datatype.")));
1599
1600         /*
1601          * - attribute [1] of the sql tuple is the category; no need to check it -
1602          * attribute [2] of the sql tuple should match attributes [1] to [natts]
1603          * of the return tuple
1604          */
1605         sql_attr = sql_tupdesc->attrs[2];
1606         for (i = 1; i < ret_tupdesc->natts; i++)
1607         {
1608                 ret_attr = ret_tupdesc->attrs[i];
1609
1610                 if (ret_attr->atttypid != sql_attr->atttypid)
1611                         return false;
1612         }
1613
1614         /* OK, the two tupdescs are compatible for our purposes */
1615         return true;
1616 }
1617
1618 /*
1619  * Return a properly quoted literal value.
1620  * Uses quote_literal in quote.c
1621  */
1622 static char *
1623 quote_literal_cstr(char *rawstr)
1624 {
1625         text       *rawstr_text;
1626         text       *result_text;
1627         char       *result;
1628
1629         rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
1630         result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
1631         result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
1632
1633         return result;
1634 }