]> granicus.if.org Git - postgresql/blob - contrib/tablefunc/tablefunc.c
With Joe Conway's concurrence, remove srandom() call from normal_rand().
[postgresql] / contrib / tablefunc / tablefunc.c
1 /*
2  * tablefunc
3  *
4  * Sample to demonstrate C functions which return setof scalar
5  * and setof composite.
6  * Joe Conway <mail@joeconway.com>
7  * And contributors:
8  * Nabil Sayegh <postgresql@e-trolley.de>
9  *
10  * Copyright (c) 2002-2003, PostgreSQL Global Development Group
11  *
12  * Permission to use, copy, modify, and distribute this software and its
13  * documentation for any purpose, without fee, and without a written agreement
14  * is hereby granted, provided that the above copyright notice and this
15  * paragraph and the following two paragraphs appear in all copies.
16  *
17  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
18  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
19  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
20  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
21  * POSSIBILITY OF SUCH DAMAGE.
22  *
23  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
24  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
25  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
26  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
27  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
28  *
29  */
30 #include "postgres.h"
31
32 #include <math.h>
33
34 #include "fmgr.h"
35 #include "funcapi.h"
36 #include "executor/spi.h"
37 #include "lib/stringinfo.h"
38 #include "miscadmin.h"
39 #include "utils/builtins.h"
40 #include "utils/guc.h"
41 #include "utils/lsyscache.h"
42
43 #include "tablefunc.h"
44
45 static int      load_categories_hash(char *cats_sql, MemoryContext per_query_ctx);
46 static Tuplestorestate *get_crosstab_tuplestore(char *sql,
47                                                 int num_categories,
48                                                 TupleDesc tupdesc,
49                                                 MemoryContext per_query_ctx);
50 static void validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial);
51 static bool compatCrosstabTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
52 static bool compatConnectbyTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
53 static void get_normal_pair(float8 *x1, float8 *x2);
54 static TupleDesc make_crosstab_tupledesc(TupleDesc spi_tupdesc,
55                                                 int num_categories);
56 static Tuplestorestate *connectby(char *relname,
57                   char *key_fld,
58                   char *parent_key_fld,
59                   char *orderby_fld,
60                   char *branch_delim,
61                   char *start_with,
62                   int max_depth,
63                   bool show_branch,
64                   bool show_serial,
65                   MemoryContext per_query_ctx,
66                   AttInMetadata *attinmeta);
67 static Tuplestorestate *build_tuplestore_recursively(char *key_fld,
68                                                          char *parent_key_fld,
69                                                          char *relname,
70                                                          char *orderby_fld,
71                                                          char *branch_delim,
72                                                          char *start_with,
73                                                          char *branch,
74                                                          int level,
75                                                          int *serial,
76                                                          int max_depth,
77                                                          bool show_branch,
78                                                          bool show_serial,
79                                                          MemoryContext per_query_ctx,
80                                                          AttInMetadata *attinmeta,
81                                                          Tuplestorestate *tupstore);
82
83 typedef struct
84 {
85         float8          mean;                   /* mean of the distribution */
86         float8          stddev;                 /* stddev of the distribution */
87         float8          carry_val;              /* hold second generated value */
88         bool            use_carry;              /* use second generated value */
89 }       normal_rand_fctx;
90
91 typedef struct
92 {
93         SPITupleTable *spi_tuptable;    /* sql results from user query */
94         char       *lastrowid;          /* rowid of the last tuple sent */
95 }       crosstab_fctx;
96
97 #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
98 #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
99 #define xpfree(var_) \
100         do { \
101                 if (var_ != NULL) \
102                 { \
103                         pfree(var_); \
104                         var_ = NULL; \
105                 } \
106         } while (0)
107
108 /* sign, 10 digits, '\0' */
109 #define INT32_STRLEN    12
110
111 /* hash table support */
112 static HTAB *crosstab_HashTable;
113
114 /* The information we cache about loaded procedures */
115 typedef struct crosstab_cat_desc
116 {
117         char       *catname;
118         int                     attidx;                 /* zero based */
119 }       crosstab_cat_desc;
120
121 #define MAX_CATNAME_LEN                 NAMEDATALEN
122 #define INIT_CATS                               64
123
124 #define crosstab_HashTableLookup(CATNAME, CATDESC) \
125 do { \
126         crosstab_HashEnt *hentry; char key[MAX_CATNAME_LEN]; \
127         \
128         MemSet(key, 0, MAX_CATNAME_LEN); \
129         snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATNAME); \
130         hentry = (crosstab_HashEnt*) hash_search(crosstab_HashTable, \
131                                                                                  key, HASH_FIND, NULL); \
132         if (hentry) \
133                 CATDESC = hentry->catdesc; \
134         else \
135                 CATDESC = NULL; \
136 } while(0)
137
138 #define crosstab_HashTableInsert(CATDESC) \
139 do { \
140         crosstab_HashEnt *hentry; bool found; char key[MAX_CATNAME_LEN]; \
141         \
142         MemSet(key, 0, MAX_CATNAME_LEN); \
143         snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATDESC->catname); \
144         hentry = (crosstab_HashEnt*) hash_search(crosstab_HashTable, \
145                                                                                  key, HASH_ENTER, &found); \
146         if (hentry == NULL) \
147                 ereport(ERROR, \
148                                 (errcode(ERRCODE_OUT_OF_MEMORY), \
149                                  errmsg("out of memory"))); \
150         if (found) \
151                 ereport(ERROR, \
152                                 (errcode(ERRCODE_DUPLICATE_OBJECT), \
153                                  errmsg("duplicate category name"))); \
154         hentry->catdesc = CATDESC; \
155 } while(0)
156
157 /* hash table */
158 typedef struct crosstab_hashent
159 {
160         char            internal_catname[MAX_CATNAME_LEN];
161         crosstab_cat_desc *catdesc;
162 }       crosstab_HashEnt;
163
164 /*
165  * normal_rand - return requested number of random values
166  * with a Gaussian (Normal) distribution.
167  *
168  * inputs are int numvals, float8 mean, and float8 stddev
169  * returns setof float8
170  */
171 PG_FUNCTION_INFO_V1(normal_rand);
172 Datum
173 normal_rand(PG_FUNCTION_ARGS)
174 {
175         FuncCallContext *funcctx;
176         int                     call_cntr;
177         int                     max_calls;
178         normal_rand_fctx *fctx;
179         float8          mean;
180         float8          stddev;
181         float8          carry_val;
182         bool            use_carry;
183         MemoryContext oldcontext;
184
185         /* stuff done only on the first call of the function */
186         if (SRF_IS_FIRSTCALL())
187         {
188                 /* create a function context for cross-call persistence */
189                 funcctx = SRF_FIRSTCALL_INIT();
190
191                 /*
192                  * switch to memory context appropriate for multiple function
193                  * calls
194                  */
195                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
196
197                 /* total number of tuples to be returned */
198                 funcctx->max_calls = PG_GETARG_UINT32(0);
199
200                 /* allocate memory for user context */
201                 fctx = (normal_rand_fctx *) palloc(sizeof(normal_rand_fctx));
202
203                 /*
204                  * Use fctx to keep track of upper and lower bounds from call to
205                  * call. It will also be used to carry over the spare value we get
206                  * from the Box-Muller algorithm so that we only actually
207                  * calculate a new value every other call.
208                  */
209                 fctx->mean = PG_GETARG_FLOAT8(1);
210                 fctx->stddev = PG_GETARG_FLOAT8(2);
211                 fctx->carry_val = 0;
212                 fctx->use_carry = false;
213
214                 funcctx->user_fctx = fctx;
215
216                 MemoryContextSwitchTo(oldcontext);
217         }
218
219         /* stuff done on every call of the function */
220         funcctx = SRF_PERCALL_SETUP();
221
222         call_cntr = funcctx->call_cntr;
223         max_calls = funcctx->max_calls;
224         fctx = funcctx->user_fctx;
225         mean = fctx->mean;
226         stddev = fctx->stddev;
227         carry_val = fctx->carry_val;
228         use_carry = fctx->use_carry;
229
230         if (call_cntr < max_calls)      /* do when there is more left to send */
231         {
232                 float8          result;
233
234                 if (use_carry)
235                 {
236                         /*
237                          * reset use_carry and use second value obtained on last pass
238                          */
239                         fctx->use_carry = false;
240                         result = carry_val;
241                 }
242                 else
243                 {
244                         float8          normval_1;
245                         float8          normval_2;
246
247                         /* Get the next two normal values */
248                         get_normal_pair(&normval_1, &normval_2);
249
250                         /* use the first */
251                         result = mean + (stddev * normval_1);
252
253                         /* and save the second */
254                         fctx->carry_val = mean + (stddev * normval_2);
255                         fctx->use_carry = true;
256                 }
257
258                 /* send the result */
259                 SRF_RETURN_NEXT(funcctx, Float8GetDatum(result));
260         }
261         else
262 /* do when there is no more left */
263                 SRF_RETURN_DONE(funcctx);
264 }
265
266 /*
267  * get_normal_pair()
268  * Assigns normally distributed (Gaussian) values to a pair of provided
269  * parameters, with mean 0, standard deviation 1.
270  *
271  * This routine implements Algorithm P (Polar method for normal deviates)
272  * from Knuth's _The_Art_of_Computer_Programming_, Volume 2, 3rd ed., pages
273  * 122-126. Knuth cites his source as "The polar method", G. E. P. Box, M. E.
274  * Muller, and G. Marsaglia, _Annals_Math,_Stat._ 29 (1958), 610-611.
275  *
276  */
277 static void
278 get_normal_pair(float8 *x1, float8 *x2)
279 {
280         float8          u1,
281                                 u2,
282                                 v1,
283                                 v2,
284                                 s;
285
286         do
287         {
288                 u1 = (float8) random() / (float8) MAX_RANDOM_VALUE;
289                 u2 = (float8) random() / (float8) MAX_RANDOM_VALUE;
290
291                 v1 = (2.0 * u1) - 1.0;
292                 v2 = (2.0 * u2) - 1.0;
293
294                 s = v1 * v1 + v2 * v2;
295         } while (s >= 1.0);
296
297         if (s == 0)
298         {
299                 *x1 = 0;
300                 *x2 = 0;
301         }
302         else
303         {
304                 s = sqrt((-2.0 * log(s)) / s);
305                 *x1 = v1 * s;
306                 *x2 = v2 * s;
307         }
308 }
309
310 /*
311  * crosstab - create a crosstab of rowids and values columns from a
312  * SQL statement returning one rowid column, one category column,
313  * and one value column.
314  *
315  * e.g. given sql which produces:
316  *
317  *                      rowid   cat             value
318  *                      ------+-------+-------
319  *                      row1    cat1    val1
320  *                      row1    cat2    val2
321  *                      row1    cat3    val3
322  *                      row1    cat4    val4
323  *                      row2    cat1    val5
324  *                      row2    cat2    val6
325  *                      row2    cat3    val7
326  *                      row2    cat4    val8
327  *
328  * crosstab returns:
329  *                                      <===== values columns =====>
330  *                      rowid   cat1    cat2    cat3    cat4
331  *                      ------+-------+-------+-------+-------
332  *                      row1    val1    val2    val3    val4
333  *                      row2    val5    val6    val7    val8
334  *
335  * NOTES:
336  * 1. SQL result must be ordered by 1,2.
337  * 2. The number of values columns depends on the tuple description
338  *        of the function's declared return type.
339  * 2. Missing values (i.e. not enough adjacent rows of same rowid to
340  *        fill the number of result values columns) are filled in with nulls.
341  * 3. Extra values (i.e. too many adjacent rows of same rowid to fill
342  *        the number of result values columns) are skipped.
343  * 4. Rows with all nulls in the values columns are skipped.
344  */
345 PG_FUNCTION_INFO_V1(crosstab);
346 Datum
347 crosstab(PG_FUNCTION_ARGS)
348 {
349         FuncCallContext *funcctx;
350         TupleDesc       ret_tupdesc;
351         int                     call_cntr;
352         int                     max_calls;
353         TupleTableSlot *slot;
354         AttInMetadata *attinmeta;
355         SPITupleTable *spi_tuptable = NULL;
356         TupleDesc       spi_tupdesc;
357         char       *lastrowid = NULL;
358         crosstab_fctx *fctx;
359         int                     i;
360         int                     num_categories;
361         MemoryContext oldcontext;
362
363         /* stuff done only on the first call of the function */
364         if (SRF_IS_FIRSTCALL())
365         {
366                 char       *sql = GET_STR(PG_GETARG_TEXT_P(0));
367                 Oid                     funcid = fcinfo->flinfo->fn_oid;
368                 Oid                     functypeid;
369                 char            functyptype;
370                 TupleDesc       tupdesc = NULL;
371                 int                     ret;
372                 int                     proc;
373
374                 /* create a function context for cross-call persistence */
375                 funcctx = SRF_FIRSTCALL_INIT();
376
377                 /*
378                  * switch to memory context appropriate for multiple function
379                  * calls
380                  */
381                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
382
383                 /* Connect to SPI manager */
384                 if ((ret = SPI_connect()) < 0)
385                         /* internal error */
386                         elog(ERROR, "crosstab: SPI_connect returned %d", ret);
387
388                 /* Retrieve the desired rows */
389                 ret = SPI_exec(sql, 0);
390                 proc = SPI_processed;
391
392                 /* Check for qualifying tuples */
393                 if ((ret == SPI_OK_SELECT) && (proc > 0))
394                 {
395                         spi_tuptable = SPI_tuptable;
396                         spi_tupdesc = spi_tuptable->tupdesc;
397
398                         /*
399                          * The provided SQL query must always return three columns.
400                          *
401                          * 1. rowname   the label or identifier for each row in the final
402                          * result 2. category  the label or identifier for each column
403                          * in the final result 3. values        the value for each column
404                          * in the final result
405                          */
406                         if (spi_tupdesc->natts != 3)
407                                 ereport(ERROR,
408                                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
409                                                  errmsg("invalid source data SQL statement"),
410                                                  errdetail("The provided SQL must return 3 " \
411                                                          " columns; rowid, category, and values.")));
412                 }
413                 else
414                 {
415                         /* no qualifying tuples */
416                         SPI_finish();
417                         SRF_RETURN_DONE(funcctx);
418                 }
419
420                 /* SPI switches context on us, so reset it */
421                 MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
422
423                 /* get the typeid that represents our return type */
424                 functypeid = get_func_rettype(funcid);
425
426                 /* check typtype to see if we have a predetermined return type */
427                 functyptype = get_typtype(functypeid);
428
429                 if (functyptype == 'c')
430                 {
431                         /* Build a tuple description for a functypeid tuple */
432                         tupdesc = TypeGetTupleDesc(functypeid, NIL);
433                 }
434                 else if (functyptype == 'p' && functypeid == RECORDOID)
435                 {
436                         if (fcinfo->nargs != 2)
437                                 ereport(ERROR,
438                                                 (errcode(ERRCODE_SYNTAX_ERROR),
439                                                  errmsg("wrong number of arguments")));
440                         else
441                         {
442                                 int                     num_categories = PG_GETARG_INT32(1);
443
444                                 tupdesc = make_crosstab_tupledesc(spi_tupdesc, num_categories);
445                         }
446                 }
447                 else
448                         ereport(ERROR,
449                                         (errcode(ERRCODE_SYNTAX_ERROR),
450                                          errmsg("return type must be a row type")));
451
452                 /*
453                  * Check that return tupdesc is compatible with the one we got
454                  * from ret_relname, at least based on number and type of
455                  * attributes
456                  */
457                 if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
458                         ereport(ERROR,
459                                         (errcode(ERRCODE_SYNTAX_ERROR),
460                                          errmsg("return and sql tuple descriptions are " \
461                                                         "incompatible")));
462
463                 /* allocate a slot for a tuple with this tupdesc */
464                 slot = TupleDescGetSlot(tupdesc);
465
466                 /* assign slot to function context */
467                 funcctx->slot = slot;
468
469                 /*
470                  * Generate attribute metadata needed later to produce tuples from
471                  * raw C strings
472                  */
473                 attinmeta = TupleDescGetAttInMetadata(tupdesc);
474                 funcctx->attinmeta = attinmeta;
475
476                 /* allocate memory for user context */
477                 fctx = (crosstab_fctx *) palloc(sizeof(crosstab_fctx));
478
479                 /*
480                  * Save spi data for use across calls
481                  */
482                 fctx->spi_tuptable = spi_tuptable;
483                 fctx->lastrowid = NULL;
484                 funcctx->user_fctx = fctx;
485
486                 /* total number of tuples to be returned */
487                 funcctx->max_calls = proc;
488
489                 MemoryContextSwitchTo(oldcontext);
490         }
491
492         /* stuff done on every call of the function */
493         funcctx = SRF_PERCALL_SETUP();
494
495         /*
496          * initialize per-call variables
497          */
498         call_cntr = funcctx->call_cntr;
499         max_calls = funcctx->max_calls;
500
501         /* return slot for our tuple */
502         slot = funcctx->slot;
503
504         /* user context info */
505         fctx = (crosstab_fctx *) funcctx->user_fctx;
506         lastrowid = fctx->lastrowid;
507         spi_tuptable = fctx->spi_tuptable;
508
509         /* the sql tuple */
510         spi_tupdesc = spi_tuptable->tupdesc;
511
512         /* attribute return type and return tuple description */
513         attinmeta = funcctx->attinmeta;
514         ret_tupdesc = attinmeta->tupdesc;
515
516         /* the return tuple always must have 1 rowid + num_categories columns */
517         num_categories = ret_tupdesc->natts - 1;
518
519         if (call_cntr < max_calls)      /* do when there is more left to send */
520         {
521                 HeapTuple       tuple;
522                 Datum           result;
523                 char      **values;
524                 bool            allnulls = true;
525
526                 while (true)
527                 {
528                         /* allocate space */
529                         values = (char **) palloc((1 + num_categories) * sizeof(char *));
530
531                         /* and make sure it's clear */
532                         memset(values, '\0', (1 + num_categories) * sizeof(char *));
533
534                         /*
535                          * now loop through the sql results and assign each value in
536                          * sequence to the next category
537                          */
538                         for (i = 0; i < num_categories; i++)
539                         {
540                                 HeapTuple       spi_tuple;
541                                 char       *rowid = NULL;
542
543                                 /* see if we've gone too far already */
544                                 if (call_cntr >= max_calls)
545                                         break;
546
547                                 /* get the next sql result tuple */
548                                 spi_tuple = spi_tuptable->vals[call_cntr];
549
550                                 /* get the rowid from the current sql result tuple */
551                                 rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
552
553                                 /*
554                                  * If this is the first pass through the values for this
555                                  * rowid set it, otherwise make sure it hasn't changed on
556                                  * us. Also check to see if the rowid is the same as that
557                                  * of the last tuple sent -- if so, skip this tuple
558                                  * entirely
559                                  */
560                                 if (i == 0)
561                                         values[0] = pstrdup(rowid);
562
563                                 if ((rowid != NULL) && (strcmp(rowid, values[0]) == 0))
564                                 {
565                                         if ((lastrowid != NULL) && (strcmp(rowid, lastrowid) == 0))
566                                                 break;
567                                         else if (allnulls == true)
568                                                 allnulls = false;
569
570                                         /*
571                                          * Get the next category item value, which is alway
572                                          * attribute number three.
573                                          *
574                                          * Be careful to sssign the value to the array index
575                                          * based on which category we are presently
576                                          * processing.
577                                          */
578                                         values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
579
580                                         /*
581                                          * increment the counter since we consume a row for
582                                          * each category, but not for last pass because the
583                                          * API will do that for us
584                                          */
585                                         if (i < (num_categories - 1))
586                                                 call_cntr = ++funcctx->call_cntr;
587                                 }
588                                 else
589                                 {
590                                         /*
591                                          * We'll fill in NULLs for the missing values, but we
592                                          * need to decrement the counter since this sql result
593                                          * row doesn't belong to the current output tuple.
594                                          */
595                                         call_cntr = --funcctx->call_cntr;
596                                         break;
597                                 }
598
599                                 if (rowid != NULL)
600                                         xpfree(rowid);
601                         }
602
603                         xpfree(fctx->lastrowid);
604
605                         if (values[0] != NULL)
606                         {
607                                 /*
608                                  * switch to memory context appropriate for multiple
609                                  * function calls
610                                  */
611                                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
612
613                                 lastrowid = fctx->lastrowid = pstrdup(values[0]);
614                                 MemoryContextSwitchTo(oldcontext);
615                         }
616
617                         if (!allnulls)
618                         {
619                                 /* build the tuple */
620                                 tuple = BuildTupleFromCStrings(attinmeta, values);
621
622                                 /* make the tuple into a datum */
623                                 result = TupleGetDatum(slot, tuple);
624
625                                 /* Clean up */
626                                 for (i = 0; i < num_categories + 1; i++)
627                                         if (values[i] != NULL)
628                                                 xpfree(values[i]);
629                                 xpfree(values);
630
631                                 SRF_RETURN_NEXT(funcctx, result);
632                         }
633                         else
634                         {
635                                 /*
636                                  * Skipping this tuple entirely, but we need to advance
637                                  * the counter like the API would if we had returned one.
638                                  */
639                                 call_cntr = ++funcctx->call_cntr;
640
641                                 /* we'll start over at the top */
642                                 xpfree(values);
643
644                                 /* see if we've gone too far already */
645                                 if (call_cntr >= max_calls)
646                                 {
647                                         /* release SPI related resources */
648                                         SPI_finish();
649                                         SRF_RETURN_DONE(funcctx);
650                                 }
651                         }
652                 }
653         }
654         else
655 /* do when there is no more left */
656         {
657                 /* release SPI related resources */
658                 SPI_finish();
659                 SRF_RETURN_DONE(funcctx);
660         }
661 }
662
663 /*
664  * crosstab_hash - reimplement crosstab as materialized function and
665  * properly deal with missing values (i.e. don't pack remaining
666  * values to the left)
667  *
668  * crosstab - create a crosstab of rowids and values columns from a
669  * SQL statement returning one rowid column, one category column,
670  * and one value column.
671  *
672  * e.g. given sql which produces:
673  *
674  *                      rowid   cat             value
675  *                      ------+-------+-------
676  *                      row1    cat1    val1
677  *                      row1    cat2    val2
678  *                      row1    cat4    val4
679  *                      row2    cat1    val5
680  *                      row2    cat2    val6
681  *                      row2    cat3    val7
682  *                      row2    cat4    val8
683  *
684  * crosstab returns:
685  *                                      <===== values columns =====>
686  *                      rowid   cat1    cat2    cat3    cat4
687  *                      ------+-------+-------+-------+-------
688  *                      row1    val1    val2    null    val4
689  *                      row2    val5    val6    val7    val8
690  *
691  * NOTES:
692  * 1. SQL result must be ordered by 1.
693  * 2. The number of values columns depends on the tuple description
694  *        of the function's declared return type.
695  * 2. Missing values (i.e. missing category) are filled in with nulls.
696  * 3. Extra values (i.e. not in category results) are skipped.
697  */
698 PG_FUNCTION_INFO_V1(crosstab_hash);
699 Datum
700 crosstab_hash(PG_FUNCTION_ARGS)
701 {
702         char       *sql = GET_STR(PG_GETARG_TEXT_P(0));
703         char       *cats_sql = GET_STR(PG_GETARG_TEXT_P(1));
704         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
705         TupleDesc       tupdesc;
706         MemoryContext per_query_ctx;
707         MemoryContext oldcontext;
708         int                     num_categories;
709
710         /* check to see if caller supports us returning a tuplestore */
711         if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
712                 ereport(ERROR,
713                                 (errcode(ERRCODE_SYNTAX_ERROR),
714                                  errmsg("materialize mode required, but it is not " \
715                                                 "allowed in this context")));
716
717         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
718         oldcontext = MemoryContextSwitchTo(per_query_ctx);
719
720         /* get the requested return tuple description */
721         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
722
723         /*
724          * Check to make sure we have a reasonable tuple descriptor
725          *
726          * Note we will attempt to coerce the values into whatever the return
727          * attribute type is and depend on the "in" function to complain if
728          * needed.
729          */
730         if (tupdesc->natts < 2)
731                 ereport(ERROR,
732                                 (errcode(ERRCODE_SYNTAX_ERROR),
733                                  errmsg("query-specified return tuple and " \
734                                                 "crosstab function are not compatible")));
735
736         /* load up the categories hash table */
737         num_categories = load_categories_hash(cats_sql, per_query_ctx);
738
739         /* let the caller know we're sending back a tuplestore */
740         rsinfo->returnMode = SFRM_Materialize;
741
742         /* now go build it */
743         rsinfo->setResult = get_crosstab_tuplestore(sql,
744                                                                                                 num_categories,
745                                                                                                 tupdesc,
746                                                                                                 per_query_ctx);
747
748         /*
749          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
750          * tuples are in our tuplestore and passed back through
751          * rsinfo->setResult. rsinfo->setDesc is set to the tuple description
752          * that we actually used to build our tuples with, so the caller can
753          * verify we did what it was expecting.
754          */
755         rsinfo->setDesc = tupdesc;
756         MemoryContextSwitchTo(oldcontext);
757
758         return (Datum) 0;
759 }
760
761 /*
762  * load up the categories hash table
763  */
764 static int
765 load_categories_hash(char *cats_sql, MemoryContext per_query_ctx)
766 {
767         HASHCTL         ctl;
768         int                     ret;
769         int                     proc;
770         MemoryContext SPIcontext;
771         int                     num_categories = 0;
772
773         /* initialize the category hash table */
774         ctl.keysize = MAX_CATNAME_LEN;
775         ctl.entrysize = sizeof(crosstab_HashEnt);
776
777         /*
778          * use INIT_CATS, defined above as a guess of how many hash table
779          * entries to create, initially
780          */
781         crosstab_HashTable = hash_create("crosstab hash", INIT_CATS, &ctl, HASH_ELEM);
782
783         /* Connect to SPI manager */
784         if ((ret = SPI_connect()) < 0)
785                 /* internal error */
786                 elog(ERROR, "load_categories_hash: SPI_connect returned %d", ret);
787
788         /* Retrieve the category name rows */
789         ret = SPI_exec(cats_sql, 0);
790         num_categories = proc = SPI_processed;
791
792         /* Check for qualifying tuples */
793         if ((ret == SPI_OK_SELECT) && (proc > 0))
794         {
795                 SPITupleTable *spi_tuptable = SPI_tuptable;
796                 TupleDesc       spi_tupdesc = spi_tuptable->tupdesc;
797                 int                     i;
798
799                 /*
800                  * The provided categories SQL query must always return one
801                  * column: category - the label or identifier for each column
802                  */
803                 if (spi_tupdesc->natts != 1)
804                         ereport(ERROR,
805                                         (errcode(ERRCODE_SYNTAX_ERROR),
806                                          errmsg("provided \"categories\" SQL must " \
807                                                         "return 1 column of at least one row")));
808
809                 for (i = 0; i < proc; i++)
810                 {
811                         crosstab_cat_desc *catdesc;
812                         char       *catname;
813                         HeapTuple       spi_tuple;
814
815                         /* get the next sql result tuple */
816                         spi_tuple = spi_tuptable->vals[i];
817
818                         /* get the category from the current sql result tuple */
819                         catname = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
820
821                         SPIcontext = MemoryContextSwitchTo(per_query_ctx);
822
823                         catdesc = (crosstab_cat_desc *) palloc(sizeof(crosstab_cat_desc));
824                         catdesc->catname = catname;
825                         catdesc->attidx = i;
826
827                         /* Add the proc description block to the hashtable */
828                         crosstab_HashTableInsert(catdesc);
829
830                         MemoryContextSwitchTo(SPIcontext);
831                 }
832         }
833         else
834         {
835                 /* no qualifying tuples */
836                 SPI_finish();
837                 ereport(ERROR,
838                                 (errcode(ERRCODE_SYNTAX_ERROR),
839                                  errmsg("provided \"categories\" SQL must " \
840                                                 "return 1 column of at least one row")));
841         }
842
843         if (SPI_finish() != SPI_OK_FINISH)
844                 /* internal error */
845                 elog(ERROR, "load_categories_hash: SPI_finish() failed");
846
847         return num_categories;
848 }
849
850 /*
851  * create and populate the crosstab tuplestore using the provided source query
852  */
853 static Tuplestorestate *
854 get_crosstab_tuplestore(char *sql,
855                                                 int num_categories,
856                                                 TupleDesc tupdesc,
857                                                 MemoryContext per_query_ctx)
858 {
859         Tuplestorestate *tupstore;
860         AttInMetadata *attinmeta = TupleDescGetAttInMetadata(tupdesc);
861         char      **values;
862         HeapTuple       tuple;
863         int                     ret;
864         int                     proc;
865         MemoryContext SPIcontext;
866
867         /* initialize our tuplestore */
868         tupstore = tuplestore_begin_heap(true, false, SortMem);
869
870         /* Connect to SPI manager */
871         if ((ret = SPI_connect()) < 0)
872                 /* internal error */
873                 elog(ERROR, "get_crosstab_tuplestore: SPI_connect returned %d", ret);
874
875         /* Now retrieve the crosstab source rows */
876         ret = SPI_exec(sql, 0);
877         proc = SPI_processed;
878
879         /* Check for qualifying tuples */
880         if ((ret == SPI_OK_SELECT) && (proc > 0))
881         {
882                 SPITupleTable *spi_tuptable = SPI_tuptable;
883                 TupleDesc       spi_tupdesc = spi_tuptable->tupdesc;
884                 int                     ncols = spi_tupdesc->natts;
885                 char       *rowid;
886                 char       *lastrowid = NULL;
887                 int                     i,
888                                         j;
889                 int                     result_ncols;
890
891                 /*
892                  * The provided SQL query must always return at least three
893                  * columns:
894                  *
895                  * 1. rowname   the label for each row - column 1 in the final result
896                  * 2. category  the label for each value-column in the final
897                  * result 3. value         the values used to populate the
898                  * value-columns
899                  *
900                  * If there are more than three columns, the last two are taken as
901                  * "category" and "values". The first column is taken as
902                  * "rowname". Additional columns (2 thru N-2) are assumed the same
903                  * for the same "rowname", and are copied into the result tuple
904                  * from the first time we encounter a particular rowname.
905                  */
906                 if (ncols < 3)
907                         ereport(ERROR,
908                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
909                                          errmsg("invalid source data SQL statement"),
910                                          errdetail("The provided SQL must return 3 " \
911                                                          " columns; rowid, category, and values.")));
912
913                 result_ncols = (ncols - 2) + num_categories;
914
915                 /* Recheck to make sure we tuple descriptor still looks reasonable */
916                 if (tupdesc->natts != result_ncols)
917                         ereport(ERROR,
918                                         (errcode(ERRCODE_SYNTAX_ERROR),
919                                          errmsg("invalid return type"),
920                                          errdetail("query-specified return " \
921                                                            "tuple has %d columns but crosstab " \
922                                                    "returns %d", tupdesc->natts, result_ncols)));
923
924                 /* allocate space */
925                 values = (char **) palloc(result_ncols * sizeof(char *));
926
927                 /* and make sure it's clear */
928                 memset(values, '\0', result_ncols * sizeof(char *));
929
930                 for (i = 0; i < proc; i++)
931                 {
932                         HeapTuple       spi_tuple;
933                         crosstab_cat_desc *catdesc;
934                         char       *catname;
935
936                         /* get the next sql result tuple */
937                         spi_tuple = spi_tuptable->vals[i];
938
939                         /* get the rowid from the current sql result tuple */
940                         rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
941
942                         /* if rowid is null, skip this tuple entirely */
943                         if (rowid == NULL)
944                                 continue;
945
946                         /*
947                          * if we're on a new output row, grab the column values up to
948                          * column N-2 now
949                          */
950                         if ((lastrowid == NULL) || (strcmp(rowid, lastrowid) != 0))
951                         {
952                                 /*
953                                  * a new row means we need to flush the old one first,
954                                  * unless we're on the very first row
955                                  */
956                                 if (lastrowid != NULL)
957                                 {
958                                         /*
959                                          * switch to appropriate context while storing the
960                                          * tuple
961                                          */
962                                         SPIcontext = MemoryContextSwitchTo(per_query_ctx);
963
964                                         /* rowid changed, flush the previous output row */
965                                         tuple = BuildTupleFromCStrings(attinmeta, values);
966                                         tuplestore_puttuple(tupstore, tuple);
967                                         for (j = 0; j < result_ncols; j++)
968                                                 xpfree(values[j]);
969
970                                         /* now reset the context */
971                                         MemoryContextSwitchTo(SPIcontext);
972                                 }
973
974                                 values[0] = rowid;
975                                 for (j = 1; j < ncols - 2; j++)
976                                         values[j] = SPI_getvalue(spi_tuple, spi_tupdesc, j + 1);
977                         }
978
979                         /* look up the category and fill in the appropriate column */
980                         catname = SPI_getvalue(spi_tuple, spi_tupdesc, ncols - 1);
981
982                         if (catname != NULL)
983                         {
984                                 crosstab_HashTableLookup(catname, catdesc);
985
986                                 if (catdesc)
987                                         values[catdesc->attidx + ncols - 2] =
988                                                 SPI_getvalue(spi_tuple, spi_tupdesc, ncols);
989                         }
990
991                         xpfree(lastrowid);
992                         lastrowid = pstrdup(rowid);
993                 }
994
995                 /* switch to appropriate context while storing the tuple */
996                 SPIcontext = MemoryContextSwitchTo(per_query_ctx);
997
998                 /* flush the last output row */
999                 tuple = BuildTupleFromCStrings(attinmeta, values);
1000                 tuplestore_puttuple(tupstore, tuple);
1001
1002                 /* now reset the context */
1003                 MemoryContextSwitchTo(SPIcontext);
1004
1005         }
1006         else
1007         {
1008                 /* no qualifying tuples */
1009                 SPI_finish();
1010         }
1011
1012         if (SPI_finish() != SPI_OK_FINISH)
1013                 /* internal error */
1014                 elog(ERROR, "get_crosstab_tuplestore: SPI_finish() failed");
1015
1016         tuplestore_donestoring(tupstore);
1017
1018         return tupstore;
1019 }
1020
1021 /*
1022  * connectby_text - produce a result set from a hierarchical (parent/child)
1023  * table.
1024  *
1025  * e.g. given table foo:
1026  *
1027  *                      keyid   parent_keyid pos
1028  *                      ------+------------+--
1029  *                      row1    NULL             0
1030  *                      row2    row1             0
1031  *                      row3    row1             0
1032  *                      row4    row2             1
1033  *                      row5    row2             0
1034  *                      row6    row4             0
1035  *                      row7    row3             0
1036  *                      row8    row6             0
1037  *                      row9    row5             0
1038  *
1039  *
1040  * connectby(text relname, text keyid_fld, text parent_keyid_fld
1041  *                        [, text orderby_fld], text start_with, int max_depth
1042  *                        [, text branch_delim])
1043  * connectby('foo', 'keyid', 'parent_keyid', 'pos', 'row2', 0, '~') returns:
1044  *
1045  *              keyid   parent_id       level    branch                         serial
1046  *              ------+-----------+--------+-----------------------
1047  *              row2    NULL              0               row2                            1
1048  *              row5    row2              1               row2~row5                       2
1049  *              row9    row5              2               row2~row5~row9          3
1050  *              row4    row2              1               row2~row4                       4
1051  *              row6    row4              2               row2~row4~row6          5
1052  *              row8    row6              3               row2~row4~row6~row8 6
1053  *
1054  */
1055 PG_FUNCTION_INFO_V1(connectby_text);
1056
1057 #define CONNECTBY_NCOLS                                 4
1058 #define CONNECTBY_NCOLS_NOBRANCH                3
1059
1060 Datum
1061 connectby_text(PG_FUNCTION_ARGS)
1062 {
1063         char       *relname = GET_STR(PG_GETARG_TEXT_P(0));
1064         char       *key_fld = GET_STR(PG_GETARG_TEXT_P(1));
1065         char       *parent_key_fld = GET_STR(PG_GETARG_TEXT_P(2));
1066         char       *start_with = GET_STR(PG_GETARG_TEXT_P(3));
1067         int                     max_depth = PG_GETARG_INT32(4);
1068         char       *branch_delim = NULL;
1069         bool            show_branch = false;
1070         bool            show_serial = false;
1071         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1072         TupleDesc       tupdesc;
1073         AttInMetadata *attinmeta;
1074         MemoryContext per_query_ctx;
1075         MemoryContext oldcontext;
1076
1077         /* check to see if caller supports us returning a tuplestore */
1078         if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
1079                 ereport(ERROR,
1080                                 (errcode(ERRCODE_SYNTAX_ERROR),
1081                                  errmsg("materialize mode required, but it is not " \
1082                                                 "allowed in this context")));
1083
1084         if (fcinfo->nargs == 6)
1085         {
1086                 branch_delim = GET_STR(PG_GETARG_TEXT_P(5));
1087                 show_branch = true;
1088         }
1089         else
1090                 /* default is no show, tilde for the delimiter */
1091                 branch_delim = pstrdup("~");
1092
1093         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1094         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1095
1096         /* get the requested return tuple description */
1097         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
1098
1099         /* does it meet our needs */
1100         validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
1101
1102         /* OK, use it then */
1103         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1104
1105         /* check to see if caller supports us returning a tuplestore */
1106         if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
1107                 ereport(ERROR,
1108                                 (errcode(ERRCODE_SYNTAX_ERROR),
1109                                  errmsg("materialize mode required, but it is not " \
1110                                                 "allowed in this context")));
1111
1112         /* OK, go to work */
1113         rsinfo->returnMode = SFRM_Materialize;
1114         rsinfo->setResult = connectby(relname,
1115                                                                   key_fld,
1116                                                                   parent_key_fld,
1117                                                                   NULL,
1118                                                                   branch_delim,
1119                                                                   start_with,
1120                                                                   max_depth,
1121                                                                   show_branch,
1122                                                                   show_serial,
1123                                                                   per_query_ctx,
1124                                                                   attinmeta);
1125         rsinfo->setDesc = tupdesc;
1126
1127         MemoryContextSwitchTo(oldcontext);
1128
1129         /*
1130          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
1131          * tuples are in our tuplestore and passed back through
1132          * rsinfo->setResult. rsinfo->setDesc is set to the tuple description
1133          * that we actually used to build our tuples with, so the caller can
1134          * verify we did what it was expecting.
1135          */
1136         return (Datum) 0;
1137 }
1138
1139 PG_FUNCTION_INFO_V1(connectby_text_serial);
1140 Datum
1141 connectby_text_serial(PG_FUNCTION_ARGS)
1142 {
1143         char       *relname = GET_STR(PG_GETARG_TEXT_P(0));
1144         char       *key_fld = GET_STR(PG_GETARG_TEXT_P(1));
1145         char       *parent_key_fld = GET_STR(PG_GETARG_TEXT_P(2));
1146         char       *orderby_fld = GET_STR(PG_GETARG_TEXT_P(3));
1147         char       *start_with = GET_STR(PG_GETARG_TEXT_P(4));
1148         int                     max_depth = PG_GETARG_INT32(5);
1149         char       *branch_delim = NULL;
1150         bool            show_branch = false;
1151         bool            show_serial = true;
1152
1153         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1154         TupleDesc       tupdesc;
1155         AttInMetadata *attinmeta;
1156         MemoryContext per_query_ctx;
1157         MemoryContext oldcontext;
1158
1159         /* check to see if caller supports us returning a tuplestore */
1160         if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
1161                 elog(ERROR, "connectby: materialize mode required, but it is not "
1162                          "allowed in this context");
1163
1164         if (fcinfo->nargs == 7)
1165         {
1166                 branch_delim = GET_STR(PG_GETARG_TEXT_P(6));
1167                 show_branch = true;
1168         }
1169         else
1170                 /* default is no show, tilde for the delimiter */
1171                 branch_delim = pstrdup("~");
1172
1173         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1174         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1175
1176         /* get the requested return tuple description */
1177         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
1178
1179         /* does it meet our needs */
1180         validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
1181
1182         /* OK, use it then */
1183         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1184
1185         /* check to see if caller supports us returning a tuplestore */
1186         if (!rsinfo->allowedModes & SFRM_Materialize)
1187                 elog(ERROR, "connectby requires Materialize mode, but it is not "
1188                          "allowed in this context");
1189
1190         /* OK, go to work */
1191         rsinfo->returnMode = SFRM_Materialize;
1192         rsinfo->setResult = connectby(relname,
1193                                                                   key_fld,
1194                                                                   parent_key_fld,
1195                                                                   orderby_fld,
1196                                                                   branch_delim,
1197                                                                   start_with,
1198                                                                   max_depth,
1199                                                                   show_branch,
1200                                                                   show_serial,
1201                                                                   per_query_ctx,
1202                                                                   attinmeta);
1203         rsinfo->setDesc = tupdesc;
1204
1205         MemoryContextSwitchTo(oldcontext);
1206
1207         /*
1208          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
1209          * tuples are in our tuplestore and passed back through
1210          * rsinfo->setResult. rsinfo->setDesc is set to the tuple description
1211          * that we actually used to build our tuples with, so the caller can
1212          * verify we did what it was expecting.
1213          */
1214         return (Datum) 0;
1215 }
1216
1217
1218 /*
1219  * connectby - does the real work for connectby_text()
1220  */
1221 static Tuplestorestate *
1222 connectby(char *relname,
1223                   char *key_fld,
1224                   char *parent_key_fld,
1225                   char *orderby_fld,
1226                   char *branch_delim,
1227                   char *start_with,
1228                   int max_depth,
1229                   bool show_branch,
1230                   bool show_serial,
1231                   MemoryContext per_query_ctx,
1232                   AttInMetadata *attinmeta)
1233 {
1234         Tuplestorestate *tupstore = NULL;
1235         int                     ret;
1236         MemoryContext oldcontext;
1237
1238         int                     serial = 1;
1239
1240         /* Connect to SPI manager */
1241         if ((ret = SPI_connect()) < 0)
1242                 /* internal error */
1243                 elog(ERROR, "connectby: SPI_connect returned %d", ret);
1244
1245         /* switch to longer term context to create the tuple store */
1246         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1247
1248         /* initialize our tuplestore */
1249         tupstore = tuplestore_begin_heap(true, false, SortMem);
1250
1251         MemoryContextSwitchTo(oldcontext);
1252
1253         /* now go get the whole tree */
1254         tupstore = build_tuplestore_recursively(key_fld,
1255                                                                                         parent_key_fld,
1256                                                                                         relname,
1257                                                                                         orderby_fld,
1258                                                                                         branch_delim,
1259                                                                                         start_with,
1260                                                                                         start_with, /* current_branch */
1261                                                                                         0,      /* initial level is 0 */
1262                                                                                         &serial,        /* initial serial is 1 */
1263                                                                                         max_depth,
1264                                                                                         show_branch,
1265                                                                                         show_serial,
1266                                                                                         per_query_ctx,
1267                                                                                         attinmeta,
1268                                                                                         tupstore);
1269
1270         SPI_finish();
1271
1272         return tupstore;
1273 }
1274
1275 static Tuplestorestate *
1276 build_tuplestore_recursively(char *key_fld,
1277                                                          char *parent_key_fld,
1278                                                          char *relname,
1279                                                          char *orderby_fld,
1280                                                          char *branch_delim,
1281                                                          char *start_with,
1282                                                          char *branch,
1283                                                          int level,
1284                                                          int *serial,
1285                                                          int max_depth,
1286                                                          bool show_branch,
1287                                                          bool show_serial,
1288                                                          MemoryContext per_query_ctx,
1289                                                          AttInMetadata *attinmeta,
1290                                                          Tuplestorestate *tupstore)
1291 {
1292         TupleDesc       tupdesc = attinmeta->tupdesc;
1293         MemoryContext oldcontext;
1294         StringInfo      sql = makeStringInfo();
1295         int                     ret;
1296         int                     proc;
1297         int                     serial_column;
1298
1299         if (max_depth > 0 && level > max_depth)
1300                 return tupstore;
1301
1302         /* Build initial sql statement */
1303         if (!show_serial)
1304         {
1305                 appendStringInfo(sql, "SELECT %s, %s FROM %s WHERE %s = '%s' AND %s IS NOT NULL",
1306                                                  key_fld,
1307                                                  parent_key_fld,
1308                                                  relname,
1309                                                  parent_key_fld,
1310                                                  start_with,
1311                                                  key_fld);
1312                 serial_column = 0;
1313         }
1314         else
1315         {
1316                 appendStringInfo(sql, "SELECT %s, %s FROM %s WHERE %s = '%s' AND %s IS NOT NULL ORDER BY %s",
1317                                                  key_fld,
1318                                                  parent_key_fld,
1319                                                  relname,
1320                                                  parent_key_fld,
1321                                                  start_with,
1322                                                  key_fld,
1323                                                  orderby_fld);
1324                 serial_column = 1;
1325         }
1326
1327         /* Retrieve the desired rows */
1328         ret = SPI_exec(sql->data, 0);
1329         proc = SPI_processed;
1330
1331         /* Check for qualifying tuples */
1332         if ((ret == SPI_OK_SELECT) && (proc > 0))
1333         {
1334                 HeapTuple       tuple;
1335                 HeapTuple       spi_tuple;
1336                 SPITupleTable *tuptable = SPI_tuptable;
1337                 TupleDesc       spi_tupdesc = tuptable->tupdesc;
1338                 int                     i;
1339                 char       *current_key;
1340                 char       *current_key_parent;
1341                 char            current_level[INT32_STRLEN];
1342                 char            serial_str[INT32_STRLEN];
1343                 char       *current_branch;
1344                 char      **values;
1345                 StringInfo      branchstr = NULL;
1346                 StringInfo      chk_branchstr = NULL;
1347                 StringInfo      chk_current_key = NULL;
1348
1349                 /* start a new branch */
1350                 branchstr = makeStringInfo();
1351
1352                 /* need these to check for recursion */
1353                 chk_branchstr = makeStringInfo();
1354                 chk_current_key = makeStringInfo();
1355
1356                 if (show_branch)
1357                         values = (char **) palloc((CONNECTBY_NCOLS + serial_column) * sizeof(char *));
1358                 else
1359                         values = (char **) palloc((CONNECTBY_NCOLS_NOBRANCH + serial_column) * sizeof(char *));
1360
1361                 /* First time through, do a little setup */
1362                 if (level == 0)
1363                 {
1364                         /*
1365                          * Check that return tupdesc is compatible with the one we got
1366                          * from the query, but only at level 0 -- no need to check
1367                          * more than once
1368                          */
1369
1370                         if (!compatConnectbyTupleDescs(tupdesc, spi_tupdesc))
1371                                 ereport(ERROR,
1372                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1373                                                  errmsg("invalid return type"),
1374                                          errdetail("Return and SQL tuple descriptions are " \
1375                                                            "incompatible.")));
1376
1377                         /* root value is the one we initially start with */
1378                         values[0] = start_with;
1379
1380                         /* root value has no parent */
1381                         values[1] = NULL;
1382
1383                         /* root level is 0 */
1384                         sprintf(current_level, "%d", level);
1385                         values[2] = current_level;
1386
1387                         /* root branch is just starting root value */
1388                         if (show_branch)
1389                                 values[3] = start_with;
1390
1391                         /* root starts the serial with 1 */
1392                         if (show_serial)
1393                         {
1394                                 sprintf(serial_str, "%d", (*serial)++);
1395                                 if (show_branch)
1396                                         values[4] = serial_str;
1397                                 else
1398                                         values[3] = serial_str;
1399                         }
1400
1401                         /* construct the tuple */
1402                         tuple = BuildTupleFromCStrings(attinmeta, values);
1403
1404                         /* switch to long lived context while storing the tuple */
1405                         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1406
1407                         /* now store it */
1408                         tuplestore_puttuple(tupstore, tuple);
1409
1410                         /* now reset the context */
1411                         MemoryContextSwitchTo(oldcontext);
1412
1413                         /* increment level */
1414                         level++;
1415                 }
1416
1417                 for (i = 0; i < proc; i++)
1418                 {
1419                         /* initialize branch for this pass */
1420                         appendStringInfo(branchstr, "%s", branch);
1421                         appendStringInfo(chk_branchstr, "%s%s%s", branch_delim, branch, branch_delim);
1422
1423                         /* get the next sql result tuple */
1424                         spi_tuple = tuptable->vals[i];
1425
1426                         /* get the current key and parent */
1427                         current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
1428                         appendStringInfo(chk_current_key, "%s%s%s", branch_delim, current_key, branch_delim);
1429                         current_key_parent = pstrdup(SPI_getvalue(spi_tuple, spi_tupdesc, 2));
1430
1431                         /* get the current level */
1432                         sprintf(current_level, "%d", level);
1433
1434                         /* check to see if this key is also an ancestor */
1435                         if (strstr(chk_branchstr->data, chk_current_key->data))
1436                                 elog(ERROR, "infinite recursion detected");
1437
1438                         /* OK, extend the branch */
1439                         appendStringInfo(branchstr, "%s%s", branch_delim, current_key);
1440                         current_branch = branchstr->data;
1441
1442                         /* build a tuple */
1443                         values[0] = pstrdup(current_key);
1444                         values[1] = current_key_parent;
1445                         values[2] = current_level;
1446                         if (show_branch)
1447                                 values[3] = current_branch;
1448                         if (show_serial)
1449                         {
1450                                 sprintf(serial_str, "%d", (*serial)++);
1451                                 if (show_branch)
1452                                         values[4] = serial_str;
1453                                 else
1454                                         values[3] = serial_str;
1455                         }
1456
1457                         tuple = BuildTupleFromCStrings(attinmeta, values);
1458
1459                         xpfree(current_key);
1460                         xpfree(current_key_parent);
1461
1462                         /* switch to long lived context while storing the tuple */
1463                         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1464
1465                         /* store the tuple for later use */
1466                         tuplestore_puttuple(tupstore, tuple);
1467
1468                         /* now reset the context */
1469                         MemoryContextSwitchTo(oldcontext);
1470
1471                         heap_freetuple(tuple);
1472
1473                         /* recurse using current_key_parent as the new start_with */
1474                         tupstore = build_tuplestore_recursively(key_fld,
1475                                                                                                         parent_key_fld,
1476                                                                                                         relname,
1477                                                                                                         orderby_fld,
1478                                                                                                         branch_delim,
1479                                                                                                         values[0],
1480                                                                                                         current_branch,
1481                                                                                                         level + 1,
1482                                                                                                         serial,
1483                                                                                                         max_depth,
1484                                                                                                         show_branch,
1485                                                                                                         show_serial,
1486                                                                                                         per_query_ctx,
1487                                                                                                         attinmeta,
1488                                                                                                         tupstore);
1489
1490                         /* reset branch for next pass */
1491                         xpfree(branchstr->data);
1492                         initStringInfo(branchstr);
1493
1494                         xpfree(chk_branchstr->data);
1495                         initStringInfo(chk_branchstr);
1496
1497                         xpfree(chk_current_key->data);
1498                         initStringInfo(chk_current_key);
1499                 }
1500         }
1501
1502         return tupstore;
1503 }
1504
1505 /*
1506  * Check expected (query runtime) tupdesc suitable for Connectby
1507  */
1508 static void
1509 validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial)
1510 {
1511         int                     serial_column = 0;
1512
1513         if (show_serial)
1514                 serial_column = 1;
1515
1516         /* are there the correct number of columns */
1517         if (show_branch)
1518         {
1519                 if (tupdesc->natts != (CONNECTBY_NCOLS + serial_column))
1520                         ereport(ERROR,
1521                                         (errcode(ERRCODE_SYNTAX_ERROR),
1522                                          errmsg("invalid return type"),
1523                                          errdetail("Query-specified return tuple has " \
1524                                                            "wrong number of columns.")));
1525         }
1526         else
1527         {
1528                 if (tupdesc->natts != CONNECTBY_NCOLS_NOBRANCH + serial_column)
1529                         ereport(ERROR,
1530                                         (errcode(ERRCODE_SYNTAX_ERROR),
1531                                          errmsg("invalid return type"),
1532                                          errdetail("Query-specified return tuple has " \
1533                                                            "wrong number of columns.")));
1534         }
1535
1536         /* check that the types of the first two columns match */
1537         if (tupdesc->attrs[0]->atttypid != tupdesc->attrs[1]->atttypid)
1538                 ereport(ERROR,
1539                                 (errcode(ERRCODE_SYNTAX_ERROR),
1540                                  errmsg("invalid return type"),
1541                                  errdetail("First two columns must be the same type.")));
1542
1543         /* check that the type of the third column is INT4 */
1544         if (tupdesc->attrs[2]->atttypid != INT4OID)
1545                 ereport(ERROR,
1546                                 (errcode(ERRCODE_SYNTAX_ERROR),
1547                                  errmsg("invalid return type"),
1548                                  errdetail("Third column must be type %s.",
1549                                                    format_type_be(INT4OID))));
1550
1551         /* check that the type of the fourth column is TEXT if applicable */
1552         if (show_branch && tupdesc->attrs[3]->atttypid != TEXTOID)
1553                 ereport(ERROR,
1554                                 (errcode(ERRCODE_SYNTAX_ERROR),
1555                                  errmsg("invalid return type"),
1556                                  errdetail("Fourth column must be type %s.",
1557                                                    format_type_be(TEXTOID))));
1558
1559         /* check that the type of the fifth column is INT4 */
1560         if (show_branch && show_serial && tupdesc->attrs[4]->atttypid != INT4OID)
1561                 elog(ERROR, "Query-specified return tuple not valid for Connectby: "
1562                          "fifth column must be type %s", format_type_be(INT4OID));
1563
1564         /* check that the type of the fifth column is INT4 */
1565         if (!show_branch && show_serial && tupdesc->attrs[3]->atttypid != INT4OID)
1566                 elog(ERROR, "Query-specified return tuple not valid for Connectby: "
1567                          "fourth column must be type %s", format_type_be(INT4OID));
1568
1569         /* OK, the tupdesc is valid for our purposes */
1570 }
1571
1572 /*
1573  * Check if spi sql tupdesc and return tupdesc are compatible
1574  */
1575 static bool
1576 compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
1577 {
1578         Oid                     ret_atttypid;
1579         Oid                     sql_atttypid;
1580
1581         /* check the key_fld types match */
1582         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1583         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1584         if (ret_atttypid != sql_atttypid)
1585                 ereport(ERROR,
1586                                 (errcode(ERRCODE_SYNTAX_ERROR),
1587                                  errmsg("invalid return type"),
1588                                  errdetail("SQL key field datatype does " \
1589                                                    "not match return key field datatype.")));
1590
1591         /* check the parent_key_fld types match */
1592         ret_atttypid = ret_tupdesc->attrs[1]->atttypid;
1593         sql_atttypid = sql_tupdesc->attrs[1]->atttypid;
1594         if (ret_atttypid != sql_atttypid)
1595                 ereport(ERROR,
1596                                 (errcode(ERRCODE_SYNTAX_ERROR),
1597                                  errmsg("invalid return type"),
1598                                  errdetail("SQL parent key field datatype does " \
1599                                                 "not match return parent key field datatype.")));
1600
1601         /* OK, the two tupdescs are compatible for our purposes */
1602         return true;
1603 }
1604
1605 /*
1606  * Check if two tupdescs match in type of attributes
1607  */
1608 static bool
1609 compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
1610 {
1611         int                     i;
1612         Form_pg_attribute ret_attr;
1613         Oid                     ret_atttypid;
1614         Form_pg_attribute sql_attr;
1615         Oid                     sql_atttypid;
1616
1617         /* check the rowid types match */
1618         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1619         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1620         if (ret_atttypid != sql_atttypid)
1621                 ereport(ERROR,
1622                                 (errcode(ERRCODE_SYNTAX_ERROR),
1623                                  errmsg("invalid return type"),
1624                                  errdetail("SQL rowid datatype does not match " \
1625                                                    "return rowid datatype.")));
1626
1627         /*
1628          * - attribute [1] of the sql tuple is the category; no need to check
1629          * it - attribute [2] of the sql tuple should match attributes [1] to
1630          * [natts] of the return tuple
1631          */
1632         sql_attr = sql_tupdesc->attrs[2];
1633         for (i = 1; i < ret_tupdesc->natts; i++)
1634         {
1635                 ret_attr = ret_tupdesc->attrs[i];
1636
1637                 if (ret_attr->atttypid != sql_attr->atttypid)
1638                         return false;
1639         }
1640
1641         /* OK, the two tupdescs are compatible for our purposes */
1642         return true;
1643 }
1644
1645 static TupleDesc
1646 make_crosstab_tupledesc(TupleDesc spi_tupdesc, int num_categories)
1647 {
1648         Form_pg_attribute sql_attr;
1649         Oid                     sql_atttypid;
1650         TupleDesc       tupdesc;
1651         int                     natts;
1652         AttrNumber      attnum;
1653         char            attname[NAMEDATALEN];
1654         int                     i;
1655
1656         /*
1657          * We need to build a tuple description with one column for the
1658          * rowname, and num_categories columns for the values. Each must be of
1659          * the same type as the corresponding spi result input column.
1660          */
1661         natts = num_categories + 1;
1662         tupdesc = CreateTemplateTupleDesc(natts, false);
1663
1664         /* first the rowname column */
1665         attnum = 1;
1666
1667         sql_attr = spi_tupdesc->attrs[0];
1668         sql_atttypid = sql_attr->atttypid;
1669
1670         strcpy(attname, "rowname");
1671
1672         TupleDescInitEntry(tupdesc, attnum, attname, sql_atttypid,
1673                                            -1, 0, false);
1674
1675         /* now the category values columns */
1676         sql_attr = spi_tupdesc->attrs[2];
1677         sql_atttypid = sql_attr->atttypid;
1678
1679         for (i = 0; i < num_categories; i++)
1680         {
1681                 attnum++;
1682
1683                 sprintf(attname, "category_%d", i + 1);
1684                 TupleDescInitEntry(tupdesc, attnum, attname, sql_atttypid,
1685                                                    -1, 0, false);
1686         }
1687
1688         return tupdesc;
1689 }