]> granicus.if.org Git - postgresql/commitdiff
Implement comparison of generic records (composite types), and invent a
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 13 Oct 2008 16:25:20 +0000 (16:25 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 13 Oct 2008 16:25:20 +0000 (16:25 +0000)
pseudo-type record[] to represent arrays of possibly-anonymous composite
types.  Since composite datums carry their own type identification, no
extra knowledge is needed at the array level.

The main reason for doing this right now is that it is necessary to support
the general case of detection of cycles in recursive queries: if you need to
compare more than one column to detect a cycle, you need to compare a ROW()
to an array built from ROW()s, at least if you want to do it as the spec
suggests.  Add some documentation and regression tests concerning the cycle
detection issue.

18 files changed:
doc/src/sgml/func.sgml
doc/src/sgml/queries.sgml
src/backend/commands/indexcmds.c
src/backend/parser/parse_coerce.c
src/backend/utils/adt/rowtypes.c
src/include/catalog/catversion.h
src/include/catalog/pg_amop.h
src/include/catalog/pg_amproc.h
src/include/catalog/pg_opclass.h
src/include/catalog/pg_operator.h
src/include/catalog/pg_opfamily.h
src/include/catalog/pg_proc.h
src/include/catalog/pg_type.h
src/include/utils/builtins.h
src/test/regress/expected/rowtypes.out
src/test/regress/expected/with.out
src/test/regress/sql/rowtypes.sql
src/test/regress/sql/with.sql

index 31ef6dc864d5abf5f81b903b4a97311e6931fa43..e289326f2fb4b4f3e7395785f72534c1e2b0fef9 100644 (file)
@@ -1,4 +1,4 @@
-<!-- $PostgreSQL: pgsql/doc/src/sgml/func.sgml,v 1.448 2008/10/03 07:33:08 heikki Exp $ -->
+<!-- $PostgreSQL: pgsql/doc/src/sgml/func.sgml,v 1.449 2008/10/13 16:25:19 tgl Exp $ -->
 
  <chapter id="functions">
   <title>Functions and Operators</title>
@@ -10667,6 +10667,20 @@ AND
    be either true or false, never null.
   </para>
 
+  <note>
+   <para>
+    The SQL specification requires row-wise comparison to return NULL if the
+    result depends on comparing two NULL values or a NULL and a non-NULL.
+    <productname>PostgreSQL</productname> does this only when comparing the
+    results of two row constructors or comparing a row constructor to the
+    output of a subquery (as in <xref linkend="functions-subquery">).
+    In other contexts where two composite-type values are compared, two
+    NULL field values are considered equal, and a NULL is considered larger
+    than a non-NULL.  This is necessary in order to have consistent sorting
+    and indexing behavior for composite types.
+   </para>
+  </note>
+
   </sect2>
  </sect1>
 
index c2e3807920fd0775b62e24cb0354869a68719a19..0232d80db71a8cdfed632d441dc2a727e0119bfe 100644 (file)
@@ -1,4 +1,4 @@
-<!-- $PostgreSQL: pgsql/doc/src/sgml/queries.sgml,v 1.47 2008/10/07 19:27:03 tgl Exp $ -->
+<!-- $PostgreSQL: pgsql/doc/src/sgml/queries.sgml,v 1.48 2008/10/13 16:25:19 tgl Exp $ -->
 
 <chapter id="queries">
  <title>Queries</title>
@@ -1604,8 +1604,85 @@ GROUP BY sub_part
    the recursive part of the query will eventually return no tuples,
    or else the query will loop indefinitely.  Sometimes, using
    <literal>UNION</> instead of <literal>UNION ALL</> can accomplish this
-   by discarding rows that duplicate previous output rows; this catches
-   cycles that would otherwise repeat.  A useful trick for testing queries
+   by discarding rows that duplicate previous output rows.  However, often a
+   cycle does not involve output rows that are completely duplicate: it may be
+   necessary to check just one or a few fields to see if the same point has
+   been reached before.  The standard method for handling such situations is
+   to compute an array of the already-visited values.  For example, consider
+   the following query that searches a table <structname>graph</> using a
+   <structfield>link</> field:
+
+<programlisting>
+WITH RECURSIVE search_graph(id, link, data, depth) AS (
+        SELECT g.id, g.link, g.data, 1
+        FROM graph g
+      UNION ALL
+        SELECT g.id, g.link, g.data, sg.depth + 1
+        FROM graph g, search_graph sg
+        WHERE g.id = sg.link
+)
+SELECT * FROM search_graph;
+</programlisting>
+
+   This query will loop if the <structfield>link</> relationships contain
+   cycles.  Because we require a <quote>depth</> output, just changing
+   <literal>UNION ALL</> to <literal>UNION</> would not eliminate the looping.
+   Instead we need to recognize whether we have reached the same row again
+   while following a particular path of links.  We add two columns
+   <structfield>path</> and <structfield>cycle</> to the loop-prone query:
+
+<programlisting>
+WITH RECURSIVE search_graph(id, link, data, depth, path, cycle) AS (
+        SELECT g.id, g.link, g.data, 1,
+          ARRAY[g.id],
+          false
+        FROM graph g
+      UNION ALL
+        SELECT g.id, g.link, g.data, sg.depth + 1,
+          path || ARRAY[g.id],
+          g.id = ANY(path)
+        FROM graph g, search_graph sg
+        WHERE g.id = sg.link AND NOT cycle
+)
+SELECT * FROM search_graph;
+</programlisting>
+
+   Aside from preventing cycles, the array value is often useful in its own
+   right as representing the <quote>path</> taken to reach any particular row.
+  </para>
+
+  <para>
+   In the general case where more than one field needs to be checked to
+   recognize a cycle, use an array of rows.  For example, if we needed to
+   compare fields <structfield>f1</> and <structfield>f2</>:
+
+<programlisting>
+WITH RECURSIVE search_graph(id, link, data, depth, path, cycle) AS (
+        SELECT g.id, g.link, g.data, 1,
+          ARRAY[ROW(g.f1, g.f2)],
+          false
+        FROM graph g
+      UNION ALL
+        SELECT g.id, g.link, g.data, sg.depth + 1,
+          path || ARRAY[ROW(g.f1, g.f2)],
+          ROW(g.f1, g.f2) = ANY(path)
+        FROM graph g, search_graph sg
+        WHERE g.id = sg.link AND NOT cycle
+)
+SELECT * FROM search_graph;
+</programlisting>
+  </para>
+
+  <tip>
+   <para>
+    Omit the <literal>ROW()</> syntax in the common case where only one field
+    needs to be checked to recognize a cycle.  This allows a simple array
+    rather than a composite-type array to be used, gaining efficiency.
+   </para>
+  </tip>
+
+  <para>
+   A helpful trick for testing queries
    when you are not certain if they might loop is to place a <literal>LIMIT</>
    in the parent query.  For example, this query would loop forever without
    the <literal>LIMIT</>:
index cbf440fc438727ba08206d657c9de4d89cebffe8..c4e548becb7317a1a8d58a568ec89d79abd8f177 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.179 2008/08/25 22:42:32 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.180 2008/10/13 16:25:19 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -795,7 +795,8 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
                        atttype = attform->atttypid;
                        ReleaseSysCache(atttuple);
                }
