]> granicus.if.org Git - postgresql/blob - contrib/tablefunc/tablefunc.c
> Now I'm testing connectby() in the /contrib/tablefunc in 7.3b1, which would
[postgresql] / contrib / tablefunc / tablefunc.c
1 /*
2  * tablefunc
3  *
4  * Sample to demonstrate C functions which return setof scalar
5  * and setof composite.
6  * Joe Conway <mail@joeconway.com>
7  *
8  * Copyright 2002 by PostgreSQL Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and its
11  * documentation for any purpose, without fee, and without a written agreement
12  * is hereby granted, provided that the above copyright notice and this
13  * paragraph and the following two paragraphs appear in all copies.
14  *
15  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
16  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
17  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
18  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
19  * POSSIBILITY OF SUCH DAMAGE.
20  *
21  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
22  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
23  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
24  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
25  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
26  *
27  */
28 #include "postgres.h"
29
30 #include <math.h>
31
32 #include "fmgr.h"
33 #include "funcapi.h"
34 #include "executor/spi.h"
35 #include "miscadmin.h"
36 #include "utils/builtins.h"
37 #include "utils/guc.h"
38 #include "utils/lsyscache.h"
39
40 #include "tablefunc.h"
41
/*
 * Prototypes for the file-local (static) helpers used by the SQL-callable
 * functions in this file.
 */

/* tuple-descriptor checks used by crosstab() and connectby_text() */
static void validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch);
static bool compatCrosstabTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
static bool compatConnectbyTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);

/* produces the pair of normal deviates consumed by normal_rand() */
static void get_normal_pair(float8 *x1, float8 *x2);

/* used by crosstab() when the function is declared to return RECORD */
static TupleDesc make_crosstab_tupledesc(TupleDesc spi_tupdesc,
                                                int num_catagories);

/* worker called by connectby_text(); returns the filled tuplestore */
static Tuplestorestate *connectby(char *relname,
                  char *key_fld,
                  char *parent_key_fld,
                  char *branch_delim,
                  char *start_with,
                  int max_depth,
                  bool show_branch,
                  MemoryContext per_query_ctx,
                  AttInMetadata *attinmeta);

/* called by connectby() with level 0 to collect the whole tree */
static Tuplestorestate *build_tuplestore_recursively(char *key_fld,
                                                         char *parent_key_fld,
                                                         char *relname,
                                                         char *branch_delim,
                                                         char *start_with,
                                                         char *branch,
                                                         int level,
                                                         int max_depth,
                                                         bool show_branch,
                                                         MemoryContext per_query_ctx,
                                                         AttInMetadata *attinmeta,
                                                         Tuplestorestate *tupstore);
static char *quote_ident_cstr(char *rawstr);
70
/*
 * Cross-call state for normal_rand().  Allocated on the first call and kept
 * in funcctx->user_fctx for the following calls.
 */
typedef struct
{
        float8          mean;                   /* mean of the distribution */
        float8          stddev;                 /* stddev of the distribution */
        float8          carry_val;              /* hold second generated value */
        bool            use_carry;              /* use second generated value */
}       normal_rand_fctx;
78
/*
 * Cross-call state for crosstab().  Allocated on the first call and kept in
 * funcctx->user_fctx for the following calls.
 */
typedef struct
{
        SPITupleTable *spi_tuptable;    /* sql results from user query */
        char       *lastrowid;          /* rowid of the last tuple sent */
}       crosstab_fctx;
84
/* GET_TEXT: run a C string through textin and return the result as text */
#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
/* GET_STR: run a text value through textout and return the resulting C string */
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
/*
 * xpfree: pfree() the pointer if it is not NULL, then set it to NULL.
 * The argument is expanded several times and is assigned to, so it must be
 * a side-effect-free lvalue.
 */
#define xpfree(var_) \
        do { \
                if (var_ != NULL) \
                { \
                        pfree(var_); \
                        var_ = NULL; \
                } \
        } while (0)
