OSDN Git Service

pgindent run for 8.3.
[pg-rex/syncrep.git] / contrib / tablefunc / tablefunc.c
1 /*
2  * tablefunc
3  *
4  * Sample to demonstrate C functions which return setof scalar
5  * and setof composite.
6  * Joe Conway <mail@joeconway.com>
7  * And contributors:
8  * Nabil Sayegh <postgresql@e-trolley.de>
9  *
10  * Copyright (c) 2002-2007, PostgreSQL Global Development Group
11  *
12  * Permission to use, copy, modify, and distribute this software and its
13  * documentation for any purpose, without fee, and without a written agreement
14  * is hereby granted, provided that the above copyright notice and this
15  * paragraph and the following two paragraphs appear in all copies.
16  *
17  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
18  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
19  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
20  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
21  * POSSIBILITY OF SUCH DAMAGE.
22  *
23  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
24  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
25  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
26  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
27  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
28  *
29  */
30 #include "postgres.h"
31
32 #include <math.h>
33
34 #include "fmgr.h"
35 #include "funcapi.h"
36 #include "executor/spi.h"
37 #include "lib/stringinfo.h"
38 #include "miscadmin.h"
39 #include "utils/builtins.h"
40 #include "utils/guc.h"
41 #include "utils/lsyscache.h"
42
43 #include "tablefunc.h"
44
45 PG_MODULE_MAGIC;
46
47 static int      load_categories_hash(char *cats_sql, MemoryContext per_query_ctx);
48 static Tuplestorestate *get_crosstab_tuplestore(char *sql,
49                                                 int num_categories,
50                                                 TupleDesc tupdesc,
51                                                 MemoryContext per_query_ctx);
52 static void validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial);
53 static bool compatCrosstabTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
54 static bool compatConnectbyTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
55 static void get_normal_pair(float8 *x1, float8 *x2);
56 static Tuplestorestate *connectby(char *relname,
57                   char *key_fld,
58                   char *parent_key_fld,
59                   char *orderby_fld,
60                   char *branch_delim,
61                   char *start_with,
62                   int max_depth,
63                   bool show_branch,
64                   bool show_serial,
65                   MemoryContext per_query_ctx,
66                   AttInMetadata *attinmeta);
67 static Tuplestorestate *build_tuplestore_recursively(char *key_fld,
68                                                          char *parent_key_fld,
69                                                          char *relname,
70                                                          char *orderby_fld,
71                                                          char *branch_delim,
72                                                          char *start_with,
73                                                          char *branch,
74                                                          int level,
75                                                          int *serial,
76                                                          int max_depth,
77                                                          bool show_branch,
78                                                          bool show_serial,
79                                                          MemoryContext per_query_ctx,
80                                                          AttInMetadata *attinmeta,
81                                                          Tuplestorestate *tupstore);
82 static char *quote_literal_cstr(char *rawstr);
83
84 typedef struct
85 {
86         float8          mean;                   /* mean of the distribution */
87         float8          stddev;                 /* stddev of the distribution */
88         float8          carry_val;              /* hold second generated value */
89         bool            use_carry;              /* use second generated value */
90 }       normal_rand_fctx;
91
92 typedef struct
93 {
94         SPITupleTable *spi_tuptable;    /* sql results from user query */
95         char       *lastrowid;          /* rowid of the last tuple sent */
96 }       crosstab_fctx;
97
98 #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
99 #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
100 #define xpfree(var_) \
101         do { \
102                 if (var_ != NULL) \
103                 { \
104                         pfree(var_); \
105                         var_ = NULL; \
106                 } \
107         } while (0)
108
109 #define xpstrdup(tgtvar_, srcvar_) \
110         do { \
111                 if (srcvar_) \
112                         tgtvar_ = pstrdup(srcvar_); \
113                 else \
114                         tgtvar_ = NULL; \
115         } while (0)
116
117 #define xstreq(tgtvar_, srcvar_) \
118         (((tgtvar_ == NULL) && (srcvar_ == NULL)) || \
119          ((tgtvar_ != NULL) && (srcvar_ != NULL) && (strcmp(tgtvar_, srcvar_) == 0)))
120
121 /* sign, 10 digits, '\0' */
122 #define INT32_STRLEN    12
123
124 /* hash table support */
125 static HTAB *crosstab_HashTable;
126
127 /* The information we cache about loaded procedures */
128 typedef struct crosstab_cat_desc
129 {
130         char       *catname;
131         int                     attidx;                 /* zero based */
132 }       crosstab_cat_desc;
133
134 #define MAX_CATNAME_LEN                 NAMEDATALEN
135 #define INIT_CATS                               64
136
137 #define crosstab_HashTableLookup(CATNAME, CATDESC) \
138 do { \
139         crosstab_HashEnt *hentry; char key[MAX_CATNAME_LEN]; \
140         \
141         MemSet(key, 0, MAX_CATNAME_LEN); \
142         snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATNAME); \
143         hentry = (crosstab_HashEnt*) hash_search(crosstab_HashTable, \
144                                                                                  key, HASH_FIND, NULL); \
145         if (hentry) \
146                 CATDESC = hentry->catdesc; \
147         else \
148                 CATDESC = NULL; \
149 } while(0)
150
151 #define crosstab_HashTableInsert(CATDESC) \
152 do { \
153         crosstab_HashEnt *hentry; bool found; char key[MAX_CATNAME_LEN]; \
154         \
155         MemSet(key, 0, MAX_CATNAME_LEN); \
156         snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATDESC->catname); \
157         hentry = (crosstab_HashEnt*) hash_search(crosstab_HashTable, \
158                                                                                  key, HASH_ENTER, &found); \
159         if (found) \
160                 ereport(ERROR, \
161                                 (errcode(ERRCODE_DUPLICATE_OBJECT), \
162                                  errmsg("duplicate category name"))); \
163         hentry->catdesc = CATDESC; \
164 } while(0)
165
166 /* hash table */
167 typedef struct crosstab_hashent
168 {
169         char            internal_catname[MAX_CATNAME_LEN];
170         crosstab_cat_desc *catdesc;
171 }       crosstab_HashEnt;
172
173 /*
174  * normal_rand - return requested number of random values
175  * with a Gaussian (Normal) distribution.
176  *
177  * inputs are int numvals, float8 mean, and float8 stddev
178  * returns setof float8
179  */
180 PG_FUNCTION_INFO_V1(normal_rand);
181 Datum
182 normal_rand(PG_FUNCTION_ARGS)
183 {
184         FuncCallContext *funcctx;
185         int                     call_cntr;
186         int                     max_calls;
187         normal_rand_fctx *fctx;
188         float8          mean;
189         float8          stddev;
190         float8          carry_val;
191         bool            use_carry;
192         MemoryContext oldcontext;
193
194         /* stuff done only on the first call of the function */
195         if (SRF_IS_FIRSTCALL())
196         {
197                 /* create a function context for cross-call persistence */
198                 funcctx = SRF_FIRSTCALL_INIT();
199
200                 /*
201                  * switch to memory context appropriate for multiple function calls
202                  */
203                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
204
205                 /* total number of tuples to be returned */
206                 funcctx->max_calls = PG_GETARG_UINT32(0);
207
208                 /* allocate memory for user context */
209                 fctx = (normal_rand_fctx *) palloc(sizeof(normal_rand_fctx));
210
211                 /*
212                  * Use fctx to keep track of upper and lower bounds from call to call.
213                  * It will also be used to carry over the spare value we get from the
214                  * Box-Muller algorithm so that we only actually calculate a new value
215                  * every other call.
216                  */
217                 fctx->mean = PG_GETARG_FLOAT8(1);
218                 fctx->stddev = PG_GETARG_FLOAT8(2);
219                 fctx->carry_val = 0;
220                 fctx->use_carry = false;
221
222                 funcctx->user_fctx = fctx;
223
224                 MemoryContextSwitchTo(oldcontext);
225         }
226
227         /* stuff done on every call of the function */
228         funcctx = SRF_PERCALL_SETUP();
229
230         call_cntr = funcctx->call_cntr;
231         max_calls = funcctx->max_calls;
232         fctx = funcctx->user_fctx;
233         mean = fctx->mean;
234         stddev = fctx->stddev;
235         carry_val = fctx->carry_val;
236         use_carry = fctx->use_carry;
237
238         if (call_cntr < max_calls)      /* do when there is more left to send */
239         {
240                 float8          result;
241
242                 if (use_carry)
243                 {
244                         /*
245                          * reset use_carry and use second value obtained on last pass
246                          */
247                         fctx->use_carry = false;
248                         result = carry_val;
249                 }
250                 else
251                 {
252                         float8          normval_1;
253                         float8          normval_2;
254
255                         /* Get the next two normal values */
256                         get_normal_pair(&normval_1, &normval_2);
257
258                         /* use the first */
259                         result = mean + (stddev * normval_1);
260
261                         /* and save the second */
262                         fctx->carry_val = mean + (stddev * normval_2);
263                         fctx->use_carry = true;
264                 }
265
266                 /* send the result */
267                 SRF_RETURN_NEXT(funcctx, Float8GetDatum(result));
268         }
269         else
270                 /* do when there is no more left */
271                 SRF_RETURN_DONE(funcctx);
272 }
273
274 /*
275  * get_normal_pair()
276  * Assigns normally distributed (Gaussian) values to a pair of provided
277  * parameters, with mean 0, standard deviation 1.
278  *
279  * This routine implements Algorithm P (Polar method for normal deviates)
280  * from Knuth's _The_Art_of_Computer_Programming_, Volume 2, 3rd ed., pages
281  * 122-126. Knuth cites his source as "The polar method", G. E. P. Box, M. E.
282  * Muller, and G. Marsaglia, _Annals_Math,_Stat._ 29 (1958), 610-611.
283  *
284  */
285 static void
286 get_normal_pair(float8 *x1, float8 *x2)
287 {
288         float8          u1,
289                                 u2,
290                                 v1,
291                                 v2,
292                                 s;
293
294         do
295         {
296                 u1 = (float8) random() / (float8) MAX_RANDOM_VALUE;
297                 u2 = (float8) random() / (float8) MAX_RANDOM_VALUE;
298
299                 v1 = (2.0 * u1) - 1.0;
300                 v2 = (2.0 * u2) - 1.0;
301
302                 s = v1 * v1 + v2 * v2;
303         } while (s >= 1.0);
304
305         if (s == 0)
306         {
307                 *x1 = 0;
308                 *x2 = 0;
309         }
310         else
311         {
312                 s = sqrt((-2.0 * log(s)) / s);
313                 *x1 = v1 * s;
314                 *x2 = v2 * s;
315         }
316 }
317
318 /*
319  * crosstab - create a crosstab of rowids and values columns from a
320  * SQL statement returning one rowid column, one category column,
321  * and one value column.
322  *
323  * e.g. given sql which produces:
324  *
325  *                      rowid   cat             value
326  *                      ------+-------+-------
327  *                      row1    cat1    val1
328  *                      row1    cat2    val2
329  *                      row1    cat3    val3
330  *                      row1    cat4    val4
331  *                      row2    cat1    val5
332  *                      row2    cat2    val6
333  *                      row2    cat3    val7
334  *                      row2    cat4    val8
335  *
336  * crosstab returns:
337  *                                      <===== values columns =====>
338  *                      rowid   cat1    cat2    cat3    cat4
339  *                      ------+-------+-------+-------+-------
340  *                      row1    val1    val2    val3    val4
341  *                      row2    val5    val6    val7    val8
342  *
343  * NOTES:
344  * 1. SQL result must be ordered by 1,2.
345  * 2. The number of values columns depends on the tuple description
346  *        of the function's declared return type.  The return type's columns
347  *        must match the datatypes of the SQL query's result.  The datatype
348  *        of the category column can be anything, however.
349  * 3. Missing values (i.e. not enough adjacent rows of same rowid to
350  *        fill the number of result values columns) are filled in with nulls.
351  * 4. Extra values (i.e. too many adjacent rows of same rowid to fill
352  *        the number of result values columns) are skipped.
353  * 5. Rows with all nulls in the values columns are skipped.
354  */
355 PG_FUNCTION_INFO_V1(crosstab);
356 Datum
357 crosstab(PG_FUNCTION_ARGS)
358 {
359         FuncCallContext *funcctx;
360         TupleDesc       ret_tupdesc;
361         int                     call_cntr;
362         int                     max_calls;
363         AttInMetadata *attinmeta;
364         SPITupleTable *spi_tuptable = NULL;
365         TupleDesc       spi_tupdesc;
366         char       *lastrowid = NULL;
367         crosstab_fctx *fctx;
368         int                     i;
369         int                     num_categories;
370         bool            firstpass = false;
371         MemoryContext oldcontext;
372
373         /* stuff done only on the first call of the function */
374         if (SRF_IS_FIRSTCALL())
375         {
376                 char       *sql = GET_STR(PG_GETARG_TEXT_P(0));
377                 TupleDesc       tupdesc;
378                 int                     ret;
379                 int                     proc;
380
381                 /* create a function context for cross-call persistence */
382                 funcctx = SRF_FIRSTCALL_INIT();
383
384                 /*
385                  * switch to memory context appropriate for multiple function calls
386                  */
387                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
388
389                 /* Connect to SPI manager */
390                 if ((ret = SPI_connect()) < 0)
391                         /* internal error */
392                         elog(ERROR, "crosstab: SPI_connect returned %d", ret);
393
394                 /* Retrieve the desired rows */
395                 ret = SPI_execute(sql, true, 0);
396                 proc = SPI_processed;
397
398                 /* Check for qualifying tuples */
399                 if ((ret == SPI_OK_SELECT) && (proc > 0))
400                 {
401                         spi_tuptable = SPI_tuptable;
402                         spi_tupdesc = spi_tuptable->tupdesc;
403
404                         /*----------
405                          * The provided SQL query must always return three columns.
406                          *
407                          * 1. rowname
408                          *      the label or identifier for each row in the final result
409                          * 2. category
410                          *      the label or identifier for each column in the final result
411                          * 3. values
412                          *      the value for each column in the final result
413                          *----------
414                          */
415                         if (spi_tupdesc->natts != 3)
416                                 ereport(ERROR,
417                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
418                                                  errmsg("invalid source data SQL statement"),
419                                                  errdetail("The provided SQL must return 3 "
420                                                                    "columns: rowid, category, and values.")));
421                 }
422                 else
423                 {
424                         /* no qualifying tuples */
425                         SPI_finish();
426                         SRF_RETURN_DONE(funcctx);
427                 }
428
429                 /* SPI switches context on us, so reset it */
430                 MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
431
432                 /* get a tuple descriptor for our result type */
433                 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
434                 {
435                         case TYPEFUNC_COMPOSITE:
436                                 /* success */
437                                 break;
438                         case TYPEFUNC_RECORD:
439                                 /* failed to determine actual type of RECORD */
440                                 ereport(ERROR,
441                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
442                                                  errmsg("function returning record called in context "
443                                                                 "that cannot accept type record")));
444                                 break;
445                         default:
446                                 /* result type isn't composite */
447                                 elog(ERROR, "return type must be a row type");
448                                 break;
449                 }
450
451                 /* make sure we have a persistent copy of the tupdesc */
452                 tupdesc = CreateTupleDescCopy(tupdesc);
453
454                 /*
455                  * Check that return tupdesc is compatible with the data we got from
456                  * SPI, at least based on number and type of attributes
457                  */
458                 if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
459                         ereport(ERROR,
460                                         (errcode(ERRCODE_SYNTAX_ERROR),
461                                          errmsg("return and sql tuple descriptions are " \
462                                                         "incompatible")));
463
464                 /*
465                  * Generate attribute metadata needed later to produce tuples from raw
466                  * C strings
467                  */
468                 attinmeta = TupleDescGetAttInMetadata(tupdesc);
469                 funcctx->attinmeta = attinmeta;
470
471                 /* allocate memory for user context */
472                 fctx = (crosstab_fctx *) palloc(sizeof(crosstab_fctx));
473
474                 /*
475                  * Save spi data for use across calls
476                  */
477                 fctx->spi_tuptable = spi_tuptable;
478                 fctx->lastrowid = NULL;
479                 funcctx->user_fctx = fctx;
480
481                 /* total number of tuples to be returned */
482                 funcctx->max_calls = proc;
483
484                 MemoryContextSwitchTo(oldcontext);
485                 firstpass = true;
486         }
487
488         /* stuff done on every call of the function */
489         funcctx = SRF_PERCALL_SETUP();
490
491         /*
492          * initialize per-call variables
493          */
494         call_cntr = funcctx->call_cntr;
495         max_calls = funcctx->max_calls;
496
497         /* user context info */
498         fctx = (crosstab_fctx *) funcctx->user_fctx;
499         lastrowid = fctx->lastrowid;
500         spi_tuptable = fctx->spi_tuptable;
501
502         /* the sql tuple */
503         spi_tupdesc = spi_tuptable->tupdesc;
504
505         /* attribute return type and return tuple description */
506         attinmeta = funcctx->attinmeta;
507         ret_tupdesc = attinmeta->tupdesc;
508
509         /* the return tuple always must have 1 rowid + num_categories columns */
510         num_categories = ret_tupdesc->natts - 1;
511
512         if (call_cntr < max_calls)      /* do when there is more left to send */
513         {
514                 HeapTuple       tuple;
515                 Datum           result;
516                 char      **values;
517                 bool            skip_tuple = false;
518
519                 while (true)
520                 {
521                         /* allocate space */
522                         values = (char **) palloc((1 + num_categories) * sizeof(char *));
523
524                         /* and make sure it's clear */
525                         memset(values, '\0', (1 + num_categories) * sizeof(char *));
526
527                         /*
528                          * now loop through the sql results and assign each value in
529                          * sequence to the next category
530                          */
531                         for (i = 0; i < num_categories; i++)
532                         {
533                                 HeapTuple       spi_tuple;
534                                 char       *rowid = NULL;
535
536                                 /* see if we've gone too far already */
537                                 if (call_cntr >= max_calls)
538                                         break;
539
540                                 /* get the next sql result tuple */
541                                 spi_tuple = spi_tuptable->vals[call_cntr];
542
543                                 /* get the rowid from the current sql result tuple */
544                                 rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
545
546                                 /*
547                                  * If this is the first pass through the values for this
548                                  * rowid, set the first column to rowid
549                                  */
550                                 if (i == 0)
551                                 {
552                                         xpstrdup(values[0], rowid);
553
554                                         /*
555                                          * Check to see if the rowid is the same as that of the
556                                          * last tuple sent -- if so, skip this tuple entirely
557                                          */
558                                         if (!firstpass && xstreq(lastrowid, rowid))
559                                         {
560                                                 skip_tuple = true;
561                                                 break;
562                                         }
563                                 }
564
565                                 /*
566                                  * If rowid hasn't changed on us, continue building the ouput
567                                  * tuple.
568                                  */
569                                 if (xstreq(rowid, values[0]))
570                                 {
571                                         /*
572                                          * Get the next category item value, which is always
573                                          * attribute number three.
574                                          *
575                                          * Be careful to assign the value to the array index based
576                                          * on which category we are presently processing.
577                                          */
578                                         values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
579
580                                         /*
581                                          * increment the counter since we consume a row for each
582                                          * category, but not for last pass because the API will do
583                                          * that for us
584                                          */
585                                         if (i < (num_categories - 1))
586                                                 call_cntr = ++funcctx->call_cntr;
587                                 }
588                                 else
589                                 {
590                                         /*
591                                          * We'll fill in NULLs for the missing values, but we need
592                                          * to decrement the counter since this sql result row
593                                          * doesn't belong to the current output tuple.
594                                          */
595                                         call_cntr = --funcctx->call_cntr;
596                                         break;
597                                 }
598                                 xpfree(rowid);
599                         }
600
601                         /*
602                          * switch to memory context appropriate for multiple function
603                          * calls
604                          */
605                         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
606
607                         xpfree(fctx->lastrowid);
608                         xpstrdup(fctx->lastrowid, values[0]);
609                         lastrowid = fctx->lastrowid;
610
611                         MemoryContextSwitchTo(oldcontext);
612
613                         if (!skip_tuple)
614                         {
615                                 /* build the tuple */
616                                 tuple = BuildTupleFromCStrings(attinmeta, values);
617
618                                 /* make the tuple into a datum */
619                                 result = HeapTupleGetDatum(tuple);
620
621                                 /* Clean up */
622                                 for (i = 0; i < num_categories + 1; i++)
623                                         if (values[i] != NULL)
624                                                 xpfree(values[i]);
625                                 xpfree(values);
626
627                                 SRF_RETURN_NEXT(funcctx, result);
628                         }
629                         else
630                         {
631                                 /*
632                                  * Skipping this tuple entirely, but we need to advance the
633                                  * counter like the API would if we had returned one.
634                                  */
635                                 call_cntr = ++funcctx->call_cntr;
636
637                                 /* we'll start over at the top */
638                                 xpfree(values);
639
640                                 /* see if we've gone too far already */
641                                 if (call_cntr >= max_calls)
642                                 {
643                                         /* release SPI related resources */
644                                         SPI_finish();
645                                         SRF_RETURN_DONE(funcctx);
646                                 }
647
648                                 /* need to reset this before the next tuple is started */
649                                 skip_tuple = false;
650                         }
651                 }
652         }
653         else
654                 /* do when there is no more left */
655         {
656                 /* release SPI related resources */
657                 SPI_finish();
658                 SRF_RETURN_DONE(funcctx);
659         }
660 }
661
662 /*
663  * crosstab_hash - reimplement crosstab as materialized function and
664  * properly deal with missing values (i.e. don't pack remaining
665  * values to the left)
666  *
667  * crosstab - create a crosstab of rowids and values columns from a
668  * SQL statement returning one rowid column, one category column,
669  * and one value column.
670  *
671  * e.g. given sql which produces:
672  *
673  *                      rowid   cat             value
674  *                      ------+-------+-------
675  *                      row1    cat1    val1
676  *                      row1    cat2    val2
677  *                      row1    cat4    val4
678  *                      row2    cat1    val5
679  *                      row2    cat2    val6
680  *                      row2    cat3    val7
681  *                      row2    cat4    val8
682  *
683  * crosstab returns:
684  *                                      <===== values columns =====>
685  *                      rowid   cat1    cat2    cat3    cat4
686  *                      ------+-------+-------+-------+-------
687  *                      row1    val1    val2    null    val4
688  *                      row2    val5    val6    val7    val8
689  *
690  * NOTES:
691  * 1. SQL result must be ordered by 1.
692  * 2. The number of values columns depends on the tuple description
693  *        of the function's declared return type.
694  * 3. Missing values (i.e. missing category) are filled in with nulls.
695  * 4. Extra values (i.e. not in category results) are skipped.
696  */
697 PG_FUNCTION_INFO_V1(crosstab_hash);
698 Datum
699 crosstab_hash(PG_FUNCTION_ARGS)
700 {
701         char       *sql = GET_STR(PG_GETARG_TEXT_P(0));
702         char       *cats_sql = GET_STR(PG_GETARG_TEXT_P(1));
703         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
704         TupleDesc       tupdesc;
705         MemoryContext per_query_ctx;
706         MemoryContext oldcontext;
707         int                     num_categories;
708
709         /* check to see if caller supports us returning a tuplestore */
710         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
711                 ereport(ERROR,
712                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
713                                  errmsg("set-valued function called in context that cannot accept a set")));
714         if (!(rsinfo->allowedModes & SFRM_Materialize))
715                 ereport(ERROR,
716                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
717                                  errmsg("materialize mode required, but it is not " \
718                                                 "allowed in this context")));
719
720         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
721         oldcontext = MemoryContextSwitchTo(per_query_ctx);
722
723         /* get the requested return tuple description */
724         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
725
726         /*
727          * Check to make sure we have a reasonable tuple descriptor
728          *
729          * Note we will attempt to coerce the values into whatever the return
730          * attribute type is and depend on the "in" function to complain if
731          * needed.
732          */
733         if (tupdesc->natts < 2)
734                 ereport(ERROR,
735                                 (errcode(ERRCODE_SYNTAX_ERROR),
736                                  errmsg("query-specified return tuple and " \
737                                                 "crosstab function are not compatible")));
738
739         /* load up the categories hash table */
740         num_categories = load_categories_hash(cats_sql, per_query_ctx);
741
742         /* let the caller know we're sending back a tuplestore */
743         rsinfo->returnMode = SFRM_Materialize;
744
745         /* now go build it */
746         rsinfo->setResult = get_crosstab_tuplestore(sql,
747                                                                                                 num_categories,
748                                                                                                 tupdesc,
749                                                                                                 per_query_ctx);
750
751         /*
752          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
753          * tuples are in our tuplestore and passed back through rsinfo->setResult.
754          * rsinfo->setDesc is set to the tuple description that we actually used
755          * to build our tuples with, so the caller can verify we did what it was
756          * expecting.
757          */
758         rsinfo->setDesc = tupdesc;
759         MemoryContextSwitchTo(oldcontext);
760
761         return (Datum) 0;
762 }
763
764 /*
765  * load up the categories hash table
766  */
767 static int
768 load_categories_hash(char *cats_sql, MemoryContext per_query_ctx)
769 {
770         HASHCTL         ctl;
771         int                     ret;
772         int                     proc;
773         MemoryContext SPIcontext;
774         int                     num_categories = 0;
775
776         /* initialize the category hash table */
777         ctl.keysize = MAX_CATNAME_LEN;
778         ctl.entrysize = sizeof(crosstab_HashEnt);
779
780         /*
781          * use INIT_CATS, defined above as a guess of how many hash table entries
782          * to create, initially
783          */
784         crosstab_HashTable = hash_create("crosstab hash", INIT_CATS, &ctl, HASH_ELEM);
785
786         /* Connect to SPI manager */
787         if ((ret = SPI_connect()) < 0)
788                 /* internal error */
789                 elog(ERROR, "load_categories_hash: SPI_connect returned %d", ret);
790
791         /* Retrieve the category name rows */
792         ret = SPI_execute(cats_sql, true, 0);
793         num_categories = proc = SPI_processed;
794
795         /* Check for qualifying tuples */
796         if ((ret == SPI_OK_SELECT) && (proc > 0))
797         {
798                 SPITupleTable *spi_tuptable = SPI_tuptable;
799                 TupleDesc       spi_tupdesc = spi_tuptable->tupdesc;
800                 int                     i;
801
802                 /*
803                  * The provided categories SQL query must always return one column:
804                  * category - the label or identifier for each column
805                  */
806                 if (spi_tupdesc->natts != 1)
807                         ereport(ERROR,
808                                         (errcode(ERRCODE_SYNTAX_ERROR),
809                                          errmsg("provided \"categories\" SQL must " \
810                                                         "return 1 column of at least one row")));
811
812                 for (i = 0; i < proc; i++)
813                 {
814                         crosstab_cat_desc *catdesc;
815                         char       *catname;
816                         HeapTuple       spi_tuple;
817
818                         /* get the next sql result tuple */
819                         spi_tuple = spi_tuptable->vals[i];
820
821                         /* get the category from the current sql result tuple */
822                         catname = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
823
824                         SPIcontext = MemoryContextSwitchTo(per_query_ctx);
825
826                         catdesc = (crosstab_cat_desc *) palloc(sizeof(crosstab_cat_desc));
827                         catdesc->catname = catname;
828                         catdesc->attidx = i;
829
830                         /* Add the proc description block to the hashtable */
831                         crosstab_HashTableInsert(catdesc);
832
833                         MemoryContextSwitchTo(SPIcontext);
834                 }
835         }
836
837         if (SPI_finish() != SPI_OK_FINISH)
838                 /* internal error */
839                 elog(ERROR, "load_categories_hash: SPI_finish() failed");
840
841         return num_categories;
842 }
843
844 /*
845  * create and populate the crosstab tuplestore using the provided source query
846  */
847 static Tuplestorestate *
848 get_crosstab_tuplestore(char *sql,
849                                                 int num_categories,
850                                                 TupleDesc tupdesc,
851                                                 MemoryContext per_query_ctx)
852 {
853         Tuplestorestate *tupstore;
854         AttInMetadata *attinmeta = TupleDescGetAttInMetadata(tupdesc);
855         char      **values;
856         HeapTuple       tuple;
857         int                     ret;
858         int                     proc;
859         MemoryContext SPIcontext;
860
861         /* initialize our tuplestore */
862         tupstore = tuplestore_begin_heap(true, false, work_mem);
863
864         /* Connect to SPI manager */
865         if ((ret = SPI_connect()) < 0)
866                 /* internal error */
867                 elog(ERROR, "get_crosstab_tuplestore: SPI_connect returned %d", ret);
868
869         /* Now retrieve the crosstab source rows */
870         ret = SPI_execute(sql, true, 0);
871         proc = SPI_processed;
872
873         /* Check for qualifying tuples */
874         if ((ret == SPI_OK_SELECT) && (proc > 0))
875         {
876                 SPITupleTable *spi_tuptable = SPI_tuptable;
877                 TupleDesc       spi_tupdesc = spi_tuptable->tupdesc;
878                 int                     ncols = spi_tupdesc->natts;
879                 char       *rowid;
880                 char       *lastrowid = NULL;
881                 bool            firstpass = true;
882                 int                     i,
883                                         j;
884                 int                     result_ncols;
885
886                 if (num_categories == 0)
887                 {
888                         /* no qualifying category tuples */
889                         ereport(ERROR,
890                                         (errcode(ERRCODE_SYNTAX_ERROR),
891                                          errmsg("provided \"categories\" SQL must " \
892                                                         "return 1 column of at least one row")));
893                 }
894
895                 /*
896                  * The provided SQL query must always return at least three columns:
897                  *
898                  * 1. rowname   the label for each row - column 1 in the final result
899                  * 2. category  the label for each value-column in the final result 3.
900                  * value         the values used to populate the value-columns
901                  *
902                  * If there are more than three columns, the last two are taken as
903                  * "category" and "values". The first column is taken as "rowname".
904                  * Additional columns (2 thru N-2) are assumed the same for the same
905                  * "rowname", and are copied into the result tuple from the first time
906                  * we encounter a particular rowname.
907                  */
908                 if (ncols < 3)
909                         ereport(ERROR,
910                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
911                                          errmsg("invalid source data SQL statement"),
912                                          errdetail("The provided SQL must return 3 " \
913                                                            " columns; rowid, category, and values.")));
914
915                 result_ncols = (ncols - 2) + num_categories;
916
917                 /* Recheck to make sure we tuple descriptor still looks reasonable */
918                 if (tupdesc->natts != result_ncols)
919                         ereport(ERROR,
920                                         (errcode(ERRCODE_SYNTAX_ERROR),
921                                          errmsg("invalid return type"),
922                                          errdetail("Query-specified return " \
923                                                            "tuple has %d columns but crosstab " \
924                                                            "returns %d.", tupdesc->natts, result_ncols)));
925
926                 /* allocate space */
927                 values = (char **) palloc(result_ncols * sizeof(char *));
928
929                 /* and make sure it's clear */
930                 memset(values, '\0', result_ncols * sizeof(char *));
931
932                 for (i = 0; i < proc; i++)
933                 {
934                         HeapTuple       spi_tuple;
935                         crosstab_cat_desc *catdesc;
936                         char       *catname;
937
938                         /* get the next sql result tuple */
939                         spi_tuple = spi_tuptable->vals[i];
940
941                         /* get the rowid from the current sql result tuple */
942                         rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
943
944                         /*
945                          * if we're on a new output row, grab the column values up to
946                          * column N-2 now
947                          */
948                         if (firstpass || !xstreq(lastrowid, rowid))
949                         {
950                                 /*
951                                  * a new row means we need to flush the old one first, unless
952                                  * we're on the very first row
953                                  */
954                                 if (!firstpass)
955                                 {
956                                         /* rowid changed, flush the previous output row */
957                                         tuple = BuildTupleFromCStrings(attinmeta, values);
958
959                                         /* switch to appropriate context while storing the tuple */
960                                         SPIcontext = MemoryContextSwitchTo(per_query_ctx);
961                                         tuplestore_puttuple(tupstore, tuple);
962                                         MemoryContextSwitchTo(SPIcontext);
963
964                                         for (j = 0; j < result_ncols; j++)
965                                                 xpfree(values[j]);
966                                 }
967
968                                 values[0] = rowid;
969                                 for (j = 1; j < ncols - 2; j++)
970                                         values[j] = SPI_getvalue(spi_tuple, spi_tupdesc, j + 1);
971
972                                 /* we're no longer on the first pass */
973                                 firstpass = false;
974                         }
975
976                         /* look up the category and fill in the appropriate column */
977                         catname = SPI_getvalue(spi_tuple, spi_tupdesc, ncols - 1);
978
979                         if (catname != NULL)
980                         {
981                                 crosstab_HashTableLookup(catname, catdesc);
982
983                                 if (catdesc)
984                                         values[catdesc->attidx + ncols - 2] =
985                                                 SPI_getvalue(spi_tuple, spi_tupdesc, ncols);
986                         }
987
988                         xpfree(lastrowid);
989                         xpstrdup(lastrowid, rowid);
990                 }
991
992                 /* flush the last output row */
993                 tuple = BuildTupleFromCStrings(attinmeta, values);
994
995                 /* switch to appropriate context while storing the tuple */
996                 SPIcontext = MemoryContextSwitchTo(per_query_ctx);
997                 tuplestore_puttuple(tupstore, tuple);
998                 MemoryContextSwitchTo(SPIcontext);
999         }
1000
1001         if (SPI_finish() != SPI_OK_FINISH)
1002                 /* internal error */
1003                 elog(ERROR, "get_crosstab_tuplestore: SPI_finish() failed");
1004
1005         tuplestore_donestoring(tupstore);
1006
1007         return tupstore;
1008 }
1009
1010 /*
1011  * connectby_text - produce a result set from a hierarchical (parent/child)
1012  * table.
1013  *
1014  * e.g. given table foo:
1015  *
1016  *                      keyid   parent_keyid pos
1017  *                      ------+------------+--
1018  *                      row1    NULL             0
1019  *                      row2    row1             0
1020  *                      row3    row1             0
1021  *                      row4    row2             1
1022  *                      row5    row2             0
1023  *                      row6    row4             0
1024  *                      row7    row3             0
1025  *                      row8    row6             0
1026  *                      row9    row5             0
1027  *
1028  *
1029  * connectby(text relname, text keyid_fld, text parent_keyid_fld
1030  *                        [, text orderby_fld], text start_with, int max_depth
1031  *                        [, text branch_delim])
1032  * connectby('foo', 'keyid', 'parent_keyid', 'pos', 'row2', 0, '~') returns:
1033  *
1034  *              keyid   parent_id       level    branch                         serial
1035  *              ------+-----------+--------+-----------------------
1036  *              row2    NULL              0               row2                            1
1037  *              row5    row2              1               row2~row5                       2
1038  *              row9    row5              2               row2~row5~row9          3
1039  *              row4    row2              1               row2~row4                       4
1040  *              row6    row4              2               row2~row4~row6          5
1041  *              row8    row6              3               row2~row4~row6~row8 6
1042  *
1043  */
1044 PG_FUNCTION_INFO_V1(connectby_text);
1045
1046 #define CONNECTBY_NCOLS                                 4
1047 #define CONNECTBY_NCOLS_NOBRANCH                3
1048
1049 Datum
1050 connectby_text(PG_FUNCTION_ARGS)
1051 {
1052         char       *relname = GET_STR(PG_GETARG_TEXT_P(0));
1053         char       *key_fld = GET_STR(PG_GETARG_TEXT_P(1));
1054         char       *parent_key_fld = GET_STR(PG_GETARG_TEXT_P(2));
1055         char       *start_with = GET_STR(PG_GETARG_TEXT_P(3));
1056         int                     max_depth = PG_GETARG_INT32(4);
1057         char       *branch_delim = NULL;
1058         bool            show_branch = false;
1059         bool            show_serial = false;
1060         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1061         TupleDesc       tupdesc;
1062         AttInMetadata *attinmeta;
1063         MemoryContext per_query_ctx;
1064         MemoryContext oldcontext;
1065
1066         /* check to see if caller supports us returning a tuplestore */
1067         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1068                 ereport(ERROR,
1069                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1070                                  errmsg("set-valued function called in context that cannot accept a set")));
1071         if (!(rsinfo->allowedModes & SFRM_Materialize))
1072                 ereport(ERROR,
1073                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1074                                  errmsg("materialize mode required, but it is not " \
1075                                                 "allowed in this context")));
1076
1077         if (fcinfo->nargs == 6)
1078         {
1079                 branch_delim = GET_STR(PG_GETARG_TEXT_P(5));
1080                 show_branch = true;
1081         }
1082         else
1083                 /* default is no show, tilde for the delimiter */
1084                 branch_delim = pstrdup("~");
1085
1086         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1087         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1088
1089         /* get the requested return tuple description */
1090         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
1091
1092         /* does it meet our needs */
1093         validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
1094
1095         /* OK, use it then */
1096         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1097
1098         /* OK, go to work */
1099         rsinfo->returnMode = SFRM_Materialize;
1100         rsinfo->setResult = connectby(relname,
1101                                                                   key_fld,
1102                                                                   parent_key_fld,
1103                                                                   NULL,
1104                                                                   branch_delim,
1105                                                                   start_with,
1106                                                                   max_depth,
1107                                                                   show_branch,
1108                                                                   show_serial,
1109                                                                   per_query_ctx,
1110                                                                   attinmeta);
1111         rsinfo->setDesc = tupdesc;
1112
1113         MemoryContextSwitchTo(oldcontext);
1114
1115         /*
1116          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
1117          * tuples are in our tuplestore and passed back through rsinfo->setResult.
1118          * rsinfo->setDesc is set to the tuple description that we actually used
1119          * to build our tuples with, so the caller can verify we did what it was
1120          * expecting.
1121          */
1122         return (Datum) 0;
1123 }
1124
1125 PG_FUNCTION_INFO_V1(connectby_text_serial);
1126 Datum
1127 connectby_text_serial(PG_FUNCTION_ARGS)
1128 {
1129         char       *relname = GET_STR(PG_GETARG_TEXT_P(0));
1130         char       *key_fld = GET_STR(PG_GETARG_TEXT_P(1));
1131         char       *parent_key_fld = GET_STR(PG_GETARG_TEXT_P(2));
1132         char       *orderby_fld = GET_STR(PG_GETARG_TEXT_P(3));
1133         char       *start_with = GET_STR(PG_GETARG_TEXT_P(4));
1134         int                     max_depth = PG_GETARG_INT32(5);
1135         char       *branch_delim = NULL;
1136         bool            show_branch = false;
1137         bool            show_serial = true;
1138
1139         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1140         TupleDesc       tupdesc;
1141         AttInMetadata *attinmeta;
1142         MemoryContext per_query_ctx;
1143         MemoryContext oldcontext;
1144
1145         /* check to see if caller supports us returning a tuplestore */
1146         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1147                 ereport(ERROR,
1148                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1149                                  errmsg("set-valued function called in context that cannot accept a set")));
1150         if (!(rsinfo->allowedModes & SFRM_Materialize))
1151                 ereport(ERROR,
1152                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1153                                  errmsg("materialize mode required, but it is not " \
1154                                                 "allowed in this context")));
1155
1156         if (fcinfo->nargs == 7)
1157         {
1158                 branch_delim = GET_STR(PG_GETARG_TEXT_P(6));
1159                 show_branch = true;
1160         }
1161         else
1162                 /* default is no show, tilde for the delimiter */
1163                 branch_delim = pstrdup("~");
1164
1165         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1166         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1167
1168         /* get the requested return tuple description */
1169         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
1170
1171         /* does it meet our needs */
1172         validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
1173
1174         /* OK, use it then */
1175         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1176
1177         /* OK, go to work */
1178         rsinfo->returnMode = SFRM_Materialize;
1179         rsinfo->setResult = connectby(relname,
1180                                                                   key_fld,
1181                                                                   parent_key_fld,
1182                                                                   orderby_fld,
1183                                                                   branch_delim,
1184                                                                   start_with,
1185                                                                   max_depth,
1186                                                                   show_branch,
1187                                                                   show_serial,
1188                                                                   per_query_ctx,
1189                                                                   attinmeta);
1190         rsinfo->setDesc = tupdesc;
1191
1192         MemoryContextSwitchTo(oldcontext);
1193
1194         /*
1195          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
1196          * tuples are in our tuplestore and passed back through rsinfo->setResult.
1197          * rsinfo->setDesc is set to the tuple description that we actually used
1198          * to build our tuples with, so the caller can verify we did what it was
1199          * expecting.
1200          */
1201         return (Datum) 0;
1202 }
1203
1204
1205 /*
1206  * connectby - does the real work for connectby_text()
1207  */
1208 static Tuplestorestate *
1209 connectby(char *relname,
1210                   char *key_fld,
1211                   char *parent_key_fld,
1212                   char *orderby_fld,
1213                   char *branch_delim,
1214                   char *start_with,
1215                   int max_depth,
1216                   bool show_branch,
1217                   bool show_serial,
1218                   MemoryContext per_query_ctx,
1219                   AttInMetadata *attinmeta)
1220 {
1221         Tuplestorestate *tupstore = NULL;
1222         int                     ret;
1223         MemoryContext oldcontext;
1224
1225         int                     serial = 1;
1226
1227         /* Connect to SPI manager */
1228         if ((ret = SPI_connect()) < 0)
1229                 /* internal error */
1230                 elog(ERROR, "connectby: SPI_connect returned %d", ret);
1231
1232         /* switch to longer term context to create the tuple store */
1233         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1234
1235         /* initialize our tuplestore */
1236         tupstore = tuplestore_begin_heap(true, false, work_mem);
1237
1238         MemoryContextSwitchTo(oldcontext);
1239
1240         /* now go get the whole tree */
1241         tupstore = build_tuplestore_recursively(key_fld,
1242                                                                                         parent_key_fld,
1243                                                                                         relname,
1244                                                                                         orderby_fld,
1245                                                                                         branch_delim,
1246                                                                                         start_with,
1247                                                                                         start_with, /* current_branch */
1248                                                                                         0,      /* initial level is 0 */
1249                                                                                         &serial,        /* initial serial is 1 */
1250                                                                                         max_depth,
1251                                                                                         show_branch,
1252                                                                                         show_serial,
1253                                                                                         per_query_ctx,
1254                                                                                         attinmeta,
1255                                                                                         tupstore);
1256
1257         SPI_finish();
1258
1259         return tupstore;
1260 }
1261
1262 static Tuplestorestate *
1263 build_tuplestore_recursively(char *key_fld,
1264                                                          char *parent_key_fld,
1265                                                          char *relname,
1266                                                          char *orderby_fld,
1267                                                          char *branch_delim,
1268                                                          char *start_with,
1269                                                          char *branch,
1270                                                          int level,
1271                                                          int *serial,
1272                                                          int max_depth,
1273                                                          bool show_branch,
1274                                                          bool show_serial,
1275                                                          MemoryContext per_query_ctx,
1276                                                          AttInMetadata *attinmeta,
1277                                                          Tuplestorestate *tupstore)
1278 {
1279         TupleDesc       tupdesc = attinmeta->tupdesc;
1280         MemoryContext oldcontext;
1281         int                     ret;
1282         int                     proc;
1283         int                     serial_column;
1284         StringInfoData sql;
1285         char      **values;
1286         char       *current_key;
1287         char       *current_key_parent;
1288         char            current_level[INT32_STRLEN];
1289         char            serial_str[INT32_STRLEN];
1290         char       *current_branch;
1291         HeapTuple       tuple;
1292
1293         if (max_depth > 0 && level > max_depth)
1294                 return tupstore;
1295
1296         initStringInfo(&sql);
1297
1298         /* Build initial sql statement */
1299         if (!show_serial)
1300         {
1301                 appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s",
1302                                                  key_fld,
1303                                                  parent_key_fld,
1304                                                  relname,
1305                                                  parent_key_fld,
1306                                                  quote_literal_cstr(start_with),
1307                                                  key_fld, key_fld, parent_key_fld);
1308                 serial_column = 0;
1309         }
1310         else
1311         {
1312                 appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s ORDER BY %s",
1313                                                  key_fld,
1314                                                  parent_key_fld,
1315                                                  relname,
1316                                                  parent_key_fld,
1317                                                  quote_literal_cstr(start_with),
1318                                                  key_fld, key_fld, parent_key_fld,
1319                                                  orderby_fld);
1320                 serial_column = 1;
1321         }
1322
1323         if (show_branch)
1324                 values = (char **) palloc((CONNECTBY_NCOLS + serial_column) * sizeof(char *));
1325         else
1326                 values = (char **) palloc((CONNECTBY_NCOLS_NOBRANCH + serial_column) * sizeof(char *));
1327
1328         /* First time through, do a little setup */
1329         if (level == 0)
1330         {
1331                 /* root value is the one we initially start with */
1332                 values[0] = start_with;
1333
1334                 /* root value has no parent */
1335                 values[1] = NULL;
1336
1337                 /* root level is 0 */
1338                 sprintf(current_level, "%d", level);
1339                 values[2] = current_level;
1340
1341                 /* root branch is just starting root value */
1342                 if (show_branch)
1343                         values[3] = start_with;
1344
1345                 /* root starts the serial with 1 */
1346                 if (show_serial)
1347                 {
1348                         sprintf(serial_str, "%d", (*serial)++);
1349                         if (show_branch)
1350                                 values[4] = serial_str;
1351                         else
1352                                 values[3] = serial_str;
1353                 }
1354
1355                 /* construct the tuple */
1356                 tuple = BuildTupleFromCStrings(attinmeta, values);
1357
1358                 /* switch to long lived context while storing the tuple */
1359                 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1360
1361                 /* now store it */
1362                 tuplestore_puttuple(tupstore, tuple);
1363
1364                 /* now reset the context */
1365                 MemoryContextSwitchTo(oldcontext);
1366
1367                 /* increment level */
1368                 level++;
1369         }
1370
1371         /* Retrieve the desired rows */
1372         ret = SPI_execute(sql.data, true, 0);
1373         proc = SPI_processed;
1374
1375         /* Check for qualifying tuples */
1376         if ((ret == SPI_OK_SELECT) && (proc > 0))
1377         {
1378                 HeapTuple       spi_tuple;
1379                 SPITupleTable *tuptable = SPI_tuptable;
1380                 TupleDesc       spi_tupdesc = tuptable->tupdesc;
1381                 int                     i;
1382                 StringInfoData branchstr;
1383                 StringInfoData chk_branchstr;
1384                 StringInfoData chk_current_key;
1385
1386                 /* First time through, do a little more setup */
1387                 if (level == 0)
1388                 {
1389                         /*
1390                          * Check that return tupdesc is compatible with the one we got
1391                          * from the query, but only at level 0 -- no need to check more
1392                          * than once
1393                          */
1394
1395                         if (!compatConnectbyTupleDescs(tupdesc, spi_tupdesc))
1396                                 ereport(ERROR,
1397                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1398                                                  errmsg("invalid return type"),
1399                                                  errdetail("Return and SQL tuple descriptions are " \
1400                                                                    "incompatible.")));
1401                 }
1402
1403                 initStringInfo(&branchstr);
1404                 initStringInfo(&chk_branchstr);
1405                 initStringInfo(&chk_current_key);
1406
1407                 for (i = 0; i < proc; i++)
1408                 {
1409                         /* initialize branch for this pass */
1410                         appendStringInfo(&branchstr, "%s", branch);
1411                         appendStringInfo(&chk_branchstr, "%s%s%s", branch_delim, branch, branch_delim);
1412
1413                         /* get the next sql result tuple */
1414                         spi_tuple = tuptable->vals[i];
1415
1416                         /* get the current key and parent */
1417                         current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
1418                         appendStringInfo(&chk_current_key, "%s%s%s", branch_delim, current_key, branch_delim);
1419                         current_key_parent = pstrdup(SPI_getvalue(spi_tuple, spi_tupdesc, 2));
1420
1421                         /* get the current level */
1422                         sprintf(current_level, "%d", level);
1423
1424                         /* check to see if this key is also an ancestor */
1425                         if (strstr(chk_branchstr.data, chk_current_key.data))
1426                                 elog(ERROR, "infinite recursion detected");
1427
1428                         /* OK, extend the branch */
1429                         appendStringInfo(&branchstr, "%s%s", branch_delim, current_key);
1430                         current_branch = branchstr.data;
1431
1432                         /* build a tuple */
1433                         values[0] = pstrdup(current_key);
1434                         values[1] = current_key_parent;
1435                         values[2] = current_level;
1436                         if (show_branch)
1437                                 values[3] = current_branch;
1438                         if (show_serial)
1439                         {
1440                                 sprintf(serial_str, "%d", (*serial)++);
1441                                 if (show_branch)
1442                                         values[4] = serial_str;
1443                                 else
1444                                         values[3] = serial_str;
1445                         }
1446
1447                         tuple = BuildTupleFromCStrings(attinmeta, values);
1448
1449                         xpfree(current_key);
1450                         xpfree(current_key_parent);
1451
1452                         /* switch to long lived context while storing the tuple */
1453                         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1454
1455                         /* store the tuple for later use */
1456                         tuplestore_puttuple(tupstore, tuple);
1457
1458                         /* now reset the context */
1459                         MemoryContextSwitchTo(oldcontext);
1460
1461                         heap_freetuple(tuple);
1462
1463                         /* recurse using current_key_parent as the new start_with */
1464                         tupstore = build_tuplestore_recursively(key_fld,
1465                                                                                                         parent_key_fld,
1466                                                                                                         relname,
1467                                                                                                         orderby_fld,
1468                                                                                                         branch_delim,
1469                                                                                                         values[0],
1470                                                                                                         current_branch,
1471                                                                                                         level + 1,
1472                                                                                                         serial,
1473                                                                                                         max_depth,
1474                                                                                                         show_branch,
1475                                                                                                         show_serial,
1476                                                                                                         per_query_ctx,
1477                                                                                                         attinmeta,
1478                                                                                                         tupstore);
1479
1480                         /* reset branch for next pass */
1481                         resetStringInfo(&branchstr);
1482                         resetStringInfo(&chk_branchstr);
1483                         resetStringInfo(&chk_current_key);
1484                 }
1485
1486                 xpfree(branchstr.data);
1487                 xpfree(chk_branchstr.data);
1488                 xpfree(chk_current_key.data);
1489         }
1490
1491         return tupstore;
1492 }
1493
1494 /*
1495  * Check expected (query runtime) tupdesc suitable for Connectby
1496  */
1497 static void
1498 validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial)
1499 {
1500         int                     serial_column = 0;
1501
1502         if (show_serial)
1503                 serial_column = 1;
1504
1505         /* are there the correct number of columns */
1506         if (show_branch)
1507         {
1508                 if (tupdesc->natts != (CONNECTBY_NCOLS + serial_column))
1509                         ereport(ERROR,
1510                                         (errcode(ERRCODE_SYNTAX_ERROR),
1511                                          errmsg("invalid return type"),
1512                                          errdetail("Query-specified return tuple has " \
1513                                                            "wrong number of columns.")));
1514         }
1515         else
1516         {
1517                 if (tupdesc->natts != CONNECTBY_NCOLS_NOBRANCH + serial_column)
1518                         ereport(ERROR,
1519                                         (errcode(ERRCODE_SYNTAX_ERROR),
1520                                          errmsg("invalid return type"),
1521                                          errdetail("Query-specified return tuple has " \
1522                                                            "wrong number of columns.")));
1523         }
1524
1525         /* check that the types of the first two columns match */
1526         if (tupdesc->attrs[0]->atttypid != tupdesc->attrs[1]->atttypid)
1527                 ereport(ERROR,
1528                                 (errcode(ERRCODE_SYNTAX_ERROR),
1529                                  errmsg("invalid return type"),
1530                                  errdetail("First two columns must be the same type.")));
1531
1532         /* check that the type of the third column is INT4 */
1533         if (tupdesc->attrs[2]->atttypid != INT4OID)
1534                 ereport(ERROR,
1535                                 (errcode(ERRCODE_SYNTAX_ERROR),
1536                                  errmsg("invalid return type"),
1537                                  errdetail("Third column must be type %s.",
1538                                                    format_type_be(INT4OID))));
1539
1540         /* check that the type of the fourth column is TEXT if applicable */
1541         if (show_branch && tupdesc->attrs[3]->atttypid != TEXTOID)
1542                 ereport(ERROR,
1543                                 (errcode(ERRCODE_SYNTAX_ERROR),
1544                                  errmsg("invalid return type"),
1545                                  errdetail("Fourth column must be type %s.",
1546                                                    format_type_be(TEXTOID))));
1547
1548         /* check that the type of the fifth column is INT4 */
1549         if (show_branch && show_serial && tupdesc->attrs[4]->atttypid != INT4OID)
1550                 elog(ERROR, "query-specified return tuple not valid for Connectby: "
1551                          "fifth column must be type %s", format_type_be(INT4OID));
1552
1553         /* check that the type of the fifth column is INT4 */
1554         if (!show_branch && show_serial && tupdesc->attrs[3]->atttypid != INT4OID)
1555                 elog(ERROR, "query-specified return tuple not valid for Connectby: "
1556                          "fourth column must be type %s", format_type_be(INT4OID));
1557
1558         /* OK, the tupdesc is valid for our purposes */
1559 }
1560
1561 /*
1562  * Check if spi sql tupdesc and return tupdesc are compatible
1563  */
1564 static bool
1565 compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
1566 {
1567         Oid                     ret_atttypid;
1568         Oid                     sql_atttypid;
1569
1570         /* check the key_fld types match */
1571         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1572         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1573         if (ret_atttypid != sql_atttypid)
1574                 ereport(ERROR,
1575                                 (errcode(ERRCODE_SYNTAX_ERROR),
1576                                  errmsg("invalid return type"),
1577                                  errdetail("SQL key field datatype does " \
1578                                                    "not match return key field datatype.")));
1579
1580         /* check the parent_key_fld types match */
1581         ret_atttypid = ret_tupdesc->attrs[1]->atttypid;
1582         sql_atttypid = sql_tupdesc->attrs[1]->atttypid;
1583         if (ret_atttypid != sql_atttypid)
1584                 ereport(ERROR,
1585                                 (errcode(ERRCODE_SYNTAX_ERROR),
1586                                  errmsg("invalid return type"),
1587                                  errdetail("SQL parent key field datatype does " \
1588                                                    "not match return parent key field datatype.")));
1589
1590         /* OK, the two tupdescs are compatible for our purposes */
1591         return true;
1592 }
1593
1594 /*
1595  * Check if two tupdescs match in type of attributes
1596  */
1597 static bool
1598 compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
1599 {
1600         int                     i;
1601         Form_pg_attribute ret_attr;
1602         Oid                     ret_atttypid;
1603         Form_pg_attribute sql_attr;
1604         Oid                     sql_atttypid;
1605
1606         /* check the rowid types match */
1607         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1608         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1609         if (ret_atttypid != sql_atttypid)
1610                 ereport(ERROR,
1611                                 (errcode(ERRCODE_SYNTAX_ERROR),
1612                                  errmsg("invalid return type"),
1613                                  errdetail("SQL rowid datatype does not match " \
1614                                                    "return rowid datatype.")));
1615
1616         /*
1617          * - attribute [1] of the sql tuple is the category; no need to check it -
1618          * attribute [2] of the sql tuple should match attributes [1] to [natts]
1619          * of the return tuple
1620          */
1621         sql_attr = sql_tupdesc->attrs[2];
1622         for (i = 1; i < ret_tupdesc->natts; i++)
1623         {
1624                 ret_attr = ret_tupdesc->attrs[i];
1625
1626                 if (ret_attr->atttypid != sql_attr->atttypid)
1627                         return false;
1628         }
1629
1630         /* OK, the two tupdescs are compatible for our purposes */
1631         return true;
1632 }
1633
1634 /*
1635  * Return a properly quoted literal value.
1636  * Uses quote_literal in quote.c
1637  */
1638 static char *
1639 quote_literal_cstr(char *rawstr)
1640 {
1641         text       *rawstr_text;
1642         text       *result_text;
1643         char       *result;
1644
1645         rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
1646         result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
1647         result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
1648
1649         return result;
1650 }