]> granicus.if.org Git - postgresql/blob - contrib/tablefunc/tablefunc.c
Update copyright for the year 2010.
[postgresql] / contrib / tablefunc / tablefunc.c
1 /*
2  * $PostgreSQL: pgsql/contrib/tablefunc/tablefunc.c,v 1.62 2010/01/02 16:57:32 momjian Exp $
3  *
4  *
5  * tablefunc
6  *
7  * Sample to demonstrate C functions which return setof scalar
8  * and setof composite.
9  * Joe Conway <mail@joeconway.com>
10  * And contributors:
11  * Nabil Sayegh <postgresql@e-trolley.de>
12  *
13  * Copyright (c) 2002-2010, PostgreSQL Global Development Group
14  *
15  * Permission to use, copy, modify, and distribute this software and its
16  * documentation for any purpose, without fee, and without a written agreement
17  * is hereby granted, provided that the above copyright notice and this
18  * paragraph and the following two paragraphs appear in all copies.
19  *
20  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24  * POSSIBILITY OF SUCH DAMAGE.
25  *
26  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
27  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
29  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31  *
32  */
33 #include "postgres.h"
34
35 #include <math.h>
36
37 #include "catalog/pg_type.h"
38 #include "fmgr.h"
39 #include "funcapi.h"
40 #include "executor/spi.h"
41 #include "lib/stringinfo.h"
42 #include "miscadmin.h"
43 #include "utils/builtins.h"
44 #include "utils/guc.h"
45 #include "utils/lsyscache.h"
46
47 #include "tablefunc.h"
48
49 PG_MODULE_MAGIC;
50
51 static HTAB *load_categories_hash(char *cats_sql, MemoryContext per_query_ctx);
52 static Tuplestorestate *get_crosstab_tuplestore(char *sql,
53                                                 HTAB *crosstab_hash,
54                                                 TupleDesc tupdesc,
55                                                 MemoryContext per_query_ctx,
56                                                 bool randomAccess);
57 static void validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial);
58 static bool compatCrosstabTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
59 static bool compatConnectbyTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
60 static void get_normal_pair(float8 *x1, float8 *x2);
61 static Tuplestorestate *connectby(char *relname,
62                   char *key_fld,
63                   char *parent_key_fld,
64                   char *orderby_fld,
65                   char *branch_delim,
66                   char *start_with,
67                   int max_depth,
68                   bool show_branch,
69                   bool show_serial,
70                   MemoryContext per_query_ctx,
71                   bool randomAccess,
72                   AttInMetadata *attinmeta);
73 static Tuplestorestate *build_tuplestore_recursively(char *key_fld,
74                                                          char *parent_key_fld,
75                                                          char *relname,
76                                                          char *orderby_fld,
77                                                          char *branch_delim,
78                                                          char *start_with,
79                                                          char *branch,
80                                                          int level,
81                                                          int *serial,
82                                                          int max_depth,
83                                                          bool show_branch,
84                                                          bool show_serial,
85                                                          MemoryContext per_query_ctx,
86                                                          AttInMetadata *attinmeta,
87                                                          Tuplestorestate *tupstore);
88 static char *quote_literal_cstr(char *rawstr);
89
90 typedef struct
91 {
92         float8          mean;                   /* mean of the distribution */
93         float8          stddev;                 /* stddev of the distribution */
94         float8          carry_val;              /* hold second generated value */
95         bool            use_carry;              /* use second generated value */
96 } normal_rand_fctx;
97
/*
 * xpfree - pfree() the memory a pointer variable refers to, unless the
 * variable is NULL, and reset the variable to NULL afterwards.
 *
 * var_ must be a modifiable pointer lvalue.  It is expanded several times,
 * so it must not have side effects.  The argument is parenthesized so that
 * any lvalue expression (e.g. *pp or arr[i]) expands safely.
 */
#define xpfree(var_) \
	do { \
		if ((var_) != NULL) \
		{ \
			pfree(var_); \
			(var_) = NULL; \
		} \
	} while (0)
106
/*
 * xpstrdup - NULL-tolerant pstrdup().
 *
 * Sets tgtvar_ to a pstrdup() copy of srcvar_, or to NULL when srcvar_ is
 * NULL.  tgtvar_ must be a modifiable pointer lvalue; its previous value is
 * overwritten without being freed.  srcvar_ is expanded twice, so it must
 * not have side effects.  Both arguments are parenthesized so arbitrary
 * expressions expand safely.
 */
#define xpstrdup(tgtvar_, srcvar_) \
	do { \
		if ((srcvar_) != NULL) \
			(tgtvar_) = pstrdup(srcvar_); \
		else \
			(tgtvar_) = NULL; \
	} while (0)
114
/*
 * xstreq - NULL-tolerant string equality.
 *
 * Evaluates to true when both arguments are NULL, or when both are non-NULL
 * and strcmp() reports them equal; a NULL never equals a non-NULL string.
 * Each argument is expanded more than once, so it must not have side
 * effects.  The arguments are parenthesized so that expressions of low
 * precedence (e.g. a conditional expression) are compared as a whole.
 */
#define xstreq(tgtvar_, srcvar_) \
	((((tgtvar_) == NULL) && ((srcvar_) == NULL)) || \
	 (((tgtvar_) != NULL) && ((srcvar_) != NULL) && \
	  (strcmp((tgtvar_), (srcvar_)) == 0)))
118
/*
 * Buffer size needed to print an int32 in decimal:
 * one sign character, up to 10 digits, and the terminating '\0'.
 */
#define INT32_STRLEN	12
121
/*
 * Description of one crosstab category, as stored in the category hash.
 */
typedef struct crosstab_cat_desc
{
	char	   *catname;		/* complete (untruncated) category name */
	int			attidx;			/* zero-based position of the category */
} crosstab_cat_desc;
128
/* size in bytes of the fixed-width key buffer of the category hash */
#define MAX_CATNAME_LEN		NAMEDATALEN
/* initial guess at the number of entries in the category hash */
#define INIT_CATS			64
131
/*
 * crosstab_HashTableLookup - find the category descriptor for a name.
 *
 * HASHTAB	the category hash table
 * CATNAME	category name; it is formatted with "%s", so it is expected to
 *			be a valid C string
 * CATDESC	pointer lvalue; receives the stored crosstab_cat_desc pointer,
 *			or NULL when there is no entry for the name
 *
 * The lookup key is a zero-filled buffer of MAX_CATNAME_LEN bytes into which
 * the name is copied by snprintf() with a size limit of MAX_CATNAME_LEN - 1,
 * i.e. longer names are truncated.  crosstab_HashTableInsert builds its key
 * the same way, and the two must be kept in step.
 *
 * The macro's own variables carry a trailing underscore so they cannot
 * capture identifiers used in the arguments.
 */
