]> granicus.if.org Git - postgresql/blob - contrib/tablefunc/tablefunc.c
b63bd21cd7d5cc5e6b4f97cab6dbb5f2049532d9
[postgresql] / contrib / tablefunc / tablefunc.c
1 /*
2  * tablefunc
3  *
4  * Sample to demonstrate C functions which return setof scalar
5  * and setof composite.
6  * Joe Conway <mail@joeconway.com>
7  * And contributors:
8  * Nabil Sayegh <postgresql@e-trolley.de>
9  *
10  * Copyright (c) 2002-2006, PostgreSQL Global Development Group
11  *
12  * Permission to use, copy, modify, and distribute this software and its
13  * documentation for any purpose, without fee, and without a written agreement
14  * is hereby granted, provided that the above copyright notice and this
15  * paragraph and the following two paragraphs appear in all copies.
16  *
17  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
18  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
19  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
20  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
21  * POSSIBILITY OF SUCH DAMAGE.
22  *
23  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
24  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
25  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
26  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
27  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
28  *
29  */
30 #include "postgres.h"
31
32 #include <math.h>
33
34 #include "fmgr.h"
35 #include "funcapi.h"
36 #include "executor/spi.h"
37 #include "lib/stringinfo.h"
38 #include "miscadmin.h"
39 #include "utils/builtins.h"
40 #include "utils/guc.h"
41 #include "utils/lsyscache.h"
42
43 #include "tablefunc.h"
44
45 PG_MODULE_MAGIC;
46
47 static int      load_categories_hash(char *cats_sql, MemoryContext per_query_ctx);
48 static Tuplestorestate *get_crosstab_tuplestore(char *sql,
49                                                 int num_categories,
50                                                 TupleDesc tupdesc,
51                                                 MemoryContext per_query_ctx);
52 static void validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial);
53 static bool compatCrosstabTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
54 static bool compatConnectbyTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
55 static void get_normal_pair(float8 *x1, float8 *x2);
56 static Tuplestorestate *connectby(char *relname,
57                   char *key_fld,
58                   char *parent_key_fld,
59                   char *orderby_fld,
60                   char *branch_delim,
61                   char *start_with,
62                   int max_depth,
63                   bool show_branch,
64                   bool show_serial,
65                   MemoryContext per_query_ctx,
66                   AttInMetadata *attinmeta);
67 static Tuplestorestate *build_tuplestore_recursively(char *key_fld,
68                                                          char *parent_key_fld,
69                                                          char *relname,
70                                                          char *orderby_fld,
71                                                          char *branch_delim,
72                                                          char *start_with,
73                                                          char *branch,
74                                                          int level,
75                                                          int *serial,
76                                                          int max_depth,
77                                                          bool show_branch,
78                                                          bool show_serial,
79                                                          MemoryContext per_query_ctx,
80                                                          AttInMetadata *attinmeta,
81                                                          Tuplestorestate *tupstore);
82 static char *quote_literal_cstr(char *rawstr);
83
84 typedef struct
85 {
86         float8          mean;                   /* mean of the distribution */
87         float8          stddev;                 /* stddev of the distribution */
88         float8          carry_val;              /* hold second generated value */
89         bool            use_carry;              /* use second generated value */
90 }       normal_rand_fctx;
91
92 typedef struct
93 {
94         SPITupleTable *spi_tuptable;    /* sql results from user query */
95         char       *lastrowid;          /* rowid of the last tuple sent */
96 }       crosstab_fctx;
97
98 #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
99 #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
100 #define xpfree(var_) \
101         do { \
102                 if (var_ != NULL) \
103                 { \
104                         pfree(var_); \
105                         var_ = NULL; \
106                 } \
107         } while (0)
108
109 /* sign, 10 digits, '\0' */
110 #define INT32_STRLEN    12
111
112 /* hash table support */
113 static HTAB *crosstab_HashTable;
114
115 /* The information we cache about loaded procedures */
116 typedef struct crosstab_cat_desc
117 {
118         char       *catname;
119         int                     attidx;                 /* zero based */
120 }       crosstab_cat_desc;
121
122 #define MAX_CATNAME_LEN                 NAMEDATALEN
123 #define INIT_CATS                               64
124
125 #define crosstab_HashTableLookup(CATNAME, CATDESC) \
126 do { \
127         crosstab_HashEnt *hentry; char key[MAX_CATNAME_LEN]; \
128         \
129         MemSet(key, 0, MAX_CATNAME_LEN); \
130         snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATNAME); \
131         hentry = (crosstab_HashEnt*) hash_search(crosstab_HashTable, \
132                                                                                  key, HASH_FIND, NULL); \
133         if (hentry) \
134                 CATDESC = hentry->catdesc; \
135         else \
136                 CATDESC = NULL; \
137 } while(0)
138
139 #define crosstab_HashTableInsert(CATDESC) \
140 do { \
141         crosstab_HashEnt *hentry; bool found; char key[MAX_CATNAME_LEN]; \
142         \
143         MemSet(key, 0, MAX_CATNAME_LEN); \
144         snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATDESC->catname); \
145         hentry = (crosstab_HashEnt*) hash_search(crosstab_HashTable, \
146                                                                                  key, HASH_ENTER, &found); \
147         if (found) \
148                 ereport(ERROR, \
149                                 (errcode(ERRCODE_DUPLICATE_OBJECT), \
150                                  errmsg("duplicate category name"))); \
151         hentry->catdesc = CATDESC; \
152 } while(0)
153
154 /* hash table */
155 typedef struct crosstab_hashent
156 {
157         char            internal_catname[MAX_CATNAME_LEN];
158         crosstab_cat_desc *catdesc;
159 }       crosstab_HashEnt;
160
161 /*
162  * normal_rand - return requested number of random values
163  * with a Gaussian (Normal) distribution.
164  *
165  * inputs are int numvals, float8 mean, and float8 stddev
166  * returns setof float8
167  */
168 PG_FUNCTION_INFO_V1(normal_rand);
169 Datum
170 normal_rand(PG_FUNCTION_ARGS)
171 {
172         FuncCallContext *funcctx;
173         int                     call_cntr;
174         int                     max_calls;
175         normal_rand_fctx *fctx;
176         float8          mean;
177         float8          stddev;
178         float8          carry_val;
179         bool            use_carry;
180         MemoryContext oldcontext;
181
182         /* stuff done only on the first call of the function */
183         if (SRF_IS_FIRSTCALL())
184         {
185                 /* create a function context for cross-call persistence */
186                 funcctx = SRF_FIRSTCALL_INIT();
187
188                 /*
189                  * switch to memory context appropriate for multiple function calls
190                  */
191                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
192
193                 /* total number of tuples to be returned */
194                 funcctx->max_calls = PG_GETARG_UINT32(0);
195
196                 /* allocate memory for user context */
197                 fctx = (normal_rand_fctx *) palloc(sizeof(normal_rand_fctx));
198
199                 /*
200                  * Use fctx to keep track of upper and lower bounds from call to call.
201                  * It will also be used to carry over the spare value we get from the
202                  * Box-Muller algorithm so that we only actually calculate a new value
203                  * every other call.
204                  */
205                 fctx->mean = PG_GETARG_FLOAT8(1);
206                 fctx->stddev = PG_GETARG_FLOAT8(2);
207                 fctx->carry_val = 0;
208                 fctx->use_carry = false;
209
210                 funcctx->user_fctx = fctx;
211
212                 MemoryContextSwitchTo(oldcontext);
213         }
214
215         /* stuff done on every call of the function */
216         funcctx = SRF_PERCALL_SETUP();
217
218         call_cntr = funcctx->call_cntr;
219         max_calls = funcctx->max_calls;
220         fctx = funcctx->user_fctx;
221         mean = fctx->mean;
222         stddev = fctx->stddev;
223         carry_val = fctx->carry_val;
224         use_carry = fctx->use_carry;
225
226         if (call_cntr < max_calls)      /* do when there is more left to send */
227         {
228                 float8          result;
229
230                 if (use_carry)
231                 {
232                         /*
233                          * reset use_carry and use second value obtained on last pass
234                          */
235                         fctx->use_carry = false;
236                         result = carry_val;
237                 }
238                 else
239                 {
240                         float8          normval_1;
241                         float8          normval_2;
242
243                         /* Get the next two normal values */
244                         get_normal_pair(&normval_1, &normval_2);
245
246                         /* use the first */
247                         result = mean + (stddev * normval_1);
248
249                         /* and save the second */
250                         fctx->carry_val = mean + (stddev * normval_2);
251                         fctx->use_carry = true;
252                 }
253
254                 /* send the result */
255                 SRF_RETURN_NEXT(funcctx, Float8GetDatum(result));
256         }
257         else
258                 /* do when there is no more left */
259                 SRF_RETURN_DONE(funcctx);
260 }
261
262 /*
263  * get_normal_pair()
264  * Assigns normally distributed (Gaussian) values to a pair of provided
265  * parameters, with mean 0, standard deviation 1.
266  *
267  * This routine implements Algorithm P (Polar method for normal deviates)
268  * from Knuth's _The_Art_of_Computer_Programming_, Volume 2, 3rd ed., pages
269  * 122-126. Knuth cites his source as "The polar method", G. E. P. Box, M. E.
270  * Muller, and G. Marsaglia, _Annals_Math,_Stat._ 29 (1958), 610-611.
271  *
272  */
273 static void
274 get_normal_pair(float8 *x1, float8 *x2)
275 {
276         float8          u1,
277                                 u2,
278                                 v1,
279                                 v2,
280                                 s;
281
282         do
283         {
284                 u1 = (float8) random() / (float8) MAX_RANDOM_VALUE;
285                 u2 = (float8) random() / (float8) MAX_RANDOM_VALUE;
286
287                 v1 = (2.0 * u1) - 1.0;
288                 v2 = (2.0 * u2) - 1.0;
289
290                 s = v1 * v1 + v2 * v2;
291         } while (s >= 1.0);
292
293         if (s == 0)
294         {
295                 *x1 = 0;
296                 *x2 = 0;
297         }
298         else
299         {
300                 s = sqrt((-2.0 * log(s)) / s);
301                 *x1 = v1 * s;
302                 *x2 = v2 * s;
303         }
304 }
305
306 /*
307  * crosstab - create a crosstab of rowids and values columns from a
308  * SQL statement returning one rowid column, one category column,
309  * and one value column.
310  *
311  * e.g. given sql which produces:
312  *
313  *                      rowid   cat             value
314  *                      ------+-------+-------
315  *                      row1    cat1    val1
316  *                      row1    cat2    val2
317  *                      row1    cat3    val3
318  *                      row1    cat4    val4
319  *                      row2    cat1    val5
320  *                      row2    cat2    val6
321  *                      row2    cat3    val7
322  *                      row2    cat4    val8
323  *
324  * crosstab returns:
325  *                                      <===== values columns =====>
326  *                      rowid   cat1    cat2    cat3    cat4
327  *                      ------+-------+-------+-------+-------
328  *                      row1    val1    val2    val3    val4
329  *                      row2    val5    val6    val7    val8
330  *
331  * NOTES:
332  * 1. SQL result must be ordered by 1,2.
333  * 2. The number of values columns depends on the tuple description
334  *        of the function's declared return type.  The return type's columns
335  *        must match the datatypes of the SQL query's result.  The datatype
336  *        of the category column can be anything, however.
337  * 3. Missing values (i.e. not enough adjacent rows of same rowid to
338  *        fill the number of result values columns) are filled in with nulls.
339  * 4. Extra values (i.e. too many adjacent rows of same rowid to fill
340  *        the number of result values columns) are skipped.
341  * 5. Rows with all nulls in the values columns are skipped.
342  */
343 PG_FUNCTION_INFO_V1(crosstab);
344 Datum
345 crosstab(PG_FUNCTION_ARGS)
346 {
347         FuncCallContext *funcctx;
348         TupleDesc       ret_tupdesc;
349         int                     call_cntr;
350         int                     max_calls;
351         AttInMetadata *attinmeta;
352         SPITupleTable *spi_tuptable = NULL;
353         TupleDesc       spi_tupdesc;
354         char       *lastrowid = NULL;
355         crosstab_fctx *fctx;
356         int                     i;
357         int                     num_categories;
358         MemoryContext oldcontext;
359
360         /* stuff done only on the first call of the function */
361         if (SRF_IS_FIRSTCALL())
362         {
363                 char       *sql = GET_STR(PG_GETARG_TEXT_P(0));
364                 TupleDesc       tupdesc;
365                 int                     ret;
366                 int                     proc;
367
368                 /* create a function context for cross-call persistence */
369                 funcctx = SRF_FIRSTCALL_INIT();
370
371                 /*
372                  * switch to memory context appropriate for multiple function calls
373                  */
374                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
375
376                 /* Connect to SPI manager */
377                 if ((ret = SPI_connect()) < 0)
378                         /* internal error */
379                         elog(ERROR, "crosstab: SPI_connect returned %d", ret);
380
381                 /* Retrieve the desired rows */
382                 ret = SPI_execute(sql, true, 0);
383                 proc = SPI_processed;
384
385                 /* Check for qualifying tuples */
386                 if ((ret == SPI_OK_SELECT) && (proc > 0))
387                 {
388                         spi_tuptable = SPI_tuptable;
389                         spi_tupdesc = spi_tuptable->tupdesc;
390
391                         /*----------
392                          * The provided SQL query must always return three columns.
393                          *
394                          * 1. rowname
395                          *      the label or identifier for each row in the final result
396                          * 2. category
397                          *      the label or identifier for each column in the final result
398                          * 3. values
399                          *      the value for each column in the final result
400                          *----------
401                          */
402                         if (spi_tupdesc->natts != 3)
403                                 ereport(ERROR,
404                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
405                                                  errmsg("invalid source data SQL statement"),
406                                                  errdetail("The provided SQL must return 3 "
407                                                                    "columns: rowid, category, and values.")));
408                 }
409                 else
410                 {
411                         /* no qualifying tuples */
412                         SPI_finish();
413                         SRF_RETURN_DONE(funcctx);
414                 }
415
416                 /* SPI switches context on us, so reset it */
417                 MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
418
419                 /* get a tuple descriptor for our result type */
420                 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
421                 {
422                         case TYPEFUNC_COMPOSITE:
423                                 /* success */
424                                 break;
425                         case TYPEFUNC_RECORD:
426                                 /* failed to determine actual type of RECORD */
427                                 ereport(ERROR,
428                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
429                                                  errmsg("function returning record called in context "
430                                                                 "that cannot accept type record")));
431                                 break;
432                         default:
433                                 /* result type isn't composite */
434                                 elog(ERROR, "return type must be a row type");
435                                 break;
436                 }
437
438                 /* make sure we have a persistent copy of the tupdesc */
439                 tupdesc = CreateTupleDescCopy(tupdesc);
440
441                 /*
442                  * Check that return tupdesc is compatible with the data we got from
443                  * SPI, at least based on number and type of attributes
444                  */
445                 if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
446                         ereport(ERROR,
447                                         (errcode(ERRCODE_SYNTAX_ERROR),
448                                          errmsg("return and sql tuple descriptions are " \
449                                                         "incompatible")));
450
451                 /*
452                  * Generate attribute metadata needed later to produce tuples from raw
453                  * C strings
454                  */
455                 attinmeta = TupleDescGetAttInMetadata(tupdesc);
456                 funcctx->attinmeta = attinmeta;
457
458                 /* allocate memory for user context */
459                 fctx = (crosstab_fctx *) palloc(sizeof(crosstab_fctx));
460
461                 /*
462                  * Save spi data for use across calls
463                  */
464                 fctx->spi_tuptable = spi_tuptable;
465                 fctx->lastrowid = NULL;
466                 funcctx->user_fctx = fctx;
467
468                 /* total number of tuples to be returned */
469                 funcctx->max_calls = proc;
470
471                 MemoryContextSwitchTo(oldcontext);
472         }
473
474         /* stuff done on every call of the function */
475         funcctx = SRF_PERCALL_SETUP();
476
477         /*
478          * initialize per-call variables
479          */
480         call_cntr = funcctx->call_cntr;
481         max_calls = funcctx->max_calls;
482
483         /* user context info */
484         fctx = (crosstab_fctx *) funcctx->user_fctx;
485         lastrowid = fctx->lastrowid;
486         spi_tuptable = fctx->spi_tuptable;
487
488         /* the sql tuple */
489         spi_tupdesc = spi_tuptable->tupdesc;
490
491         /* attribute return type and return tuple description */
492         attinmeta = funcctx->attinmeta;
493         ret_tupdesc = attinmeta->tupdesc;
494
495         /* the return tuple always must have 1 rowid + num_categories columns */
496         num_categories = ret_tupdesc->natts - 1;
497
498         if (call_cntr < max_calls)      /* do when there is more left to send */
499         {
500                 HeapTuple       tuple;
501                 Datum           result;
502                 char      **values;
503                 bool            allnulls = true;
504
505                 while (true)
506                 {
507                         /* allocate space */
508                         values = (char **) palloc((1 + num_categories) * sizeof(char *));
509
510                         /* and make sure it's clear */
511                         memset(values, '\0', (1 + num_categories) * sizeof(char *));
512
513                         /*
514                          * now loop through the sql results and assign each value in
515                          * sequence to the next category
516                          */
517                         for (i = 0; i < num_categories; i++)
518                         {
519                                 HeapTuple       spi_tuple;
520                                 char       *rowid = NULL;
521
522                                 /* see if we've gone too far already */
523                                 if (call_cntr >= max_calls)
524                                         break;
525
526                                 /* get the next sql result tuple */
527                                 spi_tuple = spi_tuptable->vals[call_cntr];
528
529                                 /* get the rowid from the current sql result tuple */
530                                 rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
531
532                                 /*
533                                  * If this is the first pass through the values for this rowid
534                                  * set it, otherwise make sure it hasn't changed on us. Also
535                                  * check to see if the rowid is the same as that of the last
536                                  * tuple sent -- if so, skip this tuple entirely
537                                  */
538                                 if (i == 0)
539                                         values[0] = pstrdup(rowid);
540
541                                 if ((rowid != NULL) && (strcmp(rowid, values[0]) == 0))
542                                 {
543                                         if ((lastrowid != NULL) && (strcmp(rowid, lastrowid) == 0))
544                                                 break;
545                                         else if (allnulls == true)
546                                                 allnulls = false;
547
548                                         /*
549                                          * Get the next category item value, which is alway
550                                          * attribute number three.
551                                          *
552                                          * Be careful to sssign the value to the array index based
553                                          * on which category we are presently processing.
554                                          */
555                                         values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
556
557                                         /*
558                                          * increment the counter since we consume a row for each
559                                          * category, but not for last pass because the API will do
560                                          * that for us
561                                          */
562                                         if (i < (num_categories - 1))
563                                                 call_cntr = ++funcctx->call_cntr;
564                                 }
565                                 else
566                                 {
567                                         /*
568                                          * We'll fill in NULLs for the missing values, but we need
569                                          * to decrement the counter since this sql result row
570                                          * doesn't belong to the current output tuple.
571                                          */
572                                         call_cntr = --funcctx->call_cntr;
573                                         break;
574                                 }
575
576                                 if (rowid != NULL)
577                                         xpfree(rowid);
578                         }
579
580                         xpfree(fctx->lastrowid);
581
582                         if (values[0] != NULL)
583                         {
584                                 /*
585                                  * switch to memory context appropriate for multiple function
586                                  * calls
587                                  */
588                                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
589
590                                 lastrowid = fctx->lastrowid = pstrdup(values[0]);
591                                 MemoryContextSwitchTo(oldcontext);
592                         }
593
594                         if (!allnulls)
595                         {
596                                 /* build the tuple */
597                                 tuple = BuildTupleFromCStrings(attinmeta, values);
598
599                                 /* make the tuple into a datum */
600                                 result = HeapTupleGetDatum(tuple);
601
602                                 /* Clean up */
603                                 for (i = 0; i < num_categories + 1; i++)
604                                         if (values[i] != NULL)
605                                                 xpfree(values[i]);
606                                 xpfree(values);
607
608                                 SRF_RETURN_NEXT(funcctx, result);
609                         }
610                         else
611                         {
612                                 /*
613                                  * Skipping this tuple entirely, but we need to advance the
614                                  * counter like the API would if we had returned one.
615                                  */
616                                 call_cntr = ++funcctx->call_cntr;
617
618                                 /* we'll start over at the top */
619                                 xpfree(values);
620
621                                 /* see if we've gone too far already */
622                                 if (call_cntr >= max_calls)
623                                 {
624                                         /* release SPI related resources */
625                                         SPI_finish();
626                                         SRF_RETURN_DONE(funcctx);
627                                 }
628                         }
629                 }
630         }
631         else
632                 /* do when there is no more left */
633         {
634                 /* release SPI related resources */
635                 SPI_finish();
636                 SRF_RETURN_DONE(funcctx);
637         }
638 }
639
640 /*
641  * crosstab_hash - reimplement crosstab as materialized function and
642  * properly deal with missing values (i.e. don't pack remaining
643  * values to the left)
644  *
645  * crosstab - create a crosstab of rowids and values columns from a
646  * SQL statement returning one rowid column, one category column,
647  * and one value column.
648  *
649  * e.g. given sql which produces:
650  *
651  *                      rowid   cat             value
652  *                      ------+-------+-------
653  *                      row1    cat1    val1
654  *                      row1    cat2    val2
655  *                      row1    cat4    val4
656  *                      row2    cat1    val5
657  *                      row2    cat2    val6
658  *                      row2    cat3    val7
659  *                      row2    cat4    val8
660  *
661  * crosstab returns:
662  *                                      <===== values columns =====>
663  *                      rowid   cat1    cat2    cat3    cat4
664  *                      ------+-------+-------+-------+-------
665  *                      row1    val1    val2    null    val4
666  *                      row2    val5    val6    val7    val8
667  *
668  * NOTES:
669  * 1. SQL result must be ordered by 1.
670  * 2. The number of values columns depends on the tuple description
671  *        of the function's declared return type.
672  * 3. Missing values (i.e. missing category) are filled in with nulls.
673  * 4. Extra values (i.e. not in category results) are skipped.
674  */
675 PG_FUNCTION_INFO_V1(crosstab_hash);
676 Datum
677 crosstab_hash(PG_FUNCTION_ARGS)
678 {
679         char       *sql = GET_STR(PG_GETARG_TEXT_P(0));
680         char       *cats_sql = GET_STR(PG_GETARG_TEXT_P(1));
681         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
682         TupleDesc       tupdesc;
683         MemoryContext per_query_ctx;
684         MemoryContext oldcontext;
685         int                     num_categories;
686
687         /* check to see if caller supports us returning a tuplestore */
688         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
689                 ereport(ERROR,
690                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
691                                  errmsg("set-valued function called in context that cannot accept a set")));
692         if (!(rsinfo->allowedModes & SFRM_Materialize))
693                 ereport(ERROR,
694                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
695                                  errmsg("materialize mode required, but it is not " \
696                                                 "allowed in this context")));
697
698         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
699         oldcontext = MemoryContextSwitchTo(per_query_ctx);
700
701         /* get the requested return tuple description */
702         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
703
704         /*
705          * Check to make sure we have a reasonable tuple descriptor
706          *
707          * Note we will attempt to coerce the values into whatever the return
708          * attribute type is and depend on the "in" function to complain if
709          * needed.
710          */
711         if (tupdesc->natts < 2)
712                 ereport(ERROR,
713                                 (errcode(ERRCODE_SYNTAX_ERROR),
714                                  errmsg("query-specified return tuple and " \
715                                                 "crosstab function are not compatible")));
716
717         /* load up the categories hash table */
718         num_categories = load_categories_hash(cats_sql, per_query_ctx);
719
720         /* let the caller know we're sending back a tuplestore */
721         rsinfo->returnMode = SFRM_Materialize;
722
723         /* now go build it */
724         rsinfo->setResult = get_crosstab_tuplestore(sql,
725                                                                                                 num_categories,
726                                                                                                 tupdesc,
727                                                                                                 per_query_ctx);
728
729         /*
730          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
731          * tuples are in our tuplestore and passed back through rsinfo->setResult.
732          * rsinfo->setDesc is set to the tuple description that we actually used
733          * to build our tuples with, so the caller can verify we did what it was
734          * expecting.
735          */
736         rsinfo->setDesc = tupdesc;
737         MemoryContextSwitchTo(oldcontext);
738
739         return (Datum) 0;
740 }
741
742 /*
743  * load up the categories hash table
744  */
745 static int
746 load_categories_hash(char *cats_sql, MemoryContext per_query_ctx)
747 {
748         HASHCTL         ctl;
749         int                     ret;
750         int                     proc;
751         MemoryContext SPIcontext;
752         int                     num_categories = 0;
753
754         /* initialize the category hash table */
755         ctl.keysize = MAX_CATNAME_LEN;
756         ctl.entrysize = sizeof(crosstab_HashEnt);
757
758         /*
759          * use INIT_CATS, defined above as a guess of how many hash table entries
760          * to create, initially
761          */
762         crosstab_HashTable = hash_create("crosstab hash", INIT_CATS, &ctl, HASH_ELEM);
763
764         /* Connect to SPI manager */
765         if ((ret = SPI_connect()) < 0)
766                 /* internal error */
767                 elog(ERROR, "load_categories_hash: SPI_connect returned %d", ret);
768
769         /* Retrieve the category name rows */
770         ret = SPI_execute(cats_sql, true, 0);
771         num_categories = proc = SPI_processed;
772
773         /* Check for qualifying tuples */
774         if ((ret == SPI_OK_SELECT) && (proc > 0))
775         {
776                 SPITupleTable *spi_tuptable = SPI_tuptable;
777                 TupleDesc       spi_tupdesc = spi_tuptable->tupdesc;
778                 int                     i;
779
780                 /*
781                  * The provided categories SQL query must always return one column:
782                  * category - the label or identifier for each column
783                  */
784                 if (spi_tupdesc->natts != 1)
785                         ereport(ERROR,
786                                         (errcode(ERRCODE_SYNTAX_ERROR),
787                                          errmsg("provided \"categories\" SQL must " \
788                                                         "return 1 column of at least one row")));
789
790                 for (i = 0; i < proc; i++)
791                 {
792                         crosstab_cat_desc *catdesc;
793                         char       *catname;
794                         HeapTuple       spi_tuple;
795
796                         /* get the next sql result tuple */
797                         spi_tuple = spi_tuptable->vals[i];
798
799                         /* get the category from the current sql result tuple */
800                         catname = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
801
802                         SPIcontext = MemoryContextSwitchTo(per_query_ctx);
803
804                         catdesc = (crosstab_cat_desc *) palloc(sizeof(crosstab_cat_desc));
805                         catdesc->catname = catname;
806                         catdesc->attidx = i;
807
808                         /* Add the proc description block to the hashtable */
809                         crosstab_HashTableInsert(catdesc);
810
811                         MemoryContextSwitchTo(SPIcontext);
812                 }
813         }
814
815         if (SPI_finish() != SPI_OK_FINISH)
816                 /* internal error */
817                 elog(ERROR, "load_categories_hash: SPI_finish() failed");
818
819         return num_categories;
820 }
821
822 /*
823  * create and populate the crosstab tuplestore using the provided source query
824  */
825 static Tuplestorestate *
826 get_crosstab_tuplestore(char *sql,
827                                                 int num_categories,
828                                                 TupleDesc tupdesc,
829                                                 MemoryContext per_query_ctx)
830 {
831         Tuplestorestate *tupstore;
832         AttInMetadata *attinmeta = TupleDescGetAttInMetadata(tupdesc);
833         char      **values;
834         HeapTuple       tuple;
835         int                     ret;
836         int                     proc;
837         MemoryContext SPIcontext;
838
839         /* initialize our tuplestore */
840         tupstore = tuplestore_begin_heap(true, false, work_mem);
841
842         /* Connect to SPI manager */
843         if ((ret = SPI_connect()) < 0)
844                 /* internal error */
845                 elog(ERROR, "get_crosstab_tuplestore: SPI_connect returned %d", ret);
846
847         /* Now retrieve the crosstab source rows */
848         ret = SPI_execute(sql, true, 0);
849         proc = SPI_processed;
850
851         /* Check for qualifying tuples */
852         if ((ret == SPI_OK_SELECT) && (proc > 0))
853         {
854                 SPITupleTable *spi_tuptable = SPI_tuptable;
855                 TupleDesc       spi_tupdesc = spi_tuptable->tupdesc;
856                 int                     ncols = spi_tupdesc->natts;
857                 char       *rowid;
858                 char       *lastrowid = NULL;
859                 int                     i,
860                                         j;
861                 int                     result_ncols;
862
863                 if (num_categories == 0)
864                 {
865                         /* no qualifying category tuples */
866                         ereport(ERROR,
867                                         (errcode(ERRCODE_SYNTAX_ERROR),
868                                          errmsg("provided \"categories\" SQL must " \
869                                                         "return 1 column of at least one row")));
870                 }
871
872                 /*
873                  * The provided SQL query must always return at least three columns:
874                  *
875                  * 1. rowname   the label for each row - column 1 in the final result
876                  * 2. category  the label for each value-column in the final result 3.
877                  * value         the values used to populate the value-columns
878                  *
879                  * If there are more than three columns, the last two are taken as
880                  * "category" and "values". The first column is taken as "rowname".
881                  * Additional columns (2 thru N-2) are assumed the same for the same
882                  * "rowname", and are copied into the result tuple from the first time
883                  * we encounter a particular rowname.
884                  */
885                 if (ncols < 3)
886                         ereport(ERROR,
887                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
888                                          errmsg("invalid source data SQL statement"),
889                                          errdetail("The provided SQL must return 3 " \
890                                                            " columns; rowid, category, and values.")));
891
892                 result_ncols = (ncols - 2) + num_categories;
893
894                 /* Recheck to make sure we tuple descriptor still looks reasonable */
895                 if (tupdesc->natts != result_ncols)
896                         ereport(ERROR,
897                                         (errcode(ERRCODE_SYNTAX_ERROR),
898                                          errmsg("invalid return type"),
899                                          errdetail("Query-specified return " \
900                                                            "tuple has %d columns but crosstab " \
901                                                            "returns %d.", tupdesc->natts, result_ncols)));
902
903                 /* allocate space */
904                 values = (char **) palloc(result_ncols * sizeof(char *));
905
906                 /* and make sure it's clear */
907                 memset(values, '\0', result_ncols * sizeof(char *));
908
909                 for (i = 0; i < proc; i++)
910                 {
911                         HeapTuple       spi_tuple;
912                         crosstab_cat_desc *catdesc;
913                         char       *catname;
914
915                         /* get the next sql result tuple */
916                         spi_tuple = spi_tuptable->vals[i];
917
918                         /* get the rowid from the current sql result tuple */
919                         rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
920
921                         /* if rowid is null, skip this tuple entirely */
922                         if (rowid == NULL)
923                                 continue;
924
925                         /*
926                          * if we're on a new output row, grab the column values up to
927                          * column N-2 now
928                          */
929                         if ((lastrowid == NULL) || (strcmp(rowid, lastrowid) != 0))
930                         {
931                                 /*
932                                  * a new row means we need to flush the old one first, unless
933                                  * we're on the very first row
934                                  */
935                                 if (lastrowid != NULL)
936                                 {
937                                         /* rowid changed, flush the previous output row */
938                                         tuple = BuildTupleFromCStrings(attinmeta, values);
939
940                                         /* switch to appropriate context while storing the tuple */
941                                         SPIcontext = MemoryContextSwitchTo(per_query_ctx);
942                                         tuplestore_puttuple(tupstore, tuple);
943                                         MemoryContextSwitchTo(SPIcontext);
944
945                                         for (j = 0; j < result_ncols; j++)
946                                                 xpfree(values[j]);
947                                 }
948
949                                 values[0] = rowid;
950                                 for (j = 1; j < ncols - 2; j++)
951                                         values[j] = SPI_getvalue(spi_tuple, spi_tupdesc, j + 1);
952                         }
953
954                         /* look up the category and fill in the appropriate column */
955                         catname = SPI_getvalue(spi_tuple, spi_tupdesc, ncols - 1);
956
957                         if (catname != NULL)
958                         {
959                                 crosstab_HashTableLookup(catname, catdesc);
960
961                                 if (catdesc)
962                                         values[catdesc->attidx + ncols - 2] =
963                                                 SPI_getvalue(spi_tuple, spi_tupdesc, ncols);
964                         }
965
966                         xpfree(lastrowid);
967                         lastrowid = pstrdup(rowid);
968                 }
969
970                 /* flush the last output row */
971                 tuple = BuildTupleFromCStrings(attinmeta, values);
972
973                 /* switch to appropriate context while storing the tuple */
974                 SPIcontext = MemoryContextSwitchTo(per_query_ctx);
975                 tuplestore_puttuple(tupstore, tuple);
976                 MemoryContextSwitchTo(SPIcontext);
977         }
978
979         if (SPI_finish() != SPI_OK_FINISH)
980                 /* internal error */
981                 elog(ERROR, "get_crosstab_tuplestore: SPI_finish() failed");
982
983         tuplestore_donestoring(tupstore);
984
985         return tupstore;
986 }
987
988 /*
989  * connectby_text - produce a result set from a hierarchical (parent/child)
990  * table.
991  *
992  * e.g. given table foo:
993  *
994  *                      keyid   parent_keyid pos
995  *                      ------+------------+--
996  *                      row1    NULL             0
997  *                      row2    row1             0
998  *                      row3    row1             0
999  *                      row4    row2             1
1000  *                      row5    row2             0
1001  *                      row6    row4             0
1002  *                      row7    row3             0
1003  *                      row8    row6             0
1004  *                      row9    row5             0
1005  *
1006  *
1007  * connectby(text relname, text keyid_fld, text parent_keyid_fld
1008  *                        [, text orderby_fld], text start_with, int max_depth
1009  *                        [, text branch_delim])
1010  * connectby('foo', 'keyid', 'parent_keyid', 'pos', 'row2', 0, '~') returns:
1011  *
1012  *              keyid   parent_id       level    branch                         serial
1013  *              ------+-----------+--------+-----------------------
1014  *              row2    NULL              0               row2                            1
1015  *              row5    row2              1               row2~row5                       2
1016  *              row9    row5              2               row2~row5~row9          3
1017  *              row4    row2              1               row2~row4                       4
1018  *              row6    row4              2               row2~row4~row6          5
1019  *              row8    row6              3               row2~row4~row6~row8 6
1020  *
1021  */
1022 PG_FUNCTION_INFO_V1(connectby_text);
1023
1024 #define CONNECTBY_NCOLS                                 4
1025 #define CONNECTBY_NCOLS_NOBRANCH                3
1026
1027 Datum
1028 connectby_text(PG_FUNCTION_ARGS)
1029 {
1030         char       *relname = GET_STR(PG_GETARG_TEXT_P(0));
1031         char       *key_fld = GET_STR(PG_GETARG_TEXT_P(1));
1032         char       *parent_key_fld = GET_STR(PG_GETARG_TEXT_P(2));
1033         char       *start_with = GET_STR(PG_GETARG_TEXT_P(3));
1034         int                     max_depth = PG_GETARG_INT32(4);
1035         char       *branch_delim = NULL;
1036         bool            show_branch = false;
1037         bool            show_serial = false;
1038         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1039         TupleDesc       tupdesc;
1040         AttInMetadata *attinmeta;
1041         MemoryContext per_query_ctx;
1042         MemoryContext oldcontext;
1043
1044         /* check to see if caller supports us returning a tuplestore */
1045         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1046                 ereport(ERROR,
1047                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1048                                  errmsg("set-valued function called in context that cannot accept a set")));
1049         if (!(rsinfo->allowedModes & SFRM_Materialize))
1050                 ereport(ERROR,
1051                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1052                                  errmsg("materialize mode required, but it is not " \
1053                                                 "allowed in this context")));
1054
1055         if (fcinfo->nargs == 6)
1056         {
1057                 branch_delim = GET_STR(PG_GETARG_TEXT_P(5));
1058                 show_branch = true;
1059         }
1060         else
1061                 /* default is no show, tilde for the delimiter */
1062                 branch_delim = pstrdup("~");
1063
1064         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1065         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1066
1067         /* get the requested return tuple description */
1068         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
1069
1070         /* does it meet our needs */
1071         validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
1072
1073         /* OK, use it then */
1074         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1075
1076         /* OK, go to work */
1077         rsinfo->returnMode = SFRM_Materialize;
1078         rsinfo->setResult = connectby(relname,
1079                                                                   key_fld,
1080                                                                   parent_key_fld,
1081                                                                   NULL,
1082                                                                   branch_delim,
1083                                                                   start_with,
1084                                                                   max_depth,
1085                                                                   show_branch,
1086                                                                   show_serial,
1087                                                                   per_query_ctx,
1088                                                                   attinmeta);
1089         rsinfo->setDesc = tupdesc;
1090
1091         MemoryContextSwitchTo(oldcontext);
1092
1093         /*
1094          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
1095          * tuples are in our tuplestore and passed back through rsinfo->setResult.
1096          * rsinfo->setDesc is set to the tuple description that we actually used
1097          * to build our tuples with, so the caller can verify we did what it was
1098          * expecting.
1099          */
1100         return (Datum) 0;
1101 }
1102
1103 PG_FUNCTION_INFO_V1(connectby_text_serial);
1104 Datum
1105 connectby_text_serial(PG_FUNCTION_ARGS)
1106 {
1107         char       *relname = GET_STR(PG_GETARG_TEXT_P(0));
1108         char       *key_fld = GET_STR(PG_GETARG_TEXT_P(1));
1109         char       *parent_key_fld = GET_STR(PG_GETARG_TEXT_P(2));
1110         char       *orderby_fld = GET_STR(PG_GETARG_TEXT_P(3));
1111         char       *start_with = GET_STR(PG_GETARG_TEXT_P(4));
1112         int                     max_depth = PG_GETARG_INT32(5);
1113         char       *branch_delim = NULL;
1114         bool            show_branch = false;
1115         bool            show_serial = true;
1116
1117         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1118         TupleDesc       tupdesc;
1119         AttInMetadata *attinmeta;
1120         MemoryContext per_query_ctx;
1121         MemoryContext oldcontext;
1122
1123         /* check to see if caller supports us returning a tuplestore */
1124         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1125                 ereport(ERROR,
1126                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1127                                  errmsg("set-valued function called in context that cannot accept a set")));
1128         if (!(rsinfo->allowedModes & SFRM_Materialize))
1129                 ereport(ERROR,
1130                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1131                                  errmsg("materialize mode required, but it is not " \
1132                                                 "allowed in this context")));
1133
1134         if (fcinfo->nargs == 7)
1135         {
1136                 branch_delim = GET_STR(PG_GETARG_TEXT_P(6));
1137                 show_branch = true;
1138         }
1139         else
1140                 /* default is no show, tilde for the delimiter */
1141                 branch_delim = pstrdup("~");
1142
1143         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1144         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1145
1146         /* get the requested return tuple description */
1147         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
1148
1149         /* does it meet our needs */
1150         validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
1151
1152         /* OK, use it then */
1153         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1154
1155         /* OK, go to work */
1156         rsinfo->returnMode = SFRM_Materialize;
1157         rsinfo->setResult = connectby(relname,
1158                                                                   key_fld,
1159                                                                   parent_key_fld,
1160                                                                   orderby_fld,
1161                                                                   branch_delim,
1162                                                                   start_with,
1163                                                                   max_depth,
1164                                                                   show_branch,
1165                                                                   show_serial,
1166                                                                   per_query_ctx,
1167                                                                   attinmeta);
1168         rsinfo->setDesc = tupdesc;
1169
1170         MemoryContextSwitchTo(oldcontext);
1171
1172         /*
1173          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
1174          * tuples are in our tuplestore and passed back through rsinfo->setResult.
1175          * rsinfo->setDesc is set to the tuple description that we actually used
1176          * to build our tuples with, so the caller can verify we did what it was
1177          * expecting.
1178          */
1179         return (Datum) 0;
1180 }
1181
1182
1183 /*
1184  * connectby - does the real work for connectby_text()
1185  */
1186 static Tuplestorestate *
1187 connectby(char *relname,
1188                   char *key_fld,
1189                   char *parent_key_fld,
1190                   char *orderby_fld,
1191                   char *branch_delim,
1192                   char *start_with,
1193                   int max_depth,
1194                   bool show_branch,
1195                   bool show_serial,
1196                   MemoryContext per_query_ctx,
1197                   AttInMetadata *attinmeta)
1198 {
1199         Tuplestorestate *tupstore = NULL;
1200         int                     ret;
1201         MemoryContext oldcontext;
1202
1203         int                     serial = 1;
1204
1205         /* Connect to SPI manager */
1206         if ((ret = SPI_connect()) < 0)
1207                 /* internal error */
1208                 elog(ERROR, "connectby: SPI_connect returned %d", ret);
1209
1210         /* switch to longer term context to create the tuple store */
1211         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1212
1213         /* initialize our tuplestore */
1214         tupstore = tuplestore_begin_heap(true, false, work_mem);
1215
1216         MemoryContextSwitchTo(oldcontext);
1217
1218         /* now go get the whole tree */
1219         tupstore = build_tuplestore_recursively(key_fld,
1220                                                                                         parent_key_fld,
1221                                                                                         relname,
1222                                                                                         orderby_fld,
1223                                                                                         branch_delim,
1224                                                                                         start_with,
1225                                                                                         start_with, /* current_branch */
1226                                                                                         0,      /* initial level is 0 */
1227                                                                                         &serial,        /* initial serial is 1 */
1228                                                                                         max_depth,
1229                                                                                         show_branch,
1230                                                                                         show_serial,
1231                                                                                         per_query_ctx,
1232                                                                                         attinmeta,
1233                                                                                         tupstore);
1234
1235         SPI_finish();
1236
1237         return tupstore;
1238 }
1239
1240 static Tuplestorestate *
1241 build_tuplestore_recursively(char *key_fld,
1242                                                          char *parent_key_fld,
1243                                                          char *relname,
1244                                                          char *orderby_fld,
1245                                                          char *branch_delim,
1246                                                          char *start_with,
1247                                                          char *branch,
1248                                                          int level,
1249                                                          int *serial,
1250                                                          int max_depth,
1251                                                          bool show_branch,
1252                                                          bool show_serial,
1253                                                          MemoryContext per_query_ctx,
1254                                                          AttInMetadata *attinmeta,
1255                                                          Tuplestorestate *tupstore)
1256 {
1257         TupleDesc       tupdesc = attinmeta->tupdesc;
1258         MemoryContext oldcontext;
1259         int                     ret;
1260         int                     proc;
1261         int                     serial_column;
1262         StringInfoData sql;
1263         char      **values;
1264         char       *current_key;
1265         char       *current_key_parent;
1266         char            current_level[INT32_STRLEN];
1267         char            serial_str[INT32_STRLEN];
1268         char       *current_branch;
1269         HeapTuple       tuple;
1270
1271         if (max_depth > 0 && level > max_depth)
1272                 return tupstore;
1273
1274         initStringInfo(&sql);
1275
1276         /* Build initial sql statement */
1277         if (!show_serial)
1278         {
1279                 appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s",
1280                                                  key_fld,
1281                                                  parent_key_fld,
1282                                                  relname,
1283                                                  parent_key_fld,
1284                                                  quote_literal_cstr(start_with),
1285                                                  key_fld, key_fld, parent_key_fld);
1286                 serial_column = 0;
1287         }
1288         else
1289         {
1290                 appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s ORDER BY %s",
1291                                                  key_fld,
1292                                                  parent_key_fld,
1293                                                  relname,
1294                                                  parent_key_fld,
1295                                                  quote_literal_cstr(start_with),
1296                                                  key_fld, key_fld, parent_key_fld,
1297                                                  orderby_fld);
1298                 serial_column = 1;
1299         }
1300
1301         if (show_branch)
1302                 values = (char **) palloc((CONNECTBY_NCOLS + serial_column) * sizeof(char *));
1303         else
1304                 values = (char **) palloc((CONNECTBY_NCOLS_NOBRANCH + serial_column) * sizeof(char *));
1305
1306         /* First time through, do a little setup */
1307         if (level == 0)
1308         {
1309                 /* root value is the one we initially start with */
1310                 values[0] = start_with;
1311
1312                 /* root value has no parent */
1313                 values[1] = NULL;
1314
1315                 /* root level is 0 */
1316                 sprintf(current_level, "%d", level);
1317                 values[2] = current_level;
1318
1319                 /* root branch is just starting root value */
1320                 if (show_branch)
1321                         values[3] = start_with;
1322
1323                 /* root starts the serial with 1 */
1324                 if (show_serial)
1325                 {
1326                         sprintf(serial_str, "%d", (*serial)++);
1327                         if (show_branch)
1328                                 values[4] = serial_str;
1329                         else
1330                                 values[3] = serial_str;
1331                 }
1332
1333                 /* construct the tuple */
1334                 tuple = BuildTupleFromCStrings(attinmeta, values);
1335
1336                 /* switch to long lived context while storing the tuple */
1337                 oldcontext = MemoryContextSwitchTo(per_query_ctx);
1338
1339                 /* now store it */
1340                 tuplestore_puttuple(tupstore, tuple);
1341
1342                 /* now reset the context */
1343                 MemoryContextSwitchTo(oldcontext);
1344
1345                 /* increment level */
1346                 level++;
1347         }
1348
1349         /* Retrieve the desired rows */
1350         ret = SPI_execute(sql.data, true, 0);
1351         proc = SPI_processed;
1352
1353         /* Check for qualifying tuples */
1354         if ((ret == SPI_OK_SELECT) && (proc > 0))
1355         {
1356                 HeapTuple       spi_tuple;
1357                 SPITupleTable *tuptable = SPI_tuptable;
1358                 TupleDesc       spi_tupdesc = tuptable->tupdesc;
1359                 int                     i;
1360                 StringInfoData branchstr;
1361                 StringInfoData chk_branchstr;
1362                 StringInfoData chk_current_key;
1363
1364                 /* First time through, do a little more setup */
1365                 if (level == 0)
1366                 {
1367                         /*
1368                          * Check that return tupdesc is compatible with the one we got
1369                          * from the query, but only at level 0 -- no need to check more
1370                          * than once
1371                          */
1372
1373                         if (!compatConnectbyTupleDescs(tupdesc, spi_tupdesc))
1374                                 ereport(ERROR,
1375                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1376                                                  errmsg("invalid return type"),
1377                                                  errdetail("Return and SQL tuple descriptions are " \
1378                                                                    "incompatible.")));
1379                 }
1380
1381                 for (i = 0; i < proc; i++)
1382                 {
1383                         /* start a new branch */
1384                         initStringInfo(&branchstr);
1385
1386                         /* need these to check for recursion */
1387                         initStringInfo(&chk_branchstr);
1388                         initStringInfo(&chk_current_key);
1389
1390                         /* initialize branch for this pass */
1391                         appendStringInfo(&branchstr, "%s", branch);
1392                         appendStringInfo(&chk_branchstr, "%s%s%s", branch_delim, branch, branch_delim);
1393
1394                         /* get the next sql result tuple */
1395                         spi_tuple = tuptable->vals[i];
1396
1397                         /* get the current key and parent */
1398                         current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
1399                         appendStringInfo(&chk_current_key, "%s%s%s", branch_delim, current_key, branch_delim);
1400                         current_key_parent = pstrdup(SPI_getvalue(spi_tuple, spi_tupdesc, 2));
1401
1402                         /* get the current level */
1403                         sprintf(current_level, "%d", level);
1404
1405                         /* check to see if this key is also an ancestor */
1406                         if (strstr(chk_branchstr.data, chk_current_key.data))
1407                                 elog(ERROR, "infinite recursion detected");
1408
1409                         /* OK, extend the branch */
1410                         appendStringInfo(&branchstr, "%s%s", branch_delim, current_key);
1411                         current_branch = branchstr.data;
1412
1413                         /* build a tuple */
1414                         values[0] = pstrdup(current_key);
1415                         values[1] = current_key_parent;
1416                         values[2] = current_level;
1417                         if (show_branch)
1418                                 values[3] = current_branch;
1419                         if (show_serial)
1420                         {
1421                                 sprintf(serial_str, "%d", (*serial)++);
1422                                 if (show_branch)
1423                                         values[4] = serial_str;
1424                                 else
1425                                         values[3] = serial_str;
1426                         }
1427
1428                         tuple = BuildTupleFromCStrings(attinmeta, values);
1429
1430                         xpfree(current_key);
1431                         xpfree(current_key_parent);
1432
1433                         /* switch to long lived context while storing the tuple */
1434                         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1435
1436                         /* store the tuple for later use */
1437                         tuplestore_puttuple(tupstore, tuple);
1438
1439                         /* now reset the context */
1440                         MemoryContextSwitchTo(oldcontext);
1441
1442                         heap_freetuple(tuple);
1443
1444                         /* recurse using current_key_parent as the new start_with */
1445                         tupstore = build_tuplestore_recursively(key_fld,
1446                                                                                                         parent_key_fld,
1447                                                                                                         relname,
1448                                                                                                         orderby_fld,
1449                                                                                                         branch_delim,
1450                                                                                                         values[0],
1451                                                                                                         current_branch,
1452                                                                                                         level + 1,
1453                                                                                                         serial,
1454                                                                                                         max_depth,
1455                                                                                                         show_branch,
1456                                                                                                         show_serial,
1457                                                                                                         per_query_ctx,
1458                                                                                                         attinmeta,
1459                                                                                                         tupstore);
1460
1461                         /* reset branch for next pass */
1462                         xpfree(branchstr.data);
1463                         xpfree(chk_branchstr.data);
1464                         xpfree(chk_current_key.data);
1465                 }
1466         }
1467
1468         return tupstore;
1469 }
1470
1471 /*
1472  * Check expected (query runtime) tupdesc suitable for Connectby
1473  */
1474 static void
1475 validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial)
1476 {
1477         int                     serial_column = 0;
1478
1479         if (show_serial)
1480                 serial_column = 1;
1481
1482         /* are there the correct number of columns */
1483         if (show_branch)
1484         {
1485                 if (tupdesc->natts != (CONNECTBY_NCOLS + serial_column))
1486                         ereport(ERROR,
1487                                         (errcode(ERRCODE_SYNTAX_ERROR),
1488                                          errmsg("invalid return type"),
1489                                          errdetail("Query-specified return tuple has " \
1490                                                            "wrong number of columns.")));
1491         }
1492         else
1493         {
1494                 if (tupdesc->natts != CONNECTBY_NCOLS_NOBRANCH + serial_column)
1495                         ereport(ERROR,
1496                                         (errcode(ERRCODE_SYNTAX_ERROR),
1497                                          errmsg("invalid return type"),
1498                                          errdetail("Query-specified return tuple has " \
1499                                                            "wrong number of columns.")));
1500         }
1501
1502         /* check that the types of the first two columns match */
1503         if (tupdesc->attrs[0]->atttypid != tupdesc->attrs[1]->atttypid)
1504                 ereport(ERROR,
1505                                 (errcode(ERRCODE_SYNTAX_ERROR),
1506                                  errmsg("invalid return type"),
1507                                  errdetail("First two columns must be the same type.")));
1508
1509         /* check that the type of the third column is INT4 */
1510         if (tupdesc->attrs[2]->atttypid != INT4OID)
1511                 ereport(ERROR,
1512                                 (errcode(ERRCODE_SYNTAX_ERROR),
1513                                  errmsg("invalid return type"),
1514                                  errdetail("Third column must be type %s.",
1515                                                    format_type_be(INT4OID))));
1516
1517         /* check that the type of the fourth column is TEXT if applicable */
1518         if (show_branch && tupdesc->attrs[3]->atttypid != TEXTOID)
1519                 ereport(ERROR,
1520                                 (errcode(ERRCODE_SYNTAX_ERROR),
1521                                  errmsg("invalid return type"),
1522                                  errdetail("Fourth column must be type %s.",
1523                                                    format_type_be(TEXTOID))));
1524
1525         /* check that the type of the fifth column is INT4 */
1526         if (show_branch && show_serial && tupdesc->attrs[4]->atttypid != INT4OID)
1527                 elog(ERROR, "query-specified return tuple not valid for Connectby: "
1528                          "fifth column must be type %s", format_type_be(INT4OID));
1529
1530         /* check that the type of the fifth column is INT4 */
1531         if (!show_branch && show_serial && tupdesc->attrs[3]->atttypid != INT4OID)
1532                 elog(ERROR, "query-specified return tuple not valid for Connectby: "
1533                          "fourth column must be type %s", format_type_be(INT4OID));
1534
1535         /* OK, the tupdesc is valid for our purposes */
1536 }
1537
1538 /*
1539  * Check if spi sql tupdesc and return tupdesc are compatible
1540  */
1541 static bool
1542 compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
1543 {
1544         Oid                     ret_atttypid;
1545         Oid                     sql_atttypid;
1546
1547         /* check the key_fld types match */
1548         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1549         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1550         if (ret_atttypid != sql_atttypid)
1551                 ereport(ERROR,
1552                                 (errcode(ERRCODE_SYNTAX_ERROR),
1553                                  errmsg("invalid return type"),
1554                                  errdetail("SQL key field datatype does " \
1555                                                    "not match return key field datatype.")));
1556
1557         /* check the parent_key_fld types match */
1558         ret_atttypid = ret_tupdesc->attrs[1]->atttypid;
1559         sql_atttypid = sql_tupdesc->attrs[1]->atttypid;
1560         if (ret_atttypid != sql_atttypid)
1561                 ereport(ERROR,
1562                                 (errcode(ERRCODE_SYNTAX_ERROR),
1563                                  errmsg("invalid return type"),
1564                                  errdetail("SQL parent key field datatype does " \
1565                                                    "not match return parent key field datatype.")));
1566
1567         /* OK, the two tupdescs are compatible for our purposes */
1568         return true;
1569 }
1570
1571 /*
1572  * Check if two tupdescs match in type of attributes
1573  */
1574 static bool
1575 compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
1576 {
1577         int                     i;
1578         Form_pg_attribute ret_attr;
1579         Oid                     ret_atttypid;
1580         Form_pg_attribute sql_attr;
1581         Oid                     sql_atttypid;
1582
1583         /* check the rowid types match */
1584         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1585         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1586         if (ret_atttypid != sql_atttypid)
1587                 ereport(ERROR,
1588                                 (errcode(ERRCODE_SYNTAX_ERROR),
1589                                  errmsg("invalid return type"),
1590                                  errdetail("SQL rowid datatype does not match " \
1591                                                    "return rowid datatype.")));
1592
1593         /*
1594          * - attribute [1] of the sql tuple is the category; no need to check it -
1595          * attribute [2] of the sql tuple should match attributes [1] to [natts]
1596          * of the return tuple
1597          */
1598         sql_attr = sql_tupdesc->attrs[2];
1599         for (i = 1; i < ret_tupdesc->natts; i++)
1600         {
1601                 ret_attr = ret_tupdesc->attrs[i];
1602
1603                 if (ret_attr->atttypid != sql_attr->atttypid)
1604                         return false;
1605         }
1606
1607         /* OK, the two tupdescs are compatible for our purposes */
1608         return true;
1609 }
1610
1611 /*
1612  * Return a properly quoted literal value.
1613  * Uses quote_literal in quote.c
1614  */
1615 static char *
1616 quote_literal_cstr(char *rawstr)
1617 {
1618         text       *rawstr_text;
1619         text       *result_text;
1620         char       *result;
1621
1622         rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
1623         result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
1624         result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
1625
1626         return result;
1627 }