-               else if (attribute->expr && IsA(attribute->expr, Var))
+               else if (attribute->expr && IsA(attribute->expr, Var) &&
+                                ((Var *) attribute->expr)->varattno != InvalidAttrNumber)
                {
                        /* Tricky tricky, he wrote (column) ... treat as simple attr */
                        Var                *var = (Var *) attribute->expr;
index 69efaddfecf4dce8797c85459794f1b81b8fe49c..7c66c6380476d4597f8f43147fc5704c46b11e63 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/parser/parse_coerce.c,v 2.168 2008/10/06 17:39:26 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/parser/parse_coerce.c,v 2.169 2008/10/13 16:25:19 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,6 +46,7 @@ static Node *coerce_record_to_complex(ParseState *pstate, Node *node,
                                                 CoercionContext ccontext,
                                                 CoercionForm cformat,
                                                 int location);
+static bool is_complex_array(Oid typid);
 
 
 /*
@@ -402,6 +403,21 @@ coerce_type(ParseState *pstate, Node *node,
                /* NB: we do NOT want a RelabelType here */
                return node;
        }
+#ifdef NOT_USED
+       if (inputTypeId == RECORDARRAYOID &&
+               is_complex_array(targetTypeId))
+       {
+               /* Coerce record[] to a specific complex array type */
+               /* not implemented yet ... */
+       }
+#endif
+       if (targetTypeId == RECORDARRAYOID &&
+               is_complex_array(inputTypeId))
+       {
+               /* Coerce a specific complex array type to record[] */
+               /* NB: we do NOT want a RelabelType here */
+               return node;
+       }
        if (typeInheritsFrom(inputTypeId, targetTypeId))
        {
                /*
@@ -492,6 +508,23 @@ can_coerce_type(int nargs, Oid *input_typeids, Oid *target_typeids,
                        ISCOMPLEX(inputTypeId))
                        continue;
 
+#ifdef NOT_USED                                        /* not implemented yet */
+               /*
+                * If input is record[] and target is a composite array type,
+                * assume we can coerce (may need tighter checking here)
+                */
+               if (inputTypeId == RECORDARRAYOID &&
+                       is_complex_array(targetTypeId))
+                       continue;
+#endif
+
+               /*
+                * If input is a composite array type and target is record[], accept
+                */
+               if (targetTypeId == RECORDARRAYOID &&
+                       is_complex_array(inputTypeId))
+                       continue;
+
                /*
                 * If input is a class type that inherits from target, accept
                 */
@@ -1724,8 +1757,8 @@ IsPreferredType(TYPCATEGORY category, Oid type)
  * invokable, no-function-needed pg_cast entry.  Also, a domain is always
  * binary-coercible to its base type, though *not* vice versa (in the other
  * direction, one must apply domain constraint checks before accepting the
- * value as legitimate).  We also need to special-case the polymorphic
- * ANYARRAY type.
+ * value as legitimate).  We also need to special-case various polymorphic
+ * types.
  *
  * This function replaces IsBinaryCompatible(), which was an inherently
  * symmetric test.     Since the pg_cast entries aren't necessarily symmetric,
@@ -1765,6 +1798,16 @@ IsBinaryCoercible(Oid srctype, Oid targettype)
                if (type_is_enum(srctype))
                        return true;
 
+       /* Also accept any composite type as coercible to RECORD */
+       if (targettype == RECORDOID)
+               if (ISCOMPLEX(srctype))
+                       return true;
+
+       /* Also accept any composite array type as coercible to RECORD[] */
+       if (targettype == RECORDARRAYOID)
+               if (is_complex_array(srctype))
+                       return true;
+
        /* Else look in pg_cast */
        tuple = SearchSysCache(CASTSOURCETARGET,
                                                   ObjectIdGetDatum(srctype),
@@ -2002,3 +2045,18 @@ find_typmod_coercion_function(Oid typeId,
 
        return result;
 }
+
+/*
+ * is_complex_array
+ *             Is this type an array of composite?
+ *
+ * Note: this will not return true for record[]; check for RECORDARRAYOID
+ * separately if needed.
+ */
+static bool
+is_complex_array(Oid typid)
+{
+       Oid                     elemtype = get_element_type(typid);
+
+       return (OidIsValid(elemtype) && ISCOMPLEX(elemtype));
+}
index b7981660ef248150ddca912449f1021b38069cd8..f08244216e87b566ed0aa2e0924c0cd138bba32c 100644 (file)
@@ -1,14 +1,14 @@
 /*-------------------------------------------------------------------------
  *
  * rowtypes.c
- *       I/O functions for generic composite types.
+ *       I/O and comparison functions for generic composite types.
  *
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/adt/rowtypes.c,v 1.21 2008/05/12 00:00:51 alvherre Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/adt/rowtypes.c,v 1.22 2008/10/13 16:25:19 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -42,6 +42,24 @@ typedef struct RecordIOData
        ColumnIOData columns[1];        /* VARIABLE LENGTH ARRAY */
 } RecordIOData;
 