95
96 /* sign, 10 digits, '\0' */
97 #define INT32_STRLEN    12
98
99 /*
100  * normal_rand - return requested number of random values
101  * with a Gaussian (Normal) distribution.
102  *
 * inputs are int numvals, float8 mean, float8 stddev, and an integer seed
104  * returns float8
105  */
106 PG_FUNCTION_INFO_V1(normal_rand);
107 Datum
108 normal_rand(PG_FUNCTION_ARGS)
109 {
110         FuncCallContext *funcctx;
111         int                     call_cntr;
112         int                     max_calls;
113         normal_rand_fctx *fctx;
114         float8          mean;
115         float8          stddev;
116         float8          carry_val;
117         bool            use_carry;
118         MemoryContext oldcontext;
119
120         /* stuff done only on the first call of the function */
121         if (SRF_IS_FIRSTCALL())
122         {
123                 /* create a function context for cross-call persistence */
124                 funcctx = SRF_FIRSTCALL_INIT();
125
126                 /*
127                  * switch to memory context appropriate for multiple function
128                  * calls
129                  */
130                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
131
132                 /* total number of tuples to be returned */
133                 funcctx->max_calls = PG_GETARG_UINT32(0);
134
135                 /* allocate memory for user context */
136                 fctx = (normal_rand_fctx *) palloc(sizeof(normal_rand_fctx));
137
138                 /*
139                  * Use fctx to keep track of upper and lower bounds from call to
140                  * call. It will also be used to carry over the spare value we get
141                  * from the Box-Muller algorithm so that we only actually
142                  * calculate a new value every other call.
143                  */
144                 fctx->mean = PG_GETARG_FLOAT8(1);
145                 fctx->stddev = PG_GETARG_FLOAT8(2);
146                 fctx->carry_val = 0;
147                 fctx->use_carry = false;
148
149                 funcctx->user_fctx = fctx;
150
151                 /*
152                  * we might actually get passed a negative number, but for this
153                  * purpose it doesn't matter, just cast it as an unsigned value
154                  */
155                 srandom(PG_GETARG_UINT32(3));
156
157                 MemoryContextSwitchTo(oldcontext);
158         }
159
160         /* stuff done on every call of the function */
161         funcctx = SRF_PERCALL_SETUP();
162
163         call_cntr = funcctx->call_cntr;
164         max_calls = funcctx->max_calls;
165         fctx = funcctx->user_fctx;
166         mean = fctx->mean;
167         stddev = fctx->stddev;
168         carry_val = fctx->carry_val;
169         use_carry = fctx->use_carry;
170
171         if (call_cntr < max_calls)      /* do when there is more left to send */
172         {
173                 float8          result;
174
175                 if (use_carry)
176                 {
177                         /*
178                          * reset use_carry and use second value obtained on last pass
179                          */
180                         fctx->use_carry = false;
181                         result = carry_val;
182                 }
183                 else
184                 {
185                         float8          normval_1;
186                         float8          normval_2;
187
188                         /* Get the next two normal values */
189                         get_normal_pair(&normval_1, &normval_2);
190
191                         /* use the first */
192                         result = mean + (stddev * normval_1);
193
194                         /* and save the second */
195                         fctx->carry_val = mean + (stddev * normval_2);
196                         fctx->use_carry = true;
197                 }
198
199                 /* send the result */
200                 SRF_RETURN_NEXT(funcctx, Float8GetDatum(result));
201         }
202         else
203 /* do when there is no more left */
204                 SRF_RETURN_DONE(funcctx);
205 }
206
207 /*
208  * get_normal_pair()
209  * Assigns normally distributed (Gaussian) values to a pair of provided
210  * parameters, with mean 0, standard deviation 1.
211  *
212  * This routine implements Algorithm P (Polar method for normal deviates)
213  * from Knuth's _The_Art_of_Computer_Programming_, Volume 2, 3rd ed., pages
214  * 122-126. Knuth cites his source as "The polar method", G. E. P. Box, M. E.
215  * Muller, and G. Marsaglia, _Annals_Math,_Stat._ 29 (1958), 610-611.
216  *
217  */
218 static void
219 get_normal_pair(float8 *x1, float8 *x2)
220 {
221         float8          u1,
222                                 u2,
223                                 v1,
224                                 v2,
225                                 s;
226
227         for (;;)
228         {
229                 u1 = (float8) random() / (float8) RAND_MAX;
230                 u2 = (float8) random() / (float8) RAND_MAX;
231
232                 v1 = (2.0 * u1) - 1.0;
233                 v2 = (2.0 * u2) - 1.0;
234
235                 s = pow(v1, 2) + pow(v2, 2);
236
237                 if (s >= 1.0)
238                         continue;
239
240                 if (s == 0)
241                 {
242                         *x1 = 0;
243                         *x2 = 0;
244                 }
245                 else
246                 {
247                         *x1 = v1 * sqrt((-2.0 * log(s)) / s);
248                         *x2 = v2 * sqrt((-2.0 * log(s)) / s);
249                 }
250
251                 return;
252         }
253 }
254
255 /*
256  * crosstab - create a crosstab of rowids and values columns from a
257  * SQL statement returning one rowid column, one category column,
258  * and one value column.
259  *
260  * e.g. given sql which produces:
261  *
262  *                      rowid   cat             value
263  *                      ------+-------+-------
264  *                      row1    cat1    val1
265  *                      row1    cat2    val2
266  *                      row1    cat3    val3
267  *                      row1    cat4    val4
268  *                      row2    cat1    val5
269  *                      row2    cat2    val6
270  *                      row2    cat3    val7
271  *                      row2    cat4    val8
272  *
273  * crosstab returns:
274  *                                      <===== values columns =====>
275  *                      rowid   cat1    cat2    cat3    cat4
276  *                      ------+-------+-------+-------+-------
277  *                      row1    val1    val2    val3    val4
278  *                      row2    val5    val6    val7    val8
279  *
280  * NOTES:
281  * 1. SQL result must be ordered by 1,2.
282  * 2. The number of values columns depends on the tuple description
283  *        of the function's declared return type.
 * 3. Missing values (i.e. not enough adjacent rows of same rowid to
 *        fill the number of result values columns) are filled in with nulls.
 * 4. Extra values (i.e. too many adjacent rows of same rowid to fill
 *        the number of result values columns) are skipped.
 * 5. Rows with all nulls in the values columns are skipped.
289  */
290 PG_FUNCTION_INFO_V1(crosstab);
291 Datum
292 crosstab(PG_FUNCTION_ARGS)
293 {
294         FuncCallContext *funcctx;
295         TupleDesc       ret_tupdesc;
296         int                     call_cntr;
297         int                     max_calls;
298         TupleTableSlot *slot;
299         AttInMetadata *attinmeta;
300         SPITupleTable *spi_tuptable = NULL;
301         TupleDesc       spi_tupdesc;
302         char       *lastrowid = NULL;
303         crosstab_fctx *fctx;
304         int                     i;
305         int                     num_categories;
306         MemoryContext oldcontext;
307
308         /* stuff done only on the first call of the function */
309         if (SRF_IS_FIRSTCALL())
310         {
311                 char       *sql = GET_STR(PG_GETARG_TEXT_P(0));
312                 Oid                     funcid = fcinfo->flinfo->fn_oid;
313                 Oid                     functypeid;
314                 char            functyptype;
315                 TupleDesc       tupdesc = NULL;
316                 int                     ret;
317                 int                     proc;
318
319                 /* create a function context for cross-call persistence */
320                 funcctx = SRF_FIRSTCALL_INIT();
321
322                 /*
323                  * switch to memory context appropriate for multiple function
324                  * calls
325                  */
326                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
327
328                 /* Connect to SPI manager */
329                 if ((ret = SPI_connect()) < 0)
330                         elog(ERROR, "crosstab: SPI_connect returned %d", ret);
331
332                 /* Retrieve the desired rows */
333                 ret = SPI_exec(sql, 0);
334                 proc = SPI_processed;
335
336                 /* Check for qualifying tuples */
337                 if ((ret == SPI_OK_SELECT) && (proc > 0))
338                 {
339                         spi_tuptable = SPI_tuptable;
340                         spi_tupdesc = spi_tuptable->tupdesc;
341
342                         /*
343                          * The provided SQL query must always return three columns.
344                          *
345                          * 1. rowname   the label or identifier for each row in the final
346                          * result 2. category  the label or identifier for each column
347                          * in the final result 3. values        the value for each column
348                          * in the final result
349                          */
350                         if (spi_tupdesc->natts != 3)
351                                 elog(ERROR, "crosstab: provided SQL must return 3 columns;"
352                                          " a rowid, a category, and a values column");
353                 }
354                 else
355                 {
356                         /* no qualifying tuples */
357                         SPI_finish();
358                         SRF_RETURN_DONE(funcctx);
359                 }
360
361                 /* SPI switches context on us, so reset it */
362                 MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
363
364                 /* get the typeid that represents our return type */
365                 functypeid = get_func_rettype(funcid);
366
367                 /* check typtype to see if we have a predetermined return type */
368                 functyptype = get_typtype(functypeid);
369
370                 if (functyptype == 'c')
371                 {
372                         /* Build a tuple description for a functypeid tuple */
373                         tupdesc = TypeGetTupleDesc(functypeid, NIL);
374                 }
375                 else if (functyptype == 'p' && functypeid == RECORDOID)
376                 {
377                         if (fcinfo->nargs != 2)
378                                 elog(ERROR, "Wrong number of arguments specified for function");
379                         else
380                         {
381                                 int                     num_catagories = PG_GETARG_INT32(1);
382
383                                 tupdesc = make_crosstab_tupledesc(spi_tupdesc, num_catagories);
384                         }
385                 }
386                 else if (functyptype == 'b')
387                         elog(ERROR, "Invalid kind of return type specified for function");
388                 else
389                         elog(ERROR, "Unknown kind of return type specified for function");
390
391                 /*
392                  * Check that return tupdesc is compatible with the one we got
393                  * from ret_relname, at least based on number and type of
394                  * attributes
395                  */
396                 if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
397                         elog(ERROR, "crosstab: return and sql tuple descriptions are"
398                                  " incompatible");
399
400                 /* allocate a slot for a tuple with this tupdesc */
401                 slot = TupleDescGetSlot(tupdesc);
402
403                 /* assign slot to function context */
404                 funcctx->slot = slot;
405
406                 /*
407                  * Generate attribute metadata needed later to produce tuples from
408                  * raw C strings
409                  */
410                 attinmeta = TupleDescGetAttInMetadata(tupdesc);
411                 funcctx->attinmeta = attinmeta;
412
413                 /* allocate memory for user context */
414                 fctx = (crosstab_fctx *) palloc(sizeof(crosstab_fctx));
415
416                 /*
417                  * Save spi data for use across calls
418                  */
419                 fctx->spi_tuptable = spi_tuptable;
420                 fctx->lastrowid = NULL;
421                 funcctx->user_fctx = fctx;
422
423                 /* total number of tuples to be returned */
424                 funcctx->max_calls = proc;
425
426                 MemoryContextSwitchTo(oldcontext);
427         }
428
429         /* stuff done on every call of the function */
430         funcctx = SRF_PERCALL_SETUP();
431
432         /*
433          * initialize per-call variables
434          */
435         call_cntr = funcctx->call_cntr;
436         max_calls = funcctx->max_calls;
437
438         /* return slot for our tuple */
439         slot = funcctx->slot;
440
441         /* user context info */
442         fctx = (crosstab_fctx *) funcctx->user_fctx;
443         lastrowid = fctx->lastrowid;
444         spi_tuptable = fctx->spi_tuptable;
445
446         /* the sql tuple */
447         spi_tupdesc = spi_tuptable->tupdesc;
448
449         /* attribute return type and return tuple description */
450         attinmeta = funcctx->attinmeta;
451         ret_tupdesc = attinmeta->tupdesc;
452
453         /* the return tuple always must have 1 rowid + num_categories columns */
454         num_categories = ret_tupdesc->natts - 1;
455
456         if (call_cntr < max_calls)      /* do when there is more left to send */
457         {
458                 HeapTuple       tuple;
459                 Datum           result;
460                 char      **values;
461                 bool            allnulls = true;
462
463                 while (true)
464                 {
465                         /* allocate space */
466                         values = (char **) palloc((1 + num_categories) * sizeof(char *));
467
468                         /* and make sure it's clear */
469                         memset(values, '\0', (1 + num_categories) * sizeof(char *));
470
471                         /*
472                          * now loop through the sql results and assign each value in
473                          * sequence to the next category
474                          */
475                         for (i = 0; i < num_categories; i++)
476                         {
477                                 HeapTuple       spi_tuple;
478                                 char       *rowid = NULL;
479
480                                 /* see if we've gone too far already */
481                                 if (call_cntr >= max_calls)
482                                         break;
483
484                                 /* get the next sql result tuple */
485                                 spi_tuple = spi_tuptable->vals[call_cntr];
486
487                                 /* get the rowid from the current sql result tuple */
488                                 rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
489
490                                 /*
491                                  * If this is the first pass through the values for this
492                                  * rowid set it, otherwise make sure it hasn't changed on
493                                  * us. Also check to see if the rowid is the same as that
494                                  * of the last tuple sent -- if so, skip this tuple
495                                  * entirely
496                                  */
497                                 if (i == 0)
498                                         values[0] = pstrdup(rowid);
499
500                                 if ((rowid != NULL) && (strcmp(rowid, values[0]) == 0))
501                                 {
502                                         if ((lastrowid != NULL) && (strcmp(rowid, lastrowid) == 0))
503                                                 break;
504                                         else if (allnulls == true)
505                                                 allnulls = false;
506
507                                         /*
508                                          * Get the next category item value, which is alway
509                                          * attribute number three.
510                                          *
511                                          * Be careful to sssign the value to the array index
512                                          * based on which category we are presently
513                                          * processing.
514                                          */
515                                         values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
516
517                                         /*
518                                          * increment the counter since we consume a row for
519                                          * each category, but not for last pass because the
520                                          * API will do that for us
521                                          */
522                                         if (i < (num_categories - 1))
523                                                 call_cntr = ++funcctx->call_cntr;
524                                 }
525                                 else
526                                 {
527                                         /*
528                                          * We'll fill in NULLs for the missing values, but we
529                                          * need to decrement the counter since this sql result
530                                          * row doesn't belong to the current output tuple.
531                                          */
532                                         call_cntr = --funcctx->call_cntr;
533                                         break;
534                                 }
535
536                                 if (rowid != NULL)
537                                         xpfree(rowid);
538                         }
539
540                         xpfree(fctx->lastrowid);
541
542                         if (values[0] != NULL)
543                         {
544                                 /*
545                                  * switch to memory context appropriate for multiple
546                                  * function calls
547                                  */
548                                 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
549
550                                 lastrowid = fctx->lastrowid = pstrdup(values[0]);
551                                 MemoryContextSwitchTo(oldcontext);
552                         }
553
554                         if (!allnulls)
555                         {
556                                 /* build the tuple */
557                                 tuple = BuildTupleFromCStrings(attinmeta, values);
558
559                                 /* make the tuple into a datum */
560                                 result = TupleGetDatum(slot, tuple);
561
562                                 /* Clean up */
563                                 for (i = 0; i < num_categories + 1; i++)
564                                         if (values[i] != NULL)
565                                                 xpfree(values[i]);
566                                 xpfree(values);
567
568                                 SRF_RETURN_NEXT(funcctx, result);
569                         }
570                         else
571                         {
572                                 /*
573                                  * Skipping this tuple entirely, but we need to advance
574                                  * the counter like the API would if we had returned one.
575                                  */
576                                 call_cntr = ++funcctx->call_cntr;
577
578                                 /* we'll start over at the top */
579                                 xpfree(values);
580
581                                 /* see if we've gone too far already */
582                                 if (call_cntr >= max_calls)
583                                 {
584                                         /* release SPI related resources */
585                                         SPI_finish();
586                                         SRF_RETURN_DONE(funcctx);
587                                 }
588                         }
589                 }
590         }
591         else
592 /* do when there is no more left */
593         {
594                 /* release SPI related resources */
595                 SPI_finish();
596                 SRF_RETURN_DONE(funcctx);
597         }
598 }
599
600 /*
601  * connectby_text - produce a result set from a hierarchical (parent/child)
602  * table.
603  *
604  * e.g. given table foo:
605  *
606  *                      keyid   parent_keyid
607  *                      ------+--------------
608  *                      row1    NULL
609  *                      row2    row1
610  *                      row3    row1
611  *                      row4    row2
612  *                      row5    row2
613  *                      row6    row4
614  *                      row7    row3
615  *                      row8    row6
616  *                      row9    row5
617  *
618  *
619  * connectby(text relname, text keyid_fld, text parent_keyid_fld,
620  *                                              text start_with, int max_depth [, text branch_delim])
621  * connectby('foo', 'keyid', 'parent_keyid', 'row2', 0, '~') returns:
622  *
623  *              keyid   parent_id       level    branch
624  *              ------+-----------+--------+-----------------------
625  *              row2    NULL              0               row2
626  *              row4    row2              1               row2~row4
627  *              row6    row4              2               row2~row4~row6
628  *              row8    row6              3               row2~row4~row6~row8
629  *              row5    row2              1               row2~row5
630  *              row9    row5              2               row2~row5~row9
631  *
632  */
633 PG_FUNCTION_INFO_V1(connectby_text);
634
635 #define CONNECTBY_NCOLS                                 4
636 #define CONNECTBY_NCOLS_NOBRANCH                3
637
638 Datum
639 connectby_text(PG_FUNCTION_ARGS)
640 {
641         char       *relname = GET_STR(PG_GETARG_TEXT_P(0));
642         char       *key_fld = GET_STR(PG_GETARG_TEXT_P(1));
643         char       *parent_key_fld = GET_STR(PG_GETARG_TEXT_P(2));
644         char       *start_with = GET_STR(PG_GETARG_TEXT_P(3));
645         int                     max_depth = PG_GETARG_INT32(4);
646         char       *branch_delim = NULL;
647         bool            show_branch = false;
648         ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
649         TupleDesc       tupdesc;
650         AttInMetadata *attinmeta;
651         MemoryContext per_query_ctx;
652         MemoryContext oldcontext;
653
654         if (fcinfo->nargs == 6)
655         {
656                 branch_delim = GET_STR(PG_GETARG_TEXT_P(5));
657                 show_branch = true;
658         }
659
660         per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
661         oldcontext = MemoryContextSwitchTo(per_query_ctx);
662
663         /* get the requested return tuple description */
664         tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
665
666         /* does it meet our needs */
667         validateConnectbyTupleDesc(tupdesc, show_branch);
668
669         /* OK, use it then */
670         attinmeta = TupleDescGetAttInMetadata(tupdesc);
671
672         /* check to see if caller supports us returning a tuplestore */
673         if (!rsinfo->allowedModes & SFRM_Materialize)
674                 elog(ERROR, "connectby requires Materialize mode, but it is not "
675                          "allowed in this context");
676
677         /* OK, go to work */
678         rsinfo->returnMode = SFRM_Materialize;
679         rsinfo->setResult = connectby(relname,
680                                                                   key_fld,
681                                                                   parent_key_fld,
682                                                                   branch_delim,
683                                                                   start_with,
684                                                                   max_depth,
685                                                                   show_branch,
686                                                                   per_query_ctx,
687                                                                   attinmeta);
688         rsinfo->setDesc = tupdesc;
689
690         MemoryContextSwitchTo(oldcontext);
691
692         /*
693          * SFRM_Materialize mode expects us to return a NULL Datum. The actual
694          * tuples are in our tuplestore and passed back through
695          * rsinfo->setResult. rsinfo->setDesc is set to the tuple description
696          * that we actually used to build our tuples with, so the caller can
697          * verify we did what it was expecting.
698          */
699         return (Datum) 0;
700 }
701
702 /*
703  * connectby - does the real work for connectby_text()
704  */
705 static Tuplestorestate *
706 connectby(char *relname,
707                   char *key_fld,
708                   char *parent_key_fld,
709                   char *branch_delim,
710                   char *start_with,
711                   int max_depth,
712                   bool show_branch,
713                   MemoryContext per_query_ctx,
714                   AttInMetadata *attinmeta)
715 {
716         Tuplestorestate *tupstore = NULL;
717         int                     ret;
718         MemoryContext oldcontext;
719
720         /* Connect to SPI manager */
721         if ((ret = SPI_connect()) < 0)
722                 elog(ERROR, "connectby: SPI_connect returned %d", ret);
723
724         /* switch to longer term context to create the tuple store */
725         oldcontext = MemoryContextSwitchTo(per_query_ctx);
726
727         /* initialize our tuplestore */
728         tupstore = tuplestore_begin_heap(true, SortMem);
729
730         MemoryContextSwitchTo(oldcontext);
731
732         /* now go get the whole tree */
733         tupstore = build_tuplestore_recursively(key_fld,
734                                                                                         parent_key_fld,
735                                                                                         relname,
736                                                                                         branch_delim,
737                                                                                         start_with,
738                                                                                         start_with, /* current_branch */
739                                                                                         0,      /* initial level is 0 */
740                                                                                         max_depth,
741                                                                                         show_branch,
742                                                                                         per_query_ctx,
743                                                                                         attinmeta,
744                                                                                         tupstore);
745
746         SPI_finish();
747
748         oldcontext = MemoryContextSwitchTo(per_query_ctx);
749         tuplestore_donestoring(tupstore);
750         MemoryContextSwitchTo(oldcontext);
751
752         return tupstore;
753 }
754
755 static Tuplestorestate *
756 build_tuplestore_recursively(char *key_fld,
757                                                          char *parent_key_fld,
758                                                          char *relname,
759                                                          char *branch_delim,
760                                                          char *start_with,
761                                                          char *branch,
762                                                          int level,
763                                                          int max_depth,
764                                                          bool show_branch,
765                                                          MemoryContext per_query_ctx,
766                                                          AttInMetadata *attinmeta,
767                                                          Tuplestorestate *tupstore)
768 {
769         TupleDesc       tupdesc = attinmeta->tupdesc;
770         MemoryContext oldcontext;
771         StringInfo      sql = makeStringInfo();
772         int                     ret;
773         int                     proc;
774
775         if (max_depth > 0 && level > max_depth)
776                 return tupstore;
777
778         /* Build initial sql statement */
779         appendStringInfo(sql, "SELECT %s, %s FROM %s WHERE %s = '%s' AND %s IS NOT NULL",
780                                          quote_ident_cstr(key_fld),
781                                          quote_ident_cstr(parent_key_fld),
782                                          quote_ident_cstr(relname),
783                                          quote_ident_cstr(parent_key_fld),
784                                          start_with,
785                                          quote_ident_cstr(key_fld));
786
787         /* Retrieve the desired rows */
788         ret = SPI_exec(sql->data, 0);
789         proc = SPI_processed;
790
791         /* Check for qualifying tuples */
792         if ((ret == SPI_OK_SELECT) && (proc > 0))
793         {
794                 HeapTuple       tuple;
795                 HeapTuple       spi_tuple;
796                 SPITupleTable *tuptable = SPI_tuptable;
797                 TupleDesc       spi_tupdesc = tuptable->tupdesc;
798                 int                     i;
799                 char       *current_key;
800                 char       *current_key_parent;
801                 char            current_level[INT32_STRLEN];
802                 char       *current_branch;
803                 char      **values;
804                 StringInfo      branchstr = NULL;
805
806                 /* start a new branch */
807                 branchstr = makeStringInfo();
808
809                 if (show_branch)
810                         values = (char **) palloc(CONNECTBY_NCOLS * sizeof(char *));
811                 else
812                         values = (char **) palloc(CONNECTBY_NCOLS_NOBRANCH * sizeof(char *));
813
814                 /* First time through, do a little setup */
815                 if (level == 0)
816                 {
817                         /*
818                          * Check that return tupdesc is compatible with the one we got
819                          * from the query, but only at level 0 -- no need to check
820                          * more than once
821                          */
822
823                         if (!compatConnectbyTupleDescs(tupdesc, spi_tupdesc))
824                                 elog(ERROR, "connectby: return and sql tuple descriptions are "
825                                          "incompatible");
826
827                         /* root value is the one we initially start with */
828                         values[0] = start_with;
829
830                         /* root value has no parent */
831                         values[1] = NULL;
832
833                         /* root level is 0 */
834                         sprintf(current_level, "%d", level);
835                         values[2] = current_level;
836
837                         /* root branch is just starting root value */
838                         if (show_branch)
839                                 values[3] = start_with;
840
841                         /* construct the tuple */
842                         tuple = BuildTupleFromCStrings(attinmeta, values);
843
844                         /* switch to long lived context while storing the tuple */
845                         oldcontext = MemoryContextSwitchTo(per_query_ctx);
846
847                         /* now store it */
848                         tuplestore_puttuple(tupstore, tuple);
849
850                         /* now reset the context */
851                         MemoryContextSwitchTo(oldcontext);
852
853                         /* increment level */
854                         level++;
855                 }
856
857                 for (i = 0; i < proc; i++)
858                 {
859                         /* initialize branch for this pass */
860                         appendStringInfo(branchstr, "%s", branch);
861
862                         /* get the next sql result tuple */
863                         spi_tuple = tuptable->vals[i];
864
865                         /* get the current key and parent */
866                         current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
867                         current_key_parent = pstrdup(SPI_getvalue(spi_tuple, spi_tupdesc, 2));
868
869                         /* check to see if this key is also an ancestor */
870                         if (strstr(branchstr->data, current_key))
871                                 elog(ERROR, "infinite recursion detected");
872
873                         /* get the current level */
874                         sprintf(current_level, "%d", level);
875
876                         /* extend the branch */
877                         appendStringInfo(branchstr, "%s%s", branch_delim, current_key);
878                         current_branch = branchstr->data;
879
880                         /* build a tuple */
881                         values[0] = pstrdup(current_key);
882                         values[1] = current_key_parent;
883                         values[2] = current_level;
884                         if (show_branch)
885                                 values[3] = current_branch;
886
887                         tuple = BuildTupleFromCStrings(attinmeta, values);
888
889                         xpfree(current_key);
890                         xpfree(current_key_parent);
891
892                         /* switch to long lived context while storing the tuple */
893                         oldcontext = MemoryContextSwitchTo(per_query_ctx);
894
895                         /* store the tuple for later use */
896                         tuplestore_puttuple(tupstore, tuple);
897
898                         /* now reset the context */
899                         MemoryContextSwitchTo(oldcontext);
900
901                         heap_freetuple(tuple);
902
903                         /* recurse using current_key_parent as the new start_with */
904                         tupstore = build_tuplestore_recursively(key_fld,
905                                                                                                         parent_key_fld,
906                                                                                                         relname,
907                                                                                                         branch_delim,
908                                                                                                         values[0],
909                                                                                                         current_branch,
910                                                                                                         level + 1,
911                                                                                                         max_depth,
912                                                                                                         show_branch,
913                                                                                                         per_query_ctx,
914                                                                                                         attinmeta,
915                                                                                                         tupstore);
916
917                         /* reset branch for next pass */
918                         xpfree(branchstr->data);
919                         initStringInfo(branchstr);
920                 }
921         }
922
923         return tupstore;
924 }
925
926 /*
927  * Check expected (query runtime) tupdesc suitable for Connectby
928  */
929 static void
930 validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch)
931 {
932         /* are there the correct number of columns */
933         if (show_branch)
934         {
935                 if (tupdesc->natts != CONNECTBY_NCOLS)
936                         elog(ERROR, "Query-specified return tuple not valid for Connectby: "
937                                  "wrong number of columns");
938         }
939         else
940         {
941                 if (tupdesc->natts != CONNECTBY_NCOLS_NOBRANCH)
942                         elog(ERROR, "Query-specified return tuple not valid for Connectby: "
943                                  "wrong number of columns");
944         }
945
946         /* check that the types of the first two columns match */
947         if (tupdesc->attrs[0]->atttypid != tupdesc->attrs[1]->atttypid)
948                 elog(ERROR, "Query-specified return tuple not valid for Connectby: "
949                          "first two columns must be the same type");
950
951         /* check that the type of the third column is INT4 */
952         if (tupdesc->attrs[2]->atttypid != INT4OID)
953                 elog(ERROR, "Query-specified return tuple not valid for Connectby: "
954                          "third column must be type %s", format_type_be(INT4OID));
955
956         /* check that the type of the forth column is TEXT if applicable */
957         if (show_branch && tupdesc->attrs[3]->atttypid != TEXTOID)
958                 elog(ERROR, "Query-specified return tuple not valid for Connectby: "
959                          "third column must be type %s", format_type_be(TEXTOID));
960
961         /* OK, the tupdesc is valid for our purposes */
962 }
963
964 /*
965  * Check if spi sql tupdesc and return tupdesc are compatible
966  */
967 static bool
968 compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
969 {
970         Oid                     ret_atttypid;
971         Oid                     sql_atttypid;
972
973         /* check the key_fld types match */
974         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
975         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
976         if (ret_atttypid != sql_atttypid)
977                 elog(ERROR, "compatConnectbyTupleDescs: SQL key field datatype does "
978                          "not match return key field datatype");
979
980         /* check the parent_key_fld types match */
981         ret_atttypid = ret_tupdesc->attrs[1]->atttypid;
982         sql_atttypid = sql_tupdesc->attrs[1]->atttypid;
983         if (ret_atttypid != sql_atttypid)
984                 elog(ERROR, "compatConnectbyTupleDescs: SQL parent key field datatype "
985                          "does not match return parent key field datatype");
986
987         /* OK, the two tupdescs are compatible for our purposes */
988         return true;
989 }
990
991 /*
992  * Check if two tupdescs match in type of attributes
993  */
994 static bool
995 compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
996 {
997         int                     i;
998         Form_pg_attribute ret_attr;
999         Oid                     ret_atttypid;
1000         Form_pg_attribute sql_attr;
1001         Oid                     sql_atttypid;
1002
1003         /* check the rowid types match */
1004         ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
1005         sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
1006         if (ret_atttypid != sql_atttypid)
1007                 elog(ERROR, "compatCrosstabTupleDescs: SQL rowid datatype does not match"
1008                          " return rowid datatype");
1009
1010         /*
1011          * - attribute [1] of the sql tuple is the category; no need to check
1012          * it - attribute [2] of the sql tuple should match attributes [1] to
1013          * [natts] of the return tuple
1014          */
1015         sql_attr = sql_tupdesc->attrs[2];
1016         for (i = 1; i < ret_tupdesc->natts; i++)
1017         {
1018                 ret_attr = ret_tupdesc->attrs[i];
1019
1020                 if (ret_attr->atttypid != sql_attr->atttypid)
1021                         return false;
1022         }
1023
1024         /* OK, the two tupdescs are compatible for our purposes */
1025         return true;
1026 }
1027
1028 static TupleDesc
1029 make_crosstab_tupledesc(TupleDesc spi_tupdesc, int num_catagories)
1030 {
1031         Form_pg_attribute sql_attr;
1032         Oid                     sql_atttypid;
1033         TupleDesc       tupdesc;
1034         int                     natts;
1035         AttrNumber      attnum;
1036         char            attname[NAMEDATALEN];
1037         int                     i;
1038
1039         /*
1040          * We need to build a tuple description with one column for the
1041          * rowname, and num_catagories columns for the values. Each must be of
1042          * the same type as the corresponding spi result input column.
1043          */
1044         natts = num_catagories + 1;
1045         tupdesc = CreateTemplateTupleDesc(natts, false);
1046
1047         /* first the rowname column */
1048         attnum = 1;
1049
1050         sql_attr = spi_tupdesc->attrs[0];
1051         sql_atttypid = sql_attr->atttypid;
1052
1053         strcpy(attname, "rowname");
1054
1055         TupleDescInitEntry(tupdesc, attnum, attname, sql_atttypid,
1056                                            -1, 0, false);
1057
1058         /* now the catagory values columns */
1059         sql_attr = spi_tupdesc->attrs[2];
1060         sql_atttypid = sql_attr->atttypid;
1061
1062         for (i = 0; i < num_catagories; i++)
1063         {
1064                 attnum++;
1065
1066                 sprintf(attname, "category_%d", i + 1);
1067                 TupleDescInitEntry(tupdesc, attnum, attname, sql_atttypid,
1068                                                    -1, 0, false);
1069         }
1070
1071         return tupdesc;
1072 }
1073
1074 /*
1075  * Return a properly quoted identifier.
1076  * Uses quote_ident in quote.c
1077  */
1078 static char *
1079 quote_ident_cstr(char *rawstr)
1080 {
1081         text       *rawstr_text;
1082         text       *result_text;
1083         char       *result;
1084
1085         rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
1086         result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
1087         result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
1088
1089         return result;
1090 }