#define crosstab_HashTableLookup(HASHTAB, CATNAME, CATDESC) \
do { \
	crosstab_HashEnt *hentry_; \
	char		key_[MAX_CATNAME_LEN]; \
	\
	MemSet(key_, 0, MAX_CATNAME_LEN); \
	snprintf(key_, MAX_CATNAME_LEN - 1, "%s", (CATNAME)); \
	hentry_ = (crosstab_HashEnt *) hash_search((HASHTAB), \
											   key_, HASH_FIND, NULL); \
	if (hentry_) \
		(CATDESC) = hentry_->catdesc; \
	else \
		(CATDESC) = NULL; \
} while(0)
145
/*
 * crosstab_HashTableInsert - add a category descriptor to the hash table.
 *
 * HASHTAB	the category hash table
 * CATDESC	pointer to the crosstab_cat_desc to store; its catname is
 *			formatted with "%s" to build the key
 *
 * The key is a zero-filled buffer of MAX_CATNAME_LEN bytes filled by
 * snprintf() with a size limit of MAX_CATNAME_LEN - 1, exactly as in
 * crosstab_HashTableLookup; names are therefore compared in truncated form.
 * If an entry with the same key already exists, an error with
 * ERRCODE_DUPLICATE_OBJECT is raised; otherwise the new entry is made to
 * point at CATDESC (the descriptor itself is not copied).
 *
 * The macro's own variables carry a trailing underscore so they cannot
 * capture identifiers used in the arguments.
 */