+/*
+ * structure to cache metadata needed for record comparison
+ */
+typedef struct ColumnCompareData
+{
+       TypeCacheEntry *typentry;       /* has everything we need, actually */
+} ColumnCompareData;
+
+typedef struct RecordCompareData
+{
+       int                     ncolumns;               /* allocated length of columns[] */
+       Oid                     record1_type;
+       int32           record1_typmod;
+       Oid                     record2_type;
+       int32           record2_typmod;
+       ColumnCompareData columns[1];   /* VARIABLE LENGTH ARRAY */
+} RecordCompareData;
+
 
 /*
  * record_in           - input routine for any composite type.
@@ -734,3 +752,479 @@ record_send(PG_FUNCTION_ARGS)
 
        PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
 }
+
+
+/*
+ * record_cmp()
+ * Internal comparison function for records.
+ *
+ * Returns -1, 0 or 1
+ *
+ * Do not assume that the two inputs are exactly the same record type;
+ * for instance we might be comparing an anonymous ROW() construct against a
+ * named composite type.  We will compare as long as they have the same number
+ * of non-dropped columns of the same types.
+ */
+static int
+record_cmp(FunctionCallInfo fcinfo)
+{
+       HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
+       HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
+       int                     result = 0;
+       Oid                     tupType1;
+       Oid                     tupType2;
+       int32           tupTypmod1;
+       int32           tupTypmod2;
+       TupleDesc       tupdesc1;
+       TupleDesc       tupdesc2;
+       HeapTupleData tuple1;
+       HeapTupleData tuple2;
+       int                     ncolumns1;
+       int                     ncolumns2;
+       RecordCompareData *my_extra;
+       int                     ncols;
+       Datum      *values1;
+       Datum      *values2;
+       bool       *nulls1;
+       bool       *nulls2;
+       int                     i1;
+       int                     i2;
+       int                     j;
+
+       /* Extract type info from the tuples */
+       tupType1 = HeapTupleHeaderGetTypeId(record1);
+       tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
+       tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
+       ncolumns1 = tupdesc1->natts;
+       tupType2 = HeapTupleHeaderGetTypeId(record2);
+       tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
+       tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
+       ncolumns2 = tupdesc2->natts;
+
+       /* Build temporary HeapTuple control structures */
+       tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
+       ItemPointerSetInvalid(&(tuple1.t_self));
+       tuple1.t_tableOid = InvalidOid;
+       tuple1.t_data = record1;
+       tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
+       ItemPointerSetInvalid(&(tuple2.t_self));
+       tuple2.t_tableOid = InvalidOid;
+       tuple2.t_data = record2;
+
+       /*
+        * We arrange to look up the needed comparison info just once per series
+        * of calls, assuming the record types don't change underneath us.
+        */
+       ncols = Max(ncolumns1, ncolumns2);
+       my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
+       if (my_extra == NULL ||
+               my_extra->ncolumns < ncols)
+       {
+               fcinfo->flinfo->fn_extra =
+                       MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+                                                          sizeof(RecordCompareData) - sizeof(ColumnCompareData)
+                                                          + ncols * sizeof(ColumnCompareData));
+               my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
+               my_extra->ncolumns = ncols;
+               my_extra->record1_type = InvalidOid;
+               my_extra->record1_typmod = 0;
+               my_extra->record2_type = InvalidOid;
+               my_extra->record2_typmod = 0;
+       }
+
+       if (my_extra->record1_type != tupType1 ||
+               my_extra->record1_typmod != tupTypmod1 ||
+               my_extra->record2_type != tupType2 ||
+               my_extra->record2_typmod != tupTypmod2)
+       {
+               MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
+               my_extra->record1_type = tupType1;
+               my_extra->record1_typmod = tupTypmod1;
+               my_extra->record2_type = tupType2;
+               my_extra->record2_typmod = tupTypmod2;
+       }
+
+       /* Break down the tuples into fields */
+       values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
+       nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
+       heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
+       values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
+       nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
+       heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
+
+       /*
+        * Scan corresponding columns, allowing for dropped columns in different
+        * places in the two rows.  i1 and i2 are physical column indexes,
+        * j is the logical column index.
+        */
+       i1 = i2 = j = 0;
+       while (i1 < ncolumns1 || i2 < ncolumns2)
+       {
+               TypeCacheEntry *typentry;
+               FunctionCallInfoData locfcinfo;
+               int32           cmpresult;
+
+               /*
+                * Skip dropped columns
+                */
+               if (i1 < ncolumns1 && tupdesc1->attrs[i1]->attisdropped)
+               {
+                       i1++;
+                       continue;
+               }
+               if (i2 < ncolumns2 && tupdesc2->attrs[i2]->attisdropped)
+               {
+                       i2++;
+                       continue;
+               }
+               if (i1 >= ncolumns1 || i2 >= ncolumns2)
+                       break;                          /* we'll deal with mismatch below loop */
+
+               /*
+                * Have two matching columns, they must be same type
+                */
+               if (tupdesc1->attrs[i1]->atttypid !=
+                       tupdesc2->attrs[i2]->atttypid)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("cannot compare dissimilar column types %s and %s at record column %d",
+                                                       format_type_be(tupdesc1->attrs[i1]->atttypid),
+                                                       format_type_be(tupdesc2->attrs[i2]->atttypid),
+                                                       j+1)));
+
+               /*
+                * Lookup the comparison function if not done already
+                */
+               typentry = my_extra->columns[j].typentry;
+               if (typentry == NULL ||
+                       typentry->type_id != tupdesc1->attrs[i1]->atttypid)
+               {
+                       typentry = lookup_type_cache(tupdesc1->attrs[i1]->atttypid,
+                                                                                TYPECACHE_CMP_PROC_FINFO);
+                       if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_UNDEFINED_FUNCTION),
+                                                errmsg("could not identify a comparison function for type %s",
+                                                               format_type_be(typentry->type_id))));
+                       my_extra->columns[j].typentry = typentry;
+               }
+
+               /*
+                * We consider two NULLs equal; NULL > not-NULL.
+                */
+               if (!nulls1[i1] || !nulls2[i2])
+               {
+                       if (nulls1[i1])
+                       {
+                               /* arg1 is greater than arg2 */
+                               result = 1;
+                               break;
+                       }
+                       if (nulls2[i2])
+                       {
+                               /* arg1 is less than arg2 */
+                               result = -1;
+                               break;
+                       }
+
+                       /* Compare the pair of elements */
+                       InitFunctionCallInfoData(locfcinfo, &typentry->cmp_proc_finfo, 2,
+                                                                        NULL, NULL);
+                       locfcinfo.arg[0] = values1[i1];
+                       locfcinfo.arg[1] = values2[i2];
+                       locfcinfo.argnull[0] = false;
+                       locfcinfo.argnull[1] = false;
+                       locfcinfo.isnull = false;
+                       cmpresult = DatumGetInt32(FunctionCallInvoke(&locfcinfo));
+
+                       if (cmpresult < 0)
+                       {
+                               /* arg1 is less than arg2 */
+                               result = -1;
+                               break;
+                       }
+                       else if (cmpresult > 0)
+                       {
+                               /* arg1 is greater than arg2 */
+                               result = 1;
+                               break;
+                       }
+               }
+
+               /* equal, so continue to next column */
+               i1++, i2++, j++;
+       }
+
+       /*
+        * If we didn't break out of the loop early, check for column count
+        * mismatch.  (We do not report such mismatch if we found unequal
+        * column values; is that a feature or a bug?)
+        */
+       if (result == 0)
+       {
+               if (i1 != ncolumns1 || i2 != ncolumns2)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("cannot compare record types with different numbers of columns")));
+       }
+
+       pfree(values1);
+       pfree(nulls1);
+       pfree(values2);
+       pfree(nulls2);
+       ReleaseTupleDesc(tupdesc1);
+       ReleaseTupleDesc(tupdesc2);
+
+       /* Avoid leaking memory when handed toasted input. */
+       PG_FREE_IF_COPY(record1, 0);
+       PG_FREE_IF_COPY(record2, 1);
+
+       return result;
+}
+
+/*
+ * record_eq :
+ *               compares two records for equality
+ * result :
+ *               returns true if the records are equal, false otherwise.
+ *
+ * Note: we do not use record_cmp here, since equality may be meaningful in
+ * datatypes that don't have a total ordering (and hence no btree support).
+ */
+Datum
+record_eq(PG_FUNCTION_ARGS)
+{
+       HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
+       HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
+       bool            result = true;
+       Oid                     tupType1;
+       Oid                     tupType2;
+       int32           tupTypmod1;
+       int32           tupTypmod2;
+       TupleDesc       tupdesc1;
+       TupleDesc       tupdesc2;
+       HeapTupleData tuple1;
+       HeapTupleData tuple2;
+       int                     ncolumns1;
+       int                     ncolumns2;
+       RecordCompareData *my_extra;
+       int                     ncols;
+       Datum      *values1;
+       Datum      *values2;
+       bool       *nulls1;
+       bool       *nulls2;
+       int                     i1;
+       int                     i2;
+       int                     j;
+
+       /* Extract type info from the tuples */
+       tupType1 = HeapTupleHeaderGetTypeId(record1);
+       tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
+       tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
+       ncolumns1 = tupdesc1->natts;
+       tupType2 = HeapTupleHeaderGetTypeId(record2);
+       tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
+       tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
+       ncolumns2 = tupdesc2->natts;
+
+       /* Build temporary HeapTuple control structures */
+       tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
+       ItemPointerSetInvalid(&(tuple1.t_self));
+       tuple1.t_tableOid = InvalidOid;
+       tuple1.t_data = record1;
+       tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
+       ItemPointerSetInvalid(&(tuple2.t_self));
+       tuple2.t_tableOid = InvalidOid;
+       tuple2.t_data = record2;
+
+       /*
+        * We arrange to look up the needed comparison info just once per series
+        * of calls, assuming the record types don't change underneath us.
+        */
+       ncols = Max(ncolumns1, ncolumns2);
+       my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
+       if (my_extra == NULL ||
+               my_extra->ncolumns < ncols)
+       {
+               fcinfo->flinfo->fn_extra =
+                       MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+                                                          sizeof(RecordCompareData) - sizeof(ColumnCompareData)
+                                                          + ncols * sizeof(ColumnCompareData));
+               my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
+               my_extra->ncolumns = ncols;
+               my_extra->record1_type = InvalidOid;
+               my_extra->record1_typmod = 0;
+               my_extra->record2_type = InvalidOid;
+               my_extra->record2_typmod = 0;
+       }
+
+       if (my_extra->record1_type != tupType1 ||
+               my_extra->record1_typmod != tupTypmod1 ||
+               my_extra->record2_type != tupType2 ||
+               my_extra->record2_typmod != tupTypmod2)
+       {
+               MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
+               my_extra->record1_type = tupType1;
+               my_extra->record1_typmod = tupTypmod1;
+               my_extra->record2_type = tupType2;
+               my_extra->record2_typmod = tupTypmod2;
+       }
+
+       /* Break down the tuples into fields */
+       values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
+       nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
+       heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
+       values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
+       nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
+       heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
+
+       /*
+        * Scan corresponding columns, allowing for dropped columns in different
+        * places in the two rows.  i1 and i2 are physical column indexes,
+        * j is the logical column index.
+        */
+       i1 = i2 = j = 0;
+       while (i1 < ncolumns1 || i2 < ncolumns2)
+       {
+               TypeCacheEntry *typentry;
+               FunctionCallInfoData locfcinfo;
+               bool            oprresult;
+
+               /*
+                * Skip dropped columns
+                */
+               if (i1 < ncolumns1 && tupdesc1->attrs[i1]->attisdropped)
+               {
+                       i1++;
+                       continue;
+               }
+               if (i2 < ncolumns2 && tupdesc2->attrs[i2]->attisdropped)
+               {
+                       i2++;
+                       continue;
+               }
+               if (i1 >= ncolumns1 || i2 >= ncolumns2)
+                       break;                          /* we'll deal with mismatch below loop */
+
+               /*
+                * Have two matching columns, they must be same type
+                */
+               if (tupdesc1->attrs[i1]->atttypid !=
+                       tupdesc2->attrs[i2]->atttypid)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("cannot compare dissimilar column types %s and %s at record column %d",
+                                                       format_type_be(tupdesc1->attrs[i1]->atttypid),
+                                                       format_type_be(tupdesc2->attrs[i2]->atttypid),
+                                                       j+1)));
+
+               /*
+                * Lookup the equality function if not done already
+                */
+               typentry = my_extra->columns[j].typentry;
+               if (typentry == NULL ||
+                       typentry->type_id != tupdesc1->attrs[i1]->atttypid)
+               {
+                       typentry = lookup_type_cache(tupdesc1->attrs[i1]->atttypid,
+                                                                                TYPECACHE_EQ_OPR_FINFO);
+                       if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_UNDEFINED_FUNCTION),
+                                                errmsg("could not identify an equality operator for type %s",
+                                                               format_type_be(typentry->type_id))));
+                       my_extra->columns[j].typentry = typentry;
+               }
+
+               /*
+                * We consider two NULLs equal; NULL > not-NULL.
+                */
+               if (!nulls1[i1] || !nulls2[i2])
+               {
+                       if (nulls1[i1] || nulls2[i2])
+                       {
+                               result = false;
+                               break;
+                       }
+
+                       /* Compare the pair of elements */
+                       InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
+                                                                        NULL, NULL);
+                       locfcinfo.arg[0] = values1[i1];
+                       locfcinfo.arg[1] = values2[i2];
+                       locfcinfo.argnull[0] = false;
+                       locfcinfo.argnull[1] = false;
+                       locfcinfo.isnull = false;
+                       oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo));
+                       if (!oprresult)
+                       {
+                               result = false;
+                               break;
+                       }
+               }
+
+               /* equal, so continue to next column */
+               i1++, i2++, j++;
+       }
+
+       /*
+        * If we didn't break out of the loop early, check for column count
+        * mismatch.  (We do not report such mismatch if we found unequal
+        * column values; is that a feature or a bug?)
+        */
+       if (result)
+       {
+               if (i1 != ncolumns1 || i2 != ncolumns2)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("cannot compare record types with different numbers of columns")));
+       }
+
+       pfree(values1);
+       pfree(nulls1);
+       pfree(values2);
+       pfree(nulls2);
+       ReleaseTupleDesc(tupdesc1);
+       ReleaseTupleDesc(tupdesc2);
+
+       /* Avoid leaking memory when handed toasted input. */
+       PG_FREE_IF_COPY(record1, 0);
+       PG_FREE_IF_COPY(record2, 1);
+
+       PG_RETURN_BOOL(result);
+}
+
+Datum
+record_ne(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_BOOL(!DatumGetBool(record_eq(fcinfo)));
+}
+
+Datum
+record_lt(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_BOOL(record_cmp(fcinfo) < 0);
+}
+
+Datum
+record_gt(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_BOOL(record_cmp(fcinfo) > 0);
+}
+
+Datum
+record_le(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_BOOL(record_cmp(fcinfo) <= 0);
+}
+
+Datum
+record_ge(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_BOOL(record_cmp(fcinfo) >= 0);
+}
+
+Datum
+btrecordcmp(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_INT32(record_cmp(fcinfo));
+}
index 3f27787992224b9acfbc80b42108f30f6b29738d..0ab03e0578e12f424e37ba737f33f4a11c724be4 100644 (file)
@@ -37,7 +37,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.496 2008/10/06 17:39:26 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.497 2008/10/13 16:25:19 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     200810063
+#define CATALOG_VERSION_NO     200810131
 
 #endif
index 8368d4bbc798ba9e1bcda7c8cbb77ff72c08c016..f768da3f617d62904bae165a9d3e2a790a1063e7 100644 (file)
@@ -29,7 +29,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/pg_amop.h,v 1.87 2008/05/27 00:13:09 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_amop.h,v 1.88 2008/10/13 16:25:19 tgl Exp $
  *
  * NOTES
  *      the genbki.sh script reads this file and generates .bki
@@ -470,6 +470,16 @@ DATA(insert (      397   2277 2277 3 1070  403 ));
 DATA(insert (  397   2277 2277 4 1075  403 ));
 DATA(insert (  397   2277 2277 5 1073  403 ));
 
+/*
+ *     btree record_ops
+ */
+
+DATA(insert (  2994  2249 2249 1 2990  403 ));
+DATA(insert (  2994  2249 2249 2 2992  403 ));
+DATA(insert (  2994  2249 2249 3 2988  403 ));
+DATA(insert (  2994  2249 2249 4 2993  403 ));
+DATA(insert (  2994  2249 2249 5 2991  403 ));
+
 /*
  * btree uuid_ops
  */
index 5ca46600e3e8b89c8996475fec5a313f108da9db..c0f91a64b60368d8fde3b95d73cafed204692473 100644 (file)
@@ -22,7 +22,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/pg_amproc.h,v 1.73 2008/05/27 00:13:09 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_amproc.h,v 1.74 2008/10/13 16:25:19 tgl Exp $
  *
  * NOTES
  *       the genbki.sh script reads this file and generates .bki
@@ -111,6 +111,7 @@ DATA(insert (       1986   19 19 1 359 ));
 DATA(insert (  1988   1700 1700 1 1769 ));
 DATA(insert (  1989   26 26 1 356 ));
 DATA(insert (  1991   30 30 1 404 ));
+DATA(insert (  2994   2249 2249 1 2987 ));
 DATA(insert (  1994   25 25 1 360 ));
 DATA(insert (  1996   1083 1083 1 1107 ));
 DATA(insert (  2000   1266 1266 1 1358 ));
index 7c4d95003c8b2ea60380c36131109c4f5396ea15..6251bc9829919020254e8ab4a12cde90fcd6d928 100644 (file)
@@ -28,7 +28,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/pg_opclass.h,v 1.83 2008/09/15 18:43:41 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_opclass.h,v 1.84 2008/10/13 16:25:20 tgl Exp $
  *
  * NOTES
  *       the genbki.sh script reads this file and generates .bki
@@ -137,6 +137,7 @@ DATA(insert OID = 1981 ( 403        oid_ops         PGNSP PGUID 1989   26 t 0 ));
 DATA(insert (  405             oid_ops                         PGNSP PGUID 1990   26 t 0 ));
 DATA(insert (  403             oidvector_ops           PGNSP PGUID 1991   30 t 0 ));
 DATA(insert (  405             oidvector_ops           PGNSP PGUID 1992   30 t 0 ));