#define crosstab_HashTableInsert(HASHTAB, CATDESC) \
do { \
	crosstab_HashEnt *hentry_; \
	bool		found_; \
	char		key_[MAX_CATNAME_LEN]; \
	\
	MemSet(key_, 0, MAX_CATNAME_LEN); \
	snprintf(key_, MAX_CATNAME_LEN - 1, "%s", (CATDESC)->catname); \
	hentry_ = (crosstab_HashEnt *) hash_search((HASHTAB), \
											   key_, HASH_ENTER, &found_); \
	if (found_) \
		ereport(ERROR, \
				(errcode(ERRCODE_DUPLICATE_OBJECT), \
				 errmsg("duplicate category name"))); \
	hentry_->catdesc = (CATDESC); \
} while(0)
160
161 /* hash table */
162 typedef struct crosstab_hashent
163 {
164         char            internal_catname[MAX_CATNAME_LEN];
165         crosstab_cat_desc *catdesc;
166 } crosstab_HashEnt;
167
168 /*
169  * normal_rand - return requested number of random values
170  * with a Gaussian (Normal) distribution.
171  *
172  * inputs are int numvals, float8 mean, and float8 stddev
173  * returns setof float8
174  */
175 PG_FUNCTION_INFO_V1(normal_rand);
176 Datum
177 normal_rand(PG_FUNCTION_ARGS)
178 {
179         FuncCallContext *funcctx;
180         int                     call_cntr;
181         int                     max_calls;
182         normal_rand_fctx *fctx;
183         float8          mean;
184         float8          stddev;
185         float8          carry_val;
186         bool            use_carry;
187         MemoryContext oldcontext;
188
189         /* stuff done only on the first call of the function */
190         if (SRF_IS_FIRSTCALL())
191         {
192                 /* create a function context for cross-call persistence */
193                 funcctx = SRF_FIRSTCALL_INIT();
194
195                 /*
196                  * switch to memory context appropriate for multiple function calls
197                  */
198                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
199
200                 /* total number of tuples to be returned */
201                 funcctx->max_calls = PG_GETARG_UINT32(0);
202
203                 /* allocate memory for user context */
204                 fctx = (normal_rand_fctx *) palloc(sizeof(normal_rand_fctx));
205
206                 /*
207                  * Use fctx to keep track of upper and lower bounds from call to call.
208                  * It will also be used to carry over the spare value we get from the
209                  * Box-Muller algorithm so that we only actually calculate a new value
210                  * every other call.
211                  */
212                 fctx->mean = PG_GETARG_FLOAT8(1);
213                 fctx->stddev = PG_GETARG_FLOAT8(2);
214                 fctx->carry_val = 0;
215                 fctx->use_carry = false;
216
217                 funcctx->user_fctx = fctx;
218
219                 MemoryContextSwitchTo(oldcontext);
220         }
221
222         /* stuff done on every call of the function */
223         funcctx = SRF_PERCALL_SETUP();
224
225         call_cntr = funcctx->call_cntr;
226         max_calls = funcctx->max_calls;
227         fctx = funcctx->user_fctx;
228         mean = fctx->mean;
229         stddev = fctx->stddev;
230         carry_val = fctx->carry_val;
231         use_carry = fctx->use_carry;
232
233         if (call_cntr < max_calls)      /* do when there is more left to send */
234         {
235                 float8          result;
236
237                 if (use_carry)
238                 {
239                         /*
240                          * reset use_carry and use second value obtained on last pass
241                          */
242                         fctx->use_carry = false;
243                         result = carry_val;
244                 }
245                 else
246                 {
247                         float8          normval_1;
248                         float8          normval_2;
249
250                         /* Get the next two normal values */
251                         get_normal_pair(&normval_1, &normval_2);
252
253                         /* use the first */
254                         result = mean + (stddev * normval_1);
255
256                         /* and save the second */
257                         fctx->carry_val = mean + (stddev * normval_2);
258                         fctx->use_carry = true;
259                 }
260
261                 /* send the result */
262                 SRF_RETURN_NEXT(funcctx, Float8GetDatum(result));
263         }
264         else
265                 /* do when there is no more left */
266                 SRF_RETURN_DONE(funcctx);
267 }
268
269 /*
270  * get_normal_pair()
271  * Assigns normally distributed (Gaussian) values to a pair of provided
272  * parameters, with mean 0, standard deviation 1.
273  *
274  * This routine implements Algorithm P (Polar method for normal deviates)
275  * from Knuth's _The_Art_of_Computer_Programming_, Volume 2, 3rd ed., pages
276  * 122-126. Knuth cites his source as "The polar method", G. E. P. Box, M. E.
277  * Muller, and G. Marsaglia, _Annals_Math,_Stat._ 29 (1958), 610-611.
278  *
279  */
280 static void
281 get_normal_pair(float8 *x1, float8 *x2)
282 {
283         float8          u1,
284                                 u2,
285                                 v1,
286                                 v2,
287                                 s;
288
289         do
290         {
291                 u1 = (float8) random() / (float8) MAX_RANDOM_VALUE;
292                 u2 = (float8) random() / (float8) MAX_RANDOM_VALUE;
293
294                 v1 = (2.0 * u1) - 1.0;
295                 v2 = (2.0 * u2) - 1.0;
296
297                 s = v1 * v1 + v2 * v2;
298         } while (s >= 1.0);
299
300         if (s == 0)
301         {
302                 *x1 = 0;
303                 *x2 = 0;
304         }
305         else
306         {
307                 s = sqrt((-2.0 * log(s)) / s);
308                 *x1 = v1 * s;
309                 *x2 = v2 * s;
310         }
311 }
312
313 /*
314  * crosstab - create a crosstab of rowids and values columns from a
315  * SQL statement returning one rowid column, one category column,
316  * and one value column.
317  *
318  * e.g. given sql which produces:
319  *
320  *                      rowid   cat             value
321  *                      ------+-------+-------
322  *                      row1    cat1    val1
323  *                      row1    cat2    val2
324  *                      row1    cat3    val3
325  *                      row1    cat4    val4
326  *                      row2    cat1    val5
327  *                      row2    cat2    val6
328  *                      row2    cat3    val7
329  *                      row2    cat4    val8
330  *
331  * crosstab returns:
332  *                                      <===== values columns =====>
333  *                      rowid   cat1    cat2    cat3    cat4
334  *                      ------+-------+-------+-------+-------
335  *                      row1    val1    val2    val3    val4
336  *                      row2    val5    val6    val7    val8
337  *
338  * NOTES:
339  * 1. SQL result must be ordered by 1,2.
340  * 2. The number of values columns depends on the tuple description
341  *        of the function's declared return type.  The return type's columns
342  *        must match the datatypes of the SQL query's result.  The datatype
343  *        of the category column can be anything, however.
344  * 3. Missing values (i.e. not enough adjacent rows of same rowid to
345  *        fill the number of result values columns) are filled in with nulls.
346  * 4. Extra values (i.e. too many adjacent rows of same rowid to fill
347  *        the number of result values columns) are skipped.
348  * 5. Rows with all nulls in the values columns are skipped.
349  */
350 PG_FUNCTION_INFO_V1(crosstab);
351 Datum
352 crosstab(PG_FUNCTION_ARGS)
353 {
354         char       *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
355         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
356         Tuplestorestate *tupstore;
357         TupleDesc       tupdesc;
358         int                     call_cntr;
359         int                     max_calls;
360         AttInMetadata *attinmeta;
361         SPITupleTable *spi_tuptable;
362         TupleDesc       spi_tupdesc;
363         bool            firstpass;
364         char       *lastrowid;
365         int                     i;
366         int                     num_categories;
367         MemoryContext per_query_ctx;
368         MemoryContext oldcontext;
369         int                     ret;
370         int                     proc;
371
372         /* check to see if caller supports us returning a tuplestore */
373         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
374                 ereport(ERROR,
375                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
376                                  errmsg("set-valued function called in context that cannot accept a set")));
377         if (!(rsinfo->allowedModes & SFRM_Materialize))
378                 ereport(ERROR,
379                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
380                                  errmsg("materialize mode required, but it is not " \
381                                                 "allowed in this context")));
382
383         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
384
385         /* Connect to SPI manager */
386         if ((ret = SPI_connect()) < 0)
387                 /* internal error */
388                 elog(ERROR, "crosstab: SPI_connect returned %d", ret);
389
390         /* Retrieve the desired rows */
391         ret = SPI_execute(sql, true, 0);
392         proc = SPI_processed;
393
394         /* If no qualifying tuples, fall out early */
395         if (ret != SPI_OK_SELECT || proc <= 0)
396         {
397                 SPI_finish();
398                 rsinfo->isDone = ExprEndResult;
399                 PG_RETURN_NULL();
400         }
401
402         spi_tuptable = SPI_tuptable;
403         spi_tupdesc = spi_tuptable->tupdesc;
404
405         /*----------
406          * The provided SQL query must always return three columns.
407          *
408          * 1. rowname
409          *      the label or identifier for each row in the final result
410          * 2. category
411          *      the label or identifier for each column in the final result
412          * 3. values
413          *      the value for each column in the final result
414          *----------
415          */
416         if (spi_tupdesc->natts != 3)
417                 ereport(ERROR,
418                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
419                                  errmsg("invalid source data SQL statement"),
420                                  errdetail("The provided SQL must return 3 "
421                                                    "columns: rowid, category, and values.")));
422
423         /* get a tuple descriptor for our result type */
424         switch (get_call_result_type(fcinfo, NULL, &tupdesc))
425         {
426                 case TYPEFUNC_COMPOSITE:
427                         /* success */
428                         break;
429                 case TYPEFUNC_RECORD:
430                         /* failed to determine actual type of RECORD */
431                         ereport(ERROR,
432                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
433                                          errmsg("function returning record called in context "
434                                                         "that cannot accept type record")));
435                         break;
436                 default:
437                         /* result type isn't composite */
438                         elog(ERROR, "return type must be a row type");
439                         break;
440         }
441
442         /*
443          * Check that return tupdesc is compatible with the data we got from SPI,
444          * at least based on number and type of attributes
445          */
446         if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
447                 ereport(ERROR,
448                                 (errcode(ERRCODE_SYNTAX_ERROR),
449                                  errmsg("return and sql tuple descriptions are " \
450                                                 "incompatible")));
451
452         /*
453          * switch to long-lived memory context
454          */
455         oldcontext = MemoryContextSwitchTo(per_query_ctx);
456
457         /* make sure we have a persistent copy of the result tupdesc */
458         tupdesc = CreateTupleDescCopy(tupdesc);
459
460         /* initialize our tuplestore in long-lived context */
461         tupstore =
462                 tuplestore_begin_heap(rsinfo->allowedModes & SFRM_Materialize_Random,
463                                                           false, work_mem);
464
465         MemoryContextSwitchTo(oldcontext);
466
467         /*
468          * Generate attribute metadata needed later to produce tuples from raw C
469          * strings
470          */
471         attinmeta = TupleDescGetAttInMetadata(tupdesc);
472
473         /* total number of tuples to be examined */
474         max_calls = proc;
475
476         /* the return tuple always must have 1 rowid + num_categories columns */
477         num_categories = tupdesc->natts - 1;
478
479         firstpass = true;
480         lastrowid = NULL;
481
482         for (call_cntr = 0; call_cntr < max_calls; call_cntr++)
483         {
484                 bool            skip_tuple = false;
485                 char      **values;
486
487                 /* allocate and zero space */
488                 values = (char **) palloc0((1 + num_categories) * sizeof(char *));
489
490                 /*
491                  * now loop through the sql results and assign each value in sequence
492                  * to the next category
493                  */
494                 for (i = 0; i < num_categories; i++)
495                 {
496                         HeapTuple       spi_tuple;
497                         char       *rowid;
498
499                         /* see if we've gone too far already */
500                         if (call_cntr >= max_calls)
501                                 break;
502
503                         /* get the next sql result tuple */
504                         spi_tuple = spi_tuptable->vals[call_cntr];
505
506                         /* get the rowid from the current sql result tuple */
507                         rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
508
509                         /*
510                          * If this is the first pass through the values for this rowid,
511                          * set the first column to rowid
512                          */
513                         if (i == 0)
514                         {
515                                 xpstrdup(values[0], rowid);
516
517                                 /*
518                                  * Check to see if the rowid is the same as that of the last
519                                  * tuple sent -- if so, skip this tuple entirely
520                                  */
521                                 if (!firstpass && xstreq(lastrowid, rowid))
522                                 {
523                                         xpfree(rowid);
524                                         skip_tuple = true;
525                                         break;
526                                 }
527                         }
528
529                         /*
530                          * If rowid hasn't changed on us, continue building the output
531                          * tuple.
532                          */
533                         if (xstreq(rowid, values[0]))
534                         {
535                                 /*
536                                  * Get the next category item value, which is always attribute
537                                  * number three.
538                                  *
539                                  * Be careful to assign the value to the array index based on
540                                  * which category we are presently processing.
541                                  */
542                                 values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
543
544                                 /*
545                                  * increment the counter since we consume a row for each
546                                  * category, but not for last pass because the outer loop will
547                                  * do that for us
548                                  */
549                                 if (i < (num_categories - 1))
550                                         call_cntr++;
551                                 xpfree(rowid);
552                         }
553                         else
554                         {
555                                 /*
556                                  * We'll fill in NULLs for the missing values, but we need to
557                                  * decrement the counter since this sql result row doesn't
558                                  * belong to the current output tuple.
559                                  */
560                                 call_cntr--;
561                                 xpfree(rowid);
562                                 break;
563                         }
564                 }
565
566                 if (!skip_tuple)
567                 {
568                         HeapTuple       tuple;
569
570                         /* build the tuple and store it */
571                         tuple = BuildTupleFromCStrings(attinmeta, values);
572                         tuplestore_puttuple(tupstore, tuple);
573                         heap_freetuple(tuple);
574                 }
575
576                 /* Remember current rowid */
577                 xpfree(lastrowid);
578                 xpstrdup(lastrowid, values[0]);
579                 firstpass = false;
580
581                 /* Clean up */
582                 for (i = 0; i < num_categories + 1; i++)
583                         if (values[i] != NULL)
584                                 pfree(values[i]);
585                 pfree(values);
586         }
587
588         /* let the caller know we're sending back a tuplestore */
589         rsinfo->returnMode = SFRM_Materialize;
590         rsinfo->setResult = tupstore;
591         rsinfo->setDesc = tupdesc;
592
593         /* release SPI related resources (and return to caller's context) */
594         SPI_finish();
595
596         return (Datum) 0;
597 }
598
599 /*
600  * crosstab_hash - reimplement crosstab as materialized function and
601  * properly deal with missing values (i.e. don't pack remaining
602  * values to the left)
603  *
604  * crosstab - create a crosstab of rowids and values columns from a
605  * SQL statement returning one rowid column, one category column,
606  * and one value column.
607  *
608  * e.g. given sql which produces:
609  *
610  *                      rowid   cat             value
611  *                      ------+-------+-------
612  *                      row1    cat1    val1
613  *                      row1    cat2    val2
614  *                      row1    cat4    val4
615  *                      row2    cat1    val5
616  *                      row2    cat2    val6
617  *                      row2    cat3    val7
618  *                      row2    cat4    val8
619  *
620  * crosstab returns:
621  *                                      <===== values columns =====>
622  *                      rowid   cat1    cat2    cat3    cat4
623  *                      ------+-------+-------+-------+-------
624  *                      row1    val1    val2    null    val4
625  *                      row2    val5    val6    val7    val8
626  *
627  * NOTES:
628  * 1. SQL result must be ordered by 1.
629  * 2. The number of values columns depends on the tuple description
630  *        of the function's declared return type.
631  * 3. Missing values (i.e. missing category) are filled in with nulls.
632  * 4. Extra values (i.e. not in category results) are skipped.
633  */
634 PG_FUNCTION_INFO_V1(crosstab_hash);
635 Datum
636 crosstab_hash(PG_FUNCTION_ARGS)
637 {
638         char       *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
639         char       *cats_sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
640         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
641         TupleDesc       tupdesc;
642         MemoryContext per_query_ctx;
643         MemoryContext oldcontext;
644         HTAB       *crosstab_hash;
645
646         /* check to see if caller supports us returning a tuplestore */
647         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
648                 ereport(ERROR,
649                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
650                                  errmsg("set-valued function called in context that cannot accept a set")));
651         if (!(rsinfo->allowedModes & SFRM_Materialize) ||
652                 rsinfo->expectedDesc == NULL)
653                 ereport(ERROR,
654                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
655                                  errmsg("materialize mode required, but it is not " \
656                                                 "allowed in this context")));
657
658         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
659         oldcontext = MemoryContextSwitchTo(per_query_ctx);
660
661         /* get the requested return tuple description */
662         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
663
664         /*
665          * Check to make sure we have a reasonable tuple descriptor
666          *
667          * Note we will attempt to coerce the values into whatever the return
668          * attribute type is and depend on the "in" function to complain if
669          * needed.
670          */
671         if (tupdesc->natts < 2)
672                 ereport(ERROR,
673                                 (errcode(ERRCODE_SYNTAX_ERROR),
674                                  errmsg("query-specified return tuple and " \
675                                                 "crosstab function are not compatible")));
676
677         /* load up the categories hash table */
678         crosstab_hash = load_categories_hash(cats_sql, per_query_ctx);
679
680         /* let the caller know we're sending back a tuplestore */
681         rsinfo->returnMode = SFRM_Materialize;
682
683         /* now go build it */
684         rsinfo->setResult = get_crosstab_tuplestore(sql,
685                                                                                                 crosstab_hash,
686                                                                                                 tupdesc,
687                                                                                                 per_query_ctx,
688                                                          rsinfo->allowedModes & SFRM_Materialize_Random);
689
690         /*
691          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
692          * tuples are in our tuplestore and passed back through rsinfo->setResult.
693          * rsinfo->setDesc is set to the tuple description that we actually used
694          * to build our tuples with, so the caller can verify we did what it was
695          * expecting.
696          */
697         rsinfo->setDesc = tupdesc;
698         MemoryContextSwitchTo(oldcontext);
699
700         return (Datum) 0;
701 }
702
703 /*
704  * load up the categories hash table
705  */
706 static HTAB *
707 load_categories_hash(char *cats_sql, MemoryContext per_query_ctx)
708 {
709         HTAB       *crosstab_hash;
710         HASHCTL         ctl;
711         int                     ret;
712         int                     proc;
713         MemoryContext SPIcontext;
714
715         /* initialize the category hash table */
716         MemSet(&ctl, 0, sizeof(ctl));
717         ctl.keysize = MAX_CATNAME_LEN;
718         ctl.entrysize = sizeof(crosstab_HashEnt);
719         ctl.hcxt = per_query_ctx;
720
721         /*
722          * use INIT_CATS, defined above as a guess of how many hash table entries
723          * to create, initially
724          */
725         crosstab_hash = hash_create("crosstab hash",
726                                                                 INIT_CATS,
727                                                                 &ctl,
728                                                                 HASH_ELEM | HASH_CONTEXT);
729
730         /* Connect to SPI manager */
731         if ((ret = SPI_connect()) < 0)
732                 /* internal error */
733                 elog(ERROR, "load_categories_hash: SPI_connect returned %d", ret);
734
735         /* Retrieve the category name rows */
736         ret = SPI_execute(cats_sql, true, 0);
737         proc = SPI_processed;
738
739         /* Check for qualifying tuples */
740         if ((ret == SPI_OK_SELECT) && (proc > 0))
741         {
742                 SPITupleTable *spi_tuptable = SPI_tuptable;
743                 TupleDesc       spi_tupdesc = spi_tuptable->tupdesc;
744                 int                     i;
745
746                 /*
747                  * The provided categories SQL query must always return one column:
748                  * category - the label or identifier for each column
749                  */
750                 if (spi_tupdesc->natts != 1)
751                         ereport(ERROR,
752                                         (errcode(ERRCODE_SYNTAX_ERROR),
753                                          errmsg("provided \"categories\" SQL must " \
754                                                         "return 1 column of at least one row")));
755
756                 for (i = 0; i < proc; i++)
757                 {
758                         crosstab_cat_desc *catdesc;
759                         char       *catname;
760                         HeapTuple       spi_tuple;
761
762                         /* get the next sql result tuple */
763                         spi_tuple = spi_tuptable->vals[i];
764
765                         /* get the category from the current sql result tuple */
766                         catname = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
767
768                         SPIcontext = MemoryContextSwitchTo(per_query_ctx);
769
770                         catdesc = (crosstab_cat_desc *) palloc(sizeof(crosstab_cat_desc));
771                         catdesc->catname = catname;
772                         catdesc->attidx = i;
773
774                         /* Add the proc description block to the hashtable */
775                         crosstab_HashTableInsert(crosstab_hash, catdesc);
776
777                         MemoryContextSwitchTo(SPIcontext);
778                 }
779         }
780
781         if (SPI_finish() != SPI_OK_FINISH)
782                 /* internal error */
783                 elog(ERROR, "load_categories_hash: SPI_finish() failed");
784
785         return crosstab_hash;
786 }
787
788 /*
789  * create and populate the crosstab tuplestore using the provided source query
790  */
791 static Tuplestorestate *
792 get_crosstab_tuplestore(char *sql,
793                                                 HTAB *crosstab_hash,
794                                                 TupleDesc tupdesc,
795                                                 MemoryContext per_query_ctx,
796                                                 bool randomAccess)
797 {
798         Tuplestorestate *tupstore;
799         int                     num_categories = hash_get_num_entries(crosstab_hash);
800         AttInMetadata *attinmeta = TupleDescGetAttInMetadata(tupdesc);
801         char      **values;
802         HeapTuple       tuple;
803         int                     ret;
804         int                     proc;
805
806         /* initialize our tuplestore (while still in query context!) */
807         tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
808
809         /* Connect to SPI manager */
810         if ((ret = SPI_connect()) < 0)
811                 /* internal error */
812                 elog(ERROR, "get_crosstab_tuplestore: SPI_connect returned %d", ret);
813
814         /* Now retrieve the crosstab source rows */
815         ret = SPI_execute(sql, true, 0);
816         proc = SPI_processed;
817
818         /* Check for qualifying tuples */
819         if ((ret == SPI_OK_SELECT) && (proc > 0))
820         {
821                 SPITupleTable *spi_tuptable = SPI_tuptable;
822                 TupleDesc       spi_tupdesc = spi_tuptable->tupdesc;
823                 int                     ncols = spi_tupdesc->natts;
824                 char       *rowid;
825                 char       *lastrowid = NULL;
826                 bool            firstpass = true;
827                 int                     i,
828                                         j;
829                 int                     result_ncols;
830
831                 if (num_categories == 0)
832                 {
833                         /* no qualifying category tuples */
834                         ereport(ERROR,
835                                         (errcode(ERRCODE_SYNTAX_ERROR),
836                                          errmsg("provided \"categories\" SQL must " \
837                                                         "return 1 column of at least one row")));
838                 }
839
840                 /*
841                  * The provided SQL query must always return at least three columns:
842                  *
843                  * 1. rowname   the label for each row - column 1 in the final result
844                  * 2. category  the label for each value-column in the final result 3.
845                  * value         the values used to populate the value-columns
846                  *
847                  * If there are more than three columns, the last two are taken as
848                  * "category" and "values". The first column is taken as "rowname".
849                  * Additional columns (2 thru N-2) are assumed the same for the same
850                  * "rowname", and are copied into the result tuple from the first time
851                  * we encounter a particular rowname.
852                  */
853                 if (ncols < 3)
854                         ereport(ERROR,
855                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
856                                          errmsg("invalid source data SQL statement"),
857                                          errdetail("The provided SQL must return 3 " \
858                                                            " columns; rowid, category, and values.")));
859
860                 result_ncols = (ncols - 2) + num_categories;
861
862                 /* Recheck to make sure we tuple descriptor still looks reasonable */
863                 if (tupdesc->natts != result_ncols)
864                         ereport(ERROR,
865                                         (errcode(ERRCODE_SYNTAX_ERROR),
866                                          errmsg("invalid return type"),
867                                          errdetail("Query-specified return " \
868                                                            "tuple has %d columns but crosstab " \
869                                                            "returns %d.", tupdesc->natts, result_ncols)));
870
871                 /* allocate space */
872                 values = (char **) palloc(result_ncols * sizeof(char *));
873
874                 /* and make sure it's clear */
875                 memset(values, '\0', result_ncols * sizeof(char *));
876
877                 for (i = 0; i < proc; i++)
878                 {
879                         HeapTuple       spi_tuple;
880                         crosstab_cat_desc *catdesc;
881                         char       *catname;
882
883                         /* get the next sql result tuple */
884                         spi_tuple = spi_tuptable->vals[i];
885
886                         /* get the rowid from the current sql result tuple */
887                         rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
888
889                         /*
890                          * if we're on a new output row, grab the column values up to
891                          * column N-2 now
892                          */
893                         if (firstpass || !xstreq(lastrowid, rowid))
894                         {
895                                 /*
896                                  * a new row means we need to flush the old one first, unless
897                                  * we're on the very first row
898                                  */
899                                 if (!firstpass)
900                                 {
901                                         /* rowid changed, flush the previous output row */
902                                         tuple = BuildTupleFromCStrings(attinmeta, values);
903
904                                         tuplestore_puttuple(tupstore, tuple);
905
906                                         for (j = 0; j < result_ncols; j++)
907                                                 xpfree(values[j]);
908                                 }
909
910                                 values[0] = rowid;
911                                 for (j = 1; j < ncols - 2; j++)
912                                         values[j] = SPI_getvalue(spi_tuple, spi_tupdesc, j + 1);
913
914                                 /* we're no longer on the first pass */
915                                 firstpass = false;
916                         }
917
918                         /* look up the category and fill in the appropriate column */
919                         catname = SPI_getvalue(spi_tuple, spi_tupdesc, ncols - 1);
920
921                         if (catname != NULL)
922                         {
923                                 crosstab_HashTableLookup(crosstab_hash, catname, catdesc);
924
925                                 if (catdesc)
926                                         values[catdesc->attidx + ncols - 2] =
927                                                 SPI_getvalue(spi_tuple, spi_tupdesc, ncols);
928                         }
929
930                         xpfree(lastrowid);
931                         xpstrdup(lastrowid, rowid);
932                 }
933
934                 /* flush the last output row */
935                 tuple = BuildTupleFromCStrings(attinmeta, values);
936
937                 tuplestore_puttuple(tupstore, tuple);
938         }
939
940         if (SPI_finish() != SPI_OK_FINISH)
941                 /* internal error */
942                 elog(ERROR, "get_crosstab_tuplestore: SPI_finish() failed");
943
944         tuplestore_donestoring(tupstore);
945
946         return tupstore;
947 }
948
949 /*
950  * connectby_text - produce a result set from a hierarchical (parent/child)
951  * table.
952  *
953  * e.g. given table foo:
954  *
955  *                      keyid   parent_keyid pos
956  *                      ------+------------+--
957  *                      row1    NULL             0
958  *                      row2    row1             0
959  *                      row3    row1             0
960  *                      row4    row2             1
961  *                      row5    row2             0
962  *                      row6    row4             0
963  *                      row7    row3             0
964  *                      row8    row6             0
965  *                      row9    row5             0
966  *
967  *
968  * connectby(text relname, text keyid_fld, text parent_keyid_fld
969  *                        [, text orderby_fld], text start_with, int max_depth
970  *                        [, text branch_delim])
971  * connectby('foo', 'keyid', 'parent_keyid', 'pos', 'row2', 0, '~') returns:
972  *
973  *              keyid   parent_id       level    branch                         serial
974  *              ------+-----------+--------+-----------------------
975  *              row2    NULL              0               row2                            1
976  *              row5    row2              1               row2~row5                       2
977  *              row9    row5              2               row2~row5~row9          3
978  *              row4    row2              1               row2~row4                       4
979  *              row6    row4              2               row2~row4~row6          5
980  *              row8    row6              3               row2~row4~row6~row8 6
981  *
982  */
983 PG_FUNCTION_INFO_V1(connectby_text);
984
985 #define CONNECTBY_NCOLS                                 4
986 #define CONNECTBY_NCOLS_NOBRANCH                3
987
988 Datum
989 connectby_text(PG_FUNCTION_ARGS)
990 {
991         char       *relname = text_to_cstring(PG_GETARG_TEXT_PP(0));
992         char       *key_fld = text_to_cstring(PG_GETARG_TEXT_PP(1));
993         char       *parent_key_fld = text_to_cstring(PG_GETARG_TEXT_PP(2));
994         char       *start_with = text_to_cstring(PG_GETARG_TEXT_PP(3));
995         int                     max_depth = PG_GETARG_INT32(4);
996         char       *branch_delim = NULL;
997         bool            show_branch = false;
998         bool            show_serial = false;
999         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1000         TupleDesc       tupdesc;
1001         AttInMetadata *attinmeta;
1002         MemoryContext per_query_ctx;
1003         MemoryContext oldcontext;
1004
1005         /* check to see if caller supports us returning a tuplestore */
1006         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1007                 ereport(ERROR,
1008                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1009                                  errmsg("set-valued function called in context that cannot accept a set")));
1010         if (!(rsinfo->allowedModes & SFRM_Materialize) ||
1011                 rsinfo->expectedDesc == NULL)
1012                 ereport(ERROR,
1013                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1014                                  errmsg("materialize mode required, but it is not " \
1015                                                 "allowed in this context")));
1016
1017         if (fcinfo->nargs == 6)
1018         {
1019                 branch_delim = text_to_cstring(PG_GETARG_TEXT_PP(5));
1020                 show_branch = true;
1021         }
1022         else
1023                 /* default is no show, tilde for the delimiter */
1024                 branch_delim = pstrdup("~");
1025
1026         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1027         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1028
1029         /* get the requested return tuple description */
1030         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
1031
1032         /* does it meet our needs */
1033         validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
1034
1035         /* OK, use it then */
1036         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1037
1038         /* OK, go to work */
1039         rsinfo->returnMode = SFRM_Materialize;
1040         rsinfo->setResult = connectby(relname,
1041                                                                   key_fld,
1042                                                                   parent_key_fld,
1043                                                                   NULL,
1044                                                                   branch_delim,
1045                                                                   start_with,
1046                                                                   max_depth,
1047                                                                   show_branch,
1048                                                                   show_serial,
1049                                                                   per_query_ctx,
1050                                                           rsinfo->allowedModes & SFRM_Materialize_Random,
1051                                                                   attinmeta);
1052         rsinfo->setDesc = tupdesc;
1053
1054         MemoryContextSwitchTo(oldcontext);
1055
1056         /*
1057          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
1058          * tuples are in our tuplestore and passed back through rsinfo->setResult.
1059          * rsinfo->setDesc is set to the tuple description that we actually used
1060          * to build our tuples with, so the caller can verify we did what it was
1061          * expecting.
1062          */
1063         return (Datum) 0;
1064 }
1065
1066 PG_FUNCTION_INFO_V1(connectby_text_serial);
1067 Datum
1068 connectby_text_serial(PG_FUNCTION_ARGS)
1069 {
1070         char       *relname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1071         char       *key_fld = text_to_cstring(PG_GETARG_TEXT_PP(1));
1072         char       *parent_key_fld = text_to_cstring(PG_GETARG_TEXT_PP(2));
1073         char       *orderby_fld = text_to_cstring(PG_GETARG_TEXT_PP(3));
1074         char       *start_with = text_to_cstring(PG_GETARG_TEXT_PP(4));
1075         int                     max_depth = PG_GETARG_INT32(5);
1076         char       *branch_delim = NULL;
1077         bool            show_branch = false;
1078         bool            show_serial = true;
1079         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1080         TupleDesc       tupdesc;
1081         AttInMetadata *attinmeta;
1082         MemoryContext per_query_ctx;
1083         MemoryContext oldcontext;
1084
1085         /* check to see if caller supports us returning a tuplestore */
1086         if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1087                 ereport(ERROR,
1088                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1089                                  errmsg("set-valued function called in context that cannot accept a set")));
1090         if (!(rsinfo->allowedModes & SFRM_Materialize) ||
1091                 rsinfo->expectedDesc == NULL)
1092                 ereport(ERROR,
1093                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1094                                  errmsg("materialize mode required, but it is not " \
1095                                                 "allowed in this context")));
1096
1097         if (fcinfo->nargs == 7)
1098         {
1099                 branch_delim = text_to_cstring(PG_GETARG_TEXT_PP(6));
1100                 show_branch = true;
1101         }
1102         else
1103                 /* default is no show, tilde for the delimiter */
1104                 branch_delim = pstrdup("~");
1105
1106         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1107         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1108
1109         /* get the requested return tuple description */
1110         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
1111
1112         /* does it meet our needs */
1113         validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
1114
1115         /* OK, use it then */
1116         attinmeta = TupleDescGetAttInMetadata(tupdesc);
1117
1118         /* OK, go to work */
1119         rsinfo->returnMode = SFRM_Materialize;
1120         rsinfo->setResult = connectby(relname,
1121                                                                   key_fld,
1122                                                                   parent_key_fld,
1123                                                                   orderby_fld,
1124                                                                   branch_delim,
1125                                                                   start_with,
1126                                                                   max_depth,
1127                                                                   show_branch,
1128                                                                   show_serial,
1129                                                                   per_query_ctx,
1130                                                           rsinfo->allowedModes & SFRM_Materialize_Random,
1131                                                                   attinmeta);
1132         rsinfo->setDesc = tupdesc;
1133
1134         MemoryContextSwitchTo(oldcontext);
1135
1136         /*
1137          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
1138          * tuples are in our tuplestore and passed back through rsinfo->setResult.
1139          * rsinfo->setDesc is set to the tuple description that we actually used
1140          * to build our tuples with, so the caller can verify we did what it was
1141          * expecting.
1142          */
1143         return (Datum) 0;
1144 }
1145
1146
1147 /*
1148  * connectby - does the real work for connectby_text()
1149  */
1150 static Tuplestorestate *
1151 connectby(char *relname,
1152                   char *key_fld,
1153                   char *parent_key_fld,
1154                   char *orderby_fld,
1155                   char *branch_delim,
1156                   char *start_with,
1157                   int max_depth,
1158                   bool show_branch,
1159                   bool show_serial,
1160                   MemoryContext per_query_ctx,
1161                   bool randomAccess,
1162                   AttInMetadata *attinmeta)
1163 {
1164         Tuplestorestate *tupstore = NULL;
1165         int                     ret;
1166         MemoryContext oldcontext;
1167
1168         int                     serial = 1;
1169
1170         /* Connect to SPI manager */
1171         if ((ret = SPI_connect()) < 0)
1172                 /* internal error */
1173                 elog(ERROR, "connectby: SPI_connect returned %d", ret);
1174
1175         /* switch to longer term context to create the tuple store */
1176         oldcontext = MemoryContextSwitchTo(per_query_ctx);
1177
1178         /* initialize our tuplestore */
1179         tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
1180
1181         MemoryContextSwitchTo(oldcontext);
1182
1183         /* now go get the whole tree */
1184         tupstore = build_tuplestore_recursively(key_fld,
1185                                                                                         parent_key_fld,
1186                                                                                         relname,
1187                                                                                         orderby_fld,
1188                                                                                         branch_delim,
1189                                                                                         start_with,
1190                                                                                         start_with, /* current_branch */
1191                                                                                         0,      /* initial level is 0 */
1192                                                                                         &serial,        /* initial serial is 1 */
1193                                                                                         max_depth,
1194                                                                                         show_branch,
1195                                                                                         show_serial,
1196                                                                                         per_query_ctx,
1197                                                                                         attinmeta,
1198                                                                                         tupstore);
1199
1200         SPI_finish();
1201
1202         return tupstore;
1203 }
1204
1205 static Tuplestorestate *
1206 build_tuplestore_recursively(char *key_fld,
1207                                                          char *parent_key_fld,
1208                                                          char *relname,
1209                                                          char *orderby_fld,
1210                                                          char *branch_delim,
1211                                                          char *start_with,
1212                                                          char *branch,
1213                                                          int level,
1214                                                          int *serial,
1215                                                          int max_depth,
1216                                                          bool show_branch,
1217                                                          bool show_serial,
1218                                                          MemoryContext per_query_ctx,
1219                                                          AttInMetadata *attinmeta,
1220                                                          Tuplestorestate *tupstore)
1221 {
1222         TupleDesc       tupdesc = attinmeta->tupdesc;
1223         int                     ret;
1224         int                     proc;
1225         int                     serial_column;
1226         StringInfoData sql;
1227         char      **values;
1228         char       *current_key;
1229         char       *current_key_parent;
1230         char            current_level[INT32_STRLEN];
1231         char            serial_str[INT32_STRLEN];
1232         char       *current_branch;
1233         HeapTuple       tuple;
1234
1235         if (max_depth > 0 && level > max_depth)
1236                 return tupstore;
1237
1238         initStringInfo(&sql);
1239
1240         /* Build initial sql statement */
1241         if (!show_serial)
1242         {
1243                 appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s",
1244                                                  key_fld,
1245                                                  parent_key_fld,
1246                                                  relname,
1247                                                  parent_key_fld,
1248                                                  quote_literal_cstr(start_with),
1249                                                  key_fld, key_fld, parent_key_fld);
1250                 serial_column = 0;
1251         }
1252         else
1253         {
1254                 appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s ORDER BY %s",
1255                                                  key_fld,
1256                                                  parent_key_fld,
1257                                                  relname,
1258                                                  parent_key_fld,
1259                                                  quote_literal_cstr(start_with),
1260                                                  key_fld, key_fld, parent_key_fld,
1261                                                  orderby_fld);
1262                 serial_column = 1;
1263         }
1264
1265         if (show_branch)
1266                 values = (char **) palloc((CONNECTBY_NCOLS + serial_column) * sizeof(char *));
1267         else
1268                 values = (char **) palloc((CONNECTBY_NCOLS_NOBRANCH + serial_column) * sizeof(char *));
1269
1270         /* First time through, do a little setup */
1271         if (level == 0)
1272         {
1273                 /* root value is the one we initially start with */
1274                 values[0] = start_with;
1275
1276                 /* root value has no parent */
1277                 values[1] = NULL;
1278
1279                 /* root level is 0 */
1280                 sprintf(current_level, "%d", level);
1281                 values[2] = current_level;
1282
1283                 /* root branch is just starting root value */
1284                 if (show_branch)
1285                         values[3] = start_with;
1286
1287                 /* root starts the serial with 1 */
1288                 if (show_serial)
1289                 {
1290                         sprintf(serial_str, "%d", (*serial)++);
1291                         if (show_branch)
1292                                 values[4] = serial_str;
1293                         else
1294                                 values[3] = serial_str;
1295                 }
1296
1297                 /* construct the tuple */
1298                 tuple = BuildTupleFromCStrings(attinmeta, values);
1299
1300                 /* now store it */
1301                 tuplestore_puttuple(tupstore, tuple);
1302
1303                 /* increment level */
1304                 level++;
1305         }
1306
1307         /* Retrieve the desired rows */
1308         ret = SPI_execute(sql.data, true, 0);
1309         proc = SPI_processed;
1310
1311         /* Check for qualifying tuples */
1312         if ((ret == SPI_OK_SELECT) && (proc > 0))
1313         {
1314                 HeapTuple       spi_tuple;
1315                 SPITupleTable *tuptable = SPI_tuptable;
1316                 TupleDesc       spi_tupdesc = tuptable->tupdesc;
1317                 int                     i;
1318                 StringInfoData branchstr;
1319                 StringInfoData chk_branchstr;
1320                 StringInfoData chk_current_key;
1321
1322                 /* First time through, do a little more setup */
1323                 if (level == 0)
1324                 {
1325                         /*
1326                          * Check that return tupdesc is compatible with the one we got
1327                          * from the query, but only at level 0 -- no need to check more
1328                          * than once
1329                          */
1330
1331                         if (!compatConnectbyTupleDescs(tupdesc, spi_tupdesc))
1332                                 ereport(ERROR,
1333                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1334                                                  errmsg("invalid return type"),
1335                                                  errdetail("Return and SQL tuple descriptions are " \
1336                                                                    "incompatible.")));
1337                 }
1338
1339                 initStringInfo(&branchstr);
1340                 initStringInfo(&chk_branchstr);
1341                 initStringInfo(&chk_current_key);
1342
1343                 for (i = 0; i < proc; i++)
1344                 {
1345                         /* initialize branch for this pass */
1346                         appendStringInfo(&branchstr, "%s", branch);
1347                         appendStringInfo(&chk_branchstr, "%s%s%s", branch_delim, branch, branch_delim);
1348
1349                         /* get the next sql result tuple */
1350                         spi_tuple = tuptable->vals[i];
1351
1352                         /* get the current key and parent */
1353                         current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
1354                         appendStringInfo(&chk_current_key, "%s%s%s", branch_delim, current_key, branch_delim);
1355                         current_key_parent = pstrdup(SPI_getvalue(spi_tuple, spi_tupdesc, 2));
1356
1357                         /* get the current level */
1358                         sprintf(current_level, "%d", level);
1359
1360                         /* check to see if this key is also an ancestor */
1361                         if (strstr(chk_branchstr.data, chk_current_key.data))
1362                                 elog(ERROR, "infinite recursion detected");
1363
1364                         /* OK, extend the branch */
1365                         appendStringInfo(&branchstr, "%s%s", branch_delim, current_key);
1366                         current_branch = branchstr.data;
1367
1368                         /* build a tuple */
1369                         values[0] = pstrdup(current_key);
1370                         values[1] = current_key_parent;
1371                         values[2] = current_level;
1372                         if (show_branch)
1373                                 values[3] = current_branch;
1374                         if (show_serial)
1375                         {
1376                                 sprintf(serial_str, "%d", (*serial)++);
1377                                 if (show_branch)
1378                                         values[4] = serial_str;
1379                                 else
1380                                         values[3] = serial_str;
1381                         }
1382
1383                         tuple = BuildTupleFromCStrings(attinmeta, values);
1384
1385                         xpfree(current_key);
1386                         xpfree(current_key_parent);
1387
1388                         /* store the tuple for later use */
1389                         tuplestore_puttuple(tupstore, tuple);
1390
1391                         heap_freetuple(tuple);
1392
1393                         /* recurse using current_key_parent as the new start_with */
1394                         tupstore = build_tuplestore_recursively(key_fld,
1395                                                                                                         parent_key_fld,
1396                                                                                                         relname,
1397                                                                                                         orderby_fld,
1398                                                                                                         branch_delim,
1399                                                                                                         values[0],
1400                                                                                                         current_branch,
1401                                                                                                         level + 1,
1402                                                                                                         serial,
1403                                                                                                         max_depth,
1404                                                                                                         show_branch,
1405                                                                                                         show_serial,
1406                                                                                                         per_query_ctx,
1407                                                                                                         attinmeta,
1408                                                                                                         tupstore);
1409
1410                         /* reset branch for next pass */
1411                         resetStringInfo(&branchstr);
1412                         resetStringInfo(&chk_branchstr);
1413                         resetStringInfo(&chk_current_key);
1414                 }
1415
1416                 xpfree(branchstr.data);
1417                 xpfree(chk_branchstr.data);
1418                 xpfree(chk_current_key.data);
1419         }
1420
1421         return tupstore;
1422 }
1423
1424 /*
1425  * Check expected (query runtime) tupdesc suitable for Connectby
1426  */
1427 static void
1428 validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial)
1429 {
1430         int                     serial_column = 0;
1431
1432         if (show_serial)
1433                 serial_column = 1;
1434
1435         /* are there the correct number of columns */
1436         if (show_branch)
1437         {
1438                 if (tupdesc->natts != (CONNECTBY_NCOLS + serial_column))
1439                         ereport(ERROR,
1440                                         (errcode(ERRCODE_SYNTAX_ERROR),
1441                                          errmsg("invalid return type"),
1442                                          errdetail("Query-specified return tuple has " \
1443                                                            "wrong number of columns.")));
1444         }
1445         else
1446         {
1447                 if (tupdesc->natts != CONNECTBY_NCOLS_NOBRANCH + serial_column)
1448                         ereport(ERROR,
1449                                         (errcode(ERRCODE_SYNTAX_ERROR),
1450                                          errmsg("invalid return type"),
1451                                          errdetail("Query-specified return tuple has " \
1452                                                            "wrong number of columns.")));
1453         }
1454
1455         /* check that the types of the first two columns match */
1456         if (tupdesc->attrs[0]->atttypid != tupdesc->attrs[1]->atttypid)
1457                 ereport(ERROR,
1458                                 (errcode(ERRCODE_SYNTAX_ERROR),
1459                                  errmsg("invalid return type"),
1460                                  errdetail("First two columns must be the same type.")));
1461
1462         /* check that the type of the third column is INT4 */
1463         if (tupdesc->attrs[2]->atttypid != INT4OID)
1464                 ereport(ERROR,
1465                                 (errcode(ERRCODE_SYNTAX_ERROR),
1466                                  errmsg("invalid return type"),
1467                                  errdetail("Third column must be type %s.",
1468                                                    format_type_be(INT4OID))));
1469
1470         /* check that the type of the fourth column is TEXT if applicable */
1471         if (show_branch && tupdesc->attrs[3]->atttypid != TEXTOID)
1472                 ereport(ERROR,
1473                                 (errcode(ERRCODE_SYNTAX_ERROR),
1474                                  errmsg("invalid return type"),
1475                                  errdetail("Fourth column must be type %s.",
1476                                                    format_type_be(TEXTOID))));
1477
1478         /* check that the type of the fifth column is INT4 */
1479         if (show_branch && show_serial && tupdesc->attrs[4]->atttypid != INT4OID)
1480                 elog(ERROR, "query-specified return tuple not valid for Connectby: "
1481                          "fifth column must be type %s", format_type_be(INT4OID));
1482
1483         /* check that the type of the fifth column is INT4 */
1484         if (!show_branch && show_serial && tupdesc->attrs[3]->atttypid != INT4OID)
1485                 elog(ERROR, "query-specified return tuple not valid for Connectby: "
1486                          "fourth column must be type %s", format_type_be(INT4OID));
1487
1488         /* OK, the tupdesc is valid for our purposes */
1489 }
1490
1491 /*
1492  * Check if spi sql tupdesc and return tupdesc are compatible
1493  */
1494 static bool
1495 compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
1496 {
1497         Oid                     ret_atttypid;
1498         Oid                     sql_atttypid;
1499
1500         /* check the key_fld types match */
1501         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1502         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1503         if (ret_atttypid != sql_atttypid)
1504                 ereport(ERROR,
1505                                 (errcode(ERRCODE_SYNTAX_ERROR),
1506                                  errmsg("invalid return type"),
1507                                  errdetail("SQL key field datatype does " \
1508                                                    "not match return key field datatype.")));
1509
1510         /* check the parent_key_fld types match */
1511         ret_atttypid = ret_tupdesc->attrs[1]->atttypid;
1512         sql_atttypid = sql_tupdesc->attrs[1]->atttypid;
1513         if (ret_atttypid != sql_atttypid)
1514                 ereport(ERROR,
1515                                 (errcode(ERRCODE_SYNTAX_ERROR),
1516                                  errmsg("invalid return type"),
1517                                  errdetail("SQL parent key field datatype does " \
1518                                                    "not match return parent key field datatype.")));
1519
1520         /* OK, the two tupdescs are compatible for our purposes */
1521         return true;
1522 }
1523
1524 /*
1525  * Check if two tupdescs match in type of attributes
1526  */
1527 static bool
1528 compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
1529 {
1530         int                     i;
1531         Form_pg_attribute ret_attr;
1532         Oid                     ret_atttypid;
1533         Form_pg_attribute sql_attr;
1534         Oid                     sql_atttypid;
1535
1536         if (ret_tupdesc->natts < 2 ||
1537                 sql_tupdesc->natts < 3)
1538                 return false;
1539
1540         /* check the rowid types match */
1541         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1542         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1543         if (ret_atttypid != sql_atttypid)
1544                 ereport(ERROR,
1545                                 (errcode(ERRCODE_SYNTAX_ERROR),
1546                                  errmsg("invalid return type"),
1547                                  errdetail("SQL rowid datatype does not match " \
1548                                                    "return rowid datatype.")));
1549
1550         /*
1551          * - attribute [1] of the sql tuple is the category; no need to check it -
1552          * attribute [2] of the sql tuple should match attributes [1] to [natts]
1553          * of the return tuple
1554          */
1555         sql_attr = sql_tupdesc->attrs[2];
1556         for (i = 1; i < ret_tupdesc->natts; i++)
1557         {
1558                 ret_attr = ret_tupdesc->attrs[i];
1559
1560                 if (ret_attr->atttypid != sql_attr->atttypid)
1561                         return false;
1562         }
1563
1564         /* OK, the two tupdescs are compatible for our purposes */
1565         return true;
1566 }
1567
1568 /*
1569  * Return a properly quoted literal value.
1570  * Uses quote_literal in quote.c
1571  */
1572 static char *
1573 quote_literal_cstr(char *rawstr)
1574 {
1575         text       *rawstr_text;
1576         text       *result_text;
1577         char       *result;
1578
1579         rawstr_text = cstring_to_text(rawstr);
1580         result_text = DatumGetTextP(DirectFunctionCall1(quote_literal,
1581                                                                                           PointerGetDatum(rawstr_text)));
1582         result = text_to_cstring(result_text);
1583
1584         return result;
1585 }