+DATA(insert (  403             record_ops                      PGNSP PGUID 2994 2249 t 0 ));
 DATA(insert (  403             text_ops                        PGNSP PGUID 1994   25 t 0 ));
 DATA(insert (  405             text_ops                        PGNSP PGUID 1995   25 t 0 ));
 DATA(insert (  403             time_ops                        PGNSP PGUID 1996 1083 t 0 ));
index e19bb0f9271292e4a6dcc78bb65ba47acefb606e..d112511932843c51b3d401f0623493f2614bed1c 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/pg_operator.h,v 1.163 2008/09/19 19:03:40 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_operator.h,v 1.164 2008/10/13 16:25:20 tgl Exp $
  *
  * NOTES
  *       the genbki.sh script reads this file and generates .bki
@@ -933,6 +933,14 @@ DATA(insert OID = 3694 (  "<@"        PGNSP PGUID b f f 3615        3615    16 3693        0        tsq_m
 DATA(insert OID = 3762 (  "@@"    PGNSP PGUID b f f 25          25              16    0        0        ts_match_tt    contsel    contjoinsel   ));
 DATA(insert OID = 3763 (  "@@"    PGNSP PGUID b f f 25          3615    16    0        0        ts_match_tq    contsel    contjoinsel   ));
 
+/* generic record comparison operators */
+DATA(insert OID = 2988 (  "="     PGNSP PGUID b t f 2249 2249 16 2988 2989 record_eq eqsel eqjoinsel ));
+DATA(insert OID = 2989 (  "<>"    PGNSP PGUID b f f 2249 2249 16 2989 2988 record_ne neqsel neqjoinsel ));
+DATA(insert OID = 2990 (  "<"     PGNSP PGUID b f f 2249 2249 16 2991 2993 record_lt scalarltsel scalarltjoinsel ));
+DATA(insert OID = 2991 (  ">"     PGNSP PGUID b f f 2249 2249 16 2990 2992 record_gt scalargtsel scalargtjoinsel ));
+DATA(insert OID = 2992 (  "<="    PGNSP PGUID b f f 2249 2249 16 2993 2991 record_le scalarltsel scalarltjoinsel ));
+DATA(insert OID = 2993 (  ">="    PGNSP PGUID b f f 2249 2249 16 2992 2990 record_ge scalargtsel scalargtjoinsel ));
+
 
 /*
  * function prototypes
index e6066f06566f12e3eb976974e94cb1651bd72f6a..aa6e7809e7dd1dbf3044a81579670b800bcb2549 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/pg_opfamily.h,v 1.9 2008/05/27 00:13:09 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_opfamily.h,v 1.10 2008/10/13 16:25:20 tgl Exp $
  *
  * NOTES
  *       the genbki.sh script reads this file and generates .bki
@@ -94,6 +94,7 @@ DATA(insert OID = 1989 (      403             oid_ops                 PGNSP PGUID ));
 DATA(insert OID = 1990 (       405             oid_ops                 PGNSP PGUID ));
 DATA(insert OID = 1991 (       403             oidvector_ops   PGNSP PGUID ));
 DATA(insert OID = 1992 (       405             oidvector_ops   PGNSP PGUID ));
+DATA(insert OID = 2994 (       403             record_ops              PGNSP PGUID ));
 DATA(insert OID = 1994 (       403             text_ops                PGNSP PGUID ));
 #define TEXT_BTREE_FAM_OID 1994
 DATA(insert OID = 1995 (       405             text_ops                PGNSP PGUID ));
index 31449b60e0f7ac6eb038eb3e6f1b3c7f28100ca9..70dc0a319a315e2017ba771e79976cbfe1d8bb50 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.518 2008/10/06 13:05:37 mha Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.519 2008/10/13 16:25:20 tgl Exp $
  *
  * NOTES
  *       The script catalog/genbki.sh reads this file and generates .bki
@@ -4560,6 +4560,22 @@ DESCR("get set of in-progress txids in snapshot");
 DATA(insert OID = 2948 (  txid_visible_in_snapshot     PGNSP PGUID 12 1  0 0 f f t f i 2 16 "20 2970" _null_ _null_ _null_ txid_visible_in_snapshot _null_ _null_ _null_ ));
 DESCR("is txid visible in snapshot?");
 
+/* record comparison */
+DATA(insert OID = 2981 (  record_eq               PGNSP PGUID 12 1 0 0 f f t f i 2 16 "2249 2249" _null_ _null_ _null_ record_eq _null_ _null_ _null_ ));
+DESCR("record equal");
+DATA(insert OID = 2982 (  record_ne               PGNSP PGUID 12 1 0 0 f f t f i 2 16 "2249 2249" _null_ _null_ _null_ record_ne _null_ _null_ _null_ ));
+DESCR("record not equal");
+DATA(insert OID = 2983 (  record_lt               PGNSP PGUID 12 1 0 0 f f t f i 2 16 "2249 2249" _null_ _null_ _null_ record_lt _null_ _null_ _null_ ));
+DESCR("record less than");
+DATA(insert OID = 2984 (  record_gt               PGNSP PGUID 12 1 0 0 f f t f i 2 16 "2249 2249" _null_ _null_ _null_ record_gt _null_ _null_ _null_ ));
+DESCR("record greater than");
+DATA(insert OID = 2985 (  record_le               PGNSP PGUID 12 1 0 0 f f t f i 2 16 "2249 2249" _null_ _null_ _null_ record_le _null_ _null_ _null_ ));
+DESCR("record less than or equal");
+DATA(insert OID = 2986 (  record_ge               PGNSP PGUID 12 1 0 0 f f t f i 2 16 "2249 2249" _null_ _null_ _null_ record_ge _null_ _null_ _null_ ));
+DESCR("record greater than or equal");
+DATA(insert OID = 2987 (  btrecordcmp     PGNSP PGUID 12 1 0 0 f f t f i 2 23 "2249 2249" _null_ _null_ _null_ btrecordcmp _null_ _null_ _null_ ));
+DESCR("btree less-equal-greater");
+
 
 /*
  * Symbolic values for provolatile column: these indicate whether the result
index d4f4fc0968baafdeb9c829a72fa9207523d12252..f3367b41b596d6f3a7683240efb0ef4e5400181a 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/catalog/pg_type.h,v 1.200 2008/09/25 03:28:56 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/catalog/pg_type.h,v 1.201 2008/10/13 16:25:20 tgl Exp $
  *
  * NOTES
  *       the genbki.sh script reads this file and generates .bki
@@ -597,8 +597,10 @@ DATA(insert OID = 2949 ( _txid_snapshot PGNSP PGUID -1 f b A f t \054 0 2970 0 a
  * but there is now support for it in records and arrays.  Perhaps we should
  * just treat it as a regular base type?
  */
-DATA(insert OID = 2249 ( record                        PGNSP PGUID -1 f p P f t \054 0 0 0 record_in record_out record_recv record_send - - - d x f 0 -1 0 _null_ _null_ ));
+DATA(insert OID = 2249 ( record                        PGNSP PGUID -1 f p P f t \054 0 0 2287 record_in record_out record_recv record_send - - - d x f 0 -1 0 _null_ _null_ ));
 #define RECORDOID              2249
+DATA(insert OID = 2287 ( _record               PGNSP PGUID -1 f p P f t \054 0 2249 0 array_in array_out array_recv array_send - - - d x f 0 -1 0 _null_ _null_ ));
+#define RECORDARRAYOID 2287
 DATA(insert OID = 2275 ( cstring               PGNSP PGUID -2 f p P f t \054 0 0 1263 cstring_in cstring_out cstring_recv cstring_send - - - c p f 0 -1 0 _null_ _null_ ));
 #define CSTRINGOID             2275
 DATA(insert OID = 2276 ( any                   PGNSP PGUID  4 t p P f t \054 0 0 0 any_in any_out - - - - - i p f 0 -1 0 _null_ _null_ ));
index a98f93165f9043c0e53b16fb00ff94b85430ba47..b9efbe88b33ce1827a1462c5999b23685e4106ad 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/utils/builtins.h,v 1.323 2008/10/06 20:29:38 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/utils/builtins.h,v 1.324 2008/10/13 16:25:20 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -524,6 +524,13 @@ extern Datum record_in(PG_FUNCTION_ARGS);
 extern Datum record_out(PG_FUNCTION_ARGS);
 extern Datum record_recv(PG_FUNCTION_ARGS);
 extern Datum record_send(PG_FUNCTION_ARGS);
+extern Datum record_eq(PG_FUNCTION_ARGS);
+extern Datum record_ne(PG_FUNCTION_ARGS);
+extern Datum record_lt(PG_FUNCTION_ARGS);
+extern Datum record_gt(PG_FUNCTION_ARGS);
+extern Datum record_le(PG_FUNCTION_ARGS);
+extern Datum record_ge(PG_FUNCTION_ARGS);
+extern Datum btrecordcmp(PG_FUNCTION_ARGS);
 
 /* ruleutils.c */
 extern Datum pg_get_ruledef(PG_FUNCTION_ARGS);
index 73ae3ce10ef801d359bf31094ff208833d0a990c..e1181a61b8327475e0602a56fc31e8c81e8d32ed 100644 (file)
@@ -269,3 +269,23 @@ select ROW() = ROW();
 ERROR:  cannot compare rows of zero length
 LINE 1: select ROW() = ROW();
                      ^
+-- Check ability to create arrays of anonymous rowtypes
+select array[ row(1,2), row(3,4), row(5,6) ];
+           array           
+---------------------------
+ {"(1,2)","(3,4)","(5,6)"}
+(1 row)
+
+-- Check ability to compare an anonymous row to elements of an array
+select row(1,1.1) = any (array[ row(7,7.7), row(1,1.1), row(0,0.0) ]);
+ ?column? 
+----------
+ t
+(1 row)
+
+select row(1,1.1) = any (array[ row(7,7.7), row(1,1.0), row(0,0.0) ]);
+ ?column? 
+----------
+ f
+(1 row)
+
index fe7561065ed9f9de59e2f6f603ccc5cef30921c5..e8d3e43b7b60a2a667fab958a6a59f69a03b73a7 100644 (file)
@@ -451,6 +451,54 @@ SELECT t1.id, count(t2.*) FROM t AS t1 JOIN t AS t2 ON
   3 |     7
 (2 rows)
 
+--
+-- test cycle detection
+--
+create temp table graph( f int, t int, label text );
+insert into graph values
+       (1, 2, 'arc 1 -> 2'),
+       (1, 3, 'arc 1 -> 3'),
+       (2, 3, 'arc 2 -> 3'),
+       (1, 4, 'arc 1 -> 4'),
+       (4, 5, 'arc 4 -> 5'),
+       (5, 1, 'arc 5 -> 1');
+with recursive search_graph(f, t, label, path, cycle) as (
+       select *, array[row(g.f, g.t)], false from graph g
+       union all
+       select g.*, path || array[row(g.f, g.t)], row(g.f, g.t) = any(path)
+       from graph g, search_graph sg
+       where g.f = sg.t and not cycle
+)
+select * from search_graph;
+ f | t |   label    |                   path                    | cycle 
+---+---+------------+-------------------------------------------+-------
+ 1 | 2 | arc 1 -> 2 | {"(1,2)"}                                 | f
+ 1 | 3 | arc 1 -> 3 | {"(1,3)"}                                 | f
+ 2 | 3 | arc 2 -> 3 | {"(2,3)"}                                 | f
+ 1 | 4 | arc 1 -> 4 | {"(1,4)"}                                 | f
+ 4 | 5 | arc 4 -> 5 | {"(4,5)"}                                 | f
+ 5 | 1 | arc 5 -> 1 | {"(5,1)"}                                 | f
+ 1 | 2 | arc 1 -> 2 | {"(5,1)","(1,2)"}                         | f
+ 1 | 3 | arc 1 -> 3 | {"(5,1)","(1,3)"}                         | f
+ 1 | 4 | arc 1 -> 4 | {"(5,1)","(1,4)"}                         | f
+ 2 | 3 | arc 2 -> 3 | {"(1,2)","(2,3)"}                         | f
+ 4 | 5 | arc 4 -> 5 | {"(1,4)","(4,5)"}                         | f
+ 5 | 1 | arc 5 -> 1 | {"(4,5)","(5,1)"}                         | f
+ 1 | 2 | arc 1 -> 2 | {"(4,5)","(5,1)","(1,2)"}                 | f
+ 1 | 3 | arc 1 -> 3 | {"(4,5)","(5,1)","(1,3)"}                 | f
+ 1 | 4 | arc 1 -> 4 | {"(4,5)","(5,1)","(1,4)"}                 | f
+ 2 | 3 | arc 2 -> 3 | {"(5,1)","(1,2)","(2,3)"}                 | f
+ 4 | 5 | arc 4 -> 5 | {"(5,1)","(1,4)","(4,5)"}                 | f
+ 5 | 1 | arc 5 -> 1 | {"(1,4)","(4,5)","(5,1)"}                 | f
+ 1 | 2 | arc 1 -> 2 | {"(1,4)","(4,5)","(5,1)","(1,2)"}         | f
+ 1 | 3 | arc 1 -> 3 | {"(1,4)","(4,5)","(5,1)","(1,3)"}         | f
+ 1 | 4 | arc 1 -> 4 | {"(1,4)","(4,5)","(5,1)","(1,4)"}         | t
+ 2 | 3 | arc 2 -> 3 | {"(4,5)","(5,1)","(1,2)","(2,3)"}         | f
+ 4 | 5 | arc 4 -> 5 | {"(4,5)","(5,1)","(1,4)","(4,5)"}         | t
+ 5 | 1 | arc 5 -> 1 | {"(5,1)","(1,4)","(4,5)","(5,1)"}         | t
+ 2 | 3 | arc 2 -> 3 | {"(1,4)","(4,5)","(5,1)","(1,2)","(2,3)"} | f
+(25 rows)
+
 --
 -- test multiple WITH queries
 --
index 7e5c554d3b79b75cb7c3b890196766b478ea850a..a8520c5bd2c1ae40c8ca71753986b95f892b36b0 100644 (file)
@@ -113,3 +113,10 @@ order by thousand, tenthous;
 select ROW();
 select ROW() IS NULL;
 select ROW() = ROW();
+
+-- Check ability to create arrays of anonymous rowtypes
+select array[ row(1,2), row(3,4), row(5,6) ];
+
+-- Check ability to compare an anonymous row to elements of an array
+select row(1,1.1) = any (array[ row(7,7.7), row(1,1.1), row(0,0.0) ]);
+select row(1,1.1) = any (array[ row(7,7.7), row(1,1.0), row(0,0.0) ]);
index 54d311101d1cb187f63b48e0ecd5676067cf06a9..d37f0d9723e9d5db279bdf7065b4a5eb41f60741 100644 (file)
@@ -250,6 +250,28 @@ SELECT t1.id, count(t2.*) FROM t AS t1 JOIN t AS t2 ON
        GROUP BY t1.id
        ORDER BY t1.id;
 
+--
+-- test cycle detection
+--
+create temp table graph( f int, t int, label text );
+
+insert into graph values
+       (1, 2, 'arc 1 -> 2'),
+       (1, 3, 'arc 1 -> 3'),
+       (2, 3, 'arc 2 -> 3'),
+       (1, 4, 'arc 1 -> 4'),
+       (4, 5, 'arc 4 -> 5'),
+       (5, 1, 'arc 5 -> 1');
+
+with recursive search_graph(f, t, label, path, cycle) as (
+       select *, array[row(g.f, g.t)], false from graph g
+       union all
+       select g.*, path || array[row(g.f, g.t)], row(g.f, g.t) = any(path)
+       from graph g, search_graph sg
+       where g.f = sg.t and not cycle
+)
+select * from search_graph;
+
 --
 -- test multiple WITH queries
 --