]> granicus.if.org Git - postgresql/blobdiff - src/backend/utils/adt/rowtypes.c
Fix initialization of fake LSN for unlogged relations
[postgresql] / src / backend / utils / adt / rowtypes.c
index f08244216e87b566ed0aa2e0924c0cd138bba32c..ea3e40a369429d7b0f2f04fb0a5c062898b2bd65 100644 (file)
@@ -3,12 +3,12 @@
  * rowtypes.c
  *       I/O and comparison functions for generic composite types.
  *
- * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/adt/rowtypes.c,v 1.22 2008/10/13 16:25:19 tgl Exp $
+ *       src/backend/utils/adt/rowtypes.c
  *
  *-------------------------------------------------------------------------
  */
 
 #include <ctype.h>
 
+#include "access/detoast.h"
+#include "access/htup_details.h"
 #include "catalog/pg_type.h"
+#include "funcapi.h"
 #include "libpq/pqformat.h"
+#include "miscadmin.h"
 #include "utils/builtins.h"
+#include "utils/datum.h"
 #include "utils/lsyscache.h"
 #include "utils/typcache.h"
 
@@ -31,6 +36,7 @@ typedef struct ColumnIOData
        Oid                     column_type;
        Oid                     typiofunc;
        Oid                     typioparam;
+       bool            typisvarlena;
        FmgrInfo        proc;
 } ColumnIOData;
 
@@ -39,7 +45,7 @@ typedef struct RecordIOData
        Oid                     record_type;
        int32           record_typmod;
        int                     ncolumns;
-       ColumnIOData columns[1];        /* VARIABLE LENGTH ARRAY */
+       ColumnIOData columns[FLEXIBLE_ARRAY_MEMBER];
 } RecordIOData;
 
 /*
@@ -57,7 +63,7 @@ typedef struct RecordCompareData
        int32           record1_typmod;
        Oid                     record2_type;
        int32           record2_typmod;
-       ColumnCompareData columns[1];   /* VARIABLE LENGTH ARRAY */
+       ColumnCompareData columns[FLEXIBLE_ARRAY_MEMBER];
 } RecordCompareData;
 
 
@@ -69,12 +75,8 @@ record_in(PG_FUNCTION_ARGS)
 {
        char       *string = PG_GETARG_CSTRING(0);
        Oid                     tupType = PG_GETARG_OID(1);
-
-#ifdef NOT_USED
-       int32           typmod = PG_GETARG_INT32(2);
-#endif
+       int32           tupTypmod = PG_GETARG_INT32(2);
        HeapTupleHeader result;
-       int32           tupTypmod;
        TupleDesc       tupdesc;
        HeapTuple       tuple;
        RecordIOData *my_extra;
@@ -83,20 +85,29 @@ record_in(PG_FUNCTION_ARGS)
        int                     i;
        char       *ptr;
        Datum      *values;
-       char       *nulls;
+       bool       *nulls;
        StringInfoData buf;
 
+       check_stack_depth();            /* recurses for record-type columns */
+
        /*
-        * Use the passed type unless it's RECORD; we can't support input of
-        * anonymous types, mainly because there's no good way to figure out which
-        * anonymous type is wanted.  Note that for RECORD, what we'll probably
-        * actually get is RECORD's typelem, ie, zero.
+        * Give a friendly error message if we did not get enough info to identify
+        * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
+        * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
+        * for typmod, since composite types and RECORD have no type modifiers at
+        * the SQL level, and thus must fail for RECORD.  However some callers can
+        * supply a valid typmod, and then we can do something useful for RECORD.
         */
-       if (tupType == InvalidOid || tupType == RECORDOID)
+       if (tupType == RECORDOID && tupTypmod < 0)
                ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                  errmsg("input of anonymous composite types is not implemented")));
-       tupTypmod = -1;                         /* for all non-anonymous types */
+                                errmsg("input of anonymous composite types is not implemented")));
+
+       /*
+        * This comes from the composite type's pg_type.oid and stores system oids
+        * in user tables, specifically DatumTupleFields. This oid must be
+        * preserved by binary upgrades.
+        */
        tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
        ncolumns = tupdesc->natts;
 
@@ -110,8 +121,8 @@ record_in(PG_FUNCTION_ARGS)
        {
                fcinfo->flinfo->fn_extra =
                        MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
-                                                          sizeof(RecordIOData) - sizeof(ColumnIOData)
-                                                          ncolumns * sizeof(ColumnIOData));
+                                                          offsetof(RecordIOData, columns) +
+                                                          ncolumns * sizeof(ColumnIOData));
                my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
                my_extra->record_type = InvalidOid;
                my_extra->record_typmod = 0;
@@ -121,15 +132,15 @@ record_in(PG_FUNCTION_ARGS)
                my_extra->record_typmod != tupTypmod)
        {
                MemSet(my_extra, 0,
-                          sizeof(RecordIOData) - sizeof(ColumnIOData)
-                          ncolumns * sizeof(ColumnIOData));
+                          offsetof(RecordIOData, columns) +
+                          ncolumns * sizeof(ColumnIOData));
                my_extra->record_type = tupType;
                my_extra->record_typmod = tupTypmod;
                my_extra->ncolumns = ncolumns;
        }
 
        values = (Datum *) palloc(ncolumns * sizeof(Datum));
-       nulls = (char *) palloc(ncolumns * sizeof(char));
+       nulls = (bool *) palloc(ncolumns * sizeof(bool));
 
        /*
         * Scan the string.  We use "buf" to accumulate the de-quoted data for
@@ -149,15 +160,16 @@ record_in(PG_FUNCTION_ARGS)
 
        for (i = 0; i < ncolumns; i++)
        {
+               Form_pg_attribute att = TupleDescAttr(tupdesc, i);
                ColumnIOData *column_info = &my_extra->columns[i];
-               Oid                     column_type = tupdesc->attrs[i]->atttypid;
+               Oid                     column_type = att->atttypid;
                char       *column_data;
 
                /* Ignore dropped columns in datatype, but fill with nulls */
-               if (tupdesc->attrs[i]->attisdropped)
+               if (att->attisdropped)
                {
                        values[i] = (Datum) 0;
-                       nulls[i] = 'n';
+                       nulls[i] = true;
                        continue;
                }
 
@@ -178,7 +190,7 @@ record_in(PG_FUNCTION_ARGS)
                if (*ptr == ',' || *ptr == ')')
                {
                        column_data = NULL;
-                       nulls[i] = 'n';
+                       nulls[i] = true;
                }
                else
                {
@@ -206,11 +218,11 @@ record_in(PG_FUNCTION_ARGS)
                                                                 errdetail("Unexpected end of input.")));
                                        appendStringInfoChar(&buf, *ptr++);
                                }
-                               else if (ch == '\"')
+                               else if (ch == '"')
                                {
                                        if (!inquote)
                                                inquote = true;
-                                       else if (*ptr == '\"')
+                                       else if (*ptr == '"')
                                        {
                                                /* doubled quote within quote sequence */
                                                appendStringInfoChar(&buf, *ptr++);
@@ -223,7 +235,7 @@ record_in(PG_FUNCTION_ARGS)
                        }
 
                        column_data = buf.data;
-                       nulls[i] = ' ';
+                       nulls[i] = false;
                }
 
                /*
@@ -242,7 +254,7 @@ record_in(PG_FUNCTION_ARGS)
                values[i] = InputFunctionCall(&column_info->proc,
                                                                          column_data,
                                                                          column_info->typioparam,
-                                                                         tupdesc->attrs[i]->atttypmod);
+                                                                         att->atttypmod);
 
                /*
                 * Prep for next column
@@ -264,12 +276,12 @@ record_in(PG_FUNCTION_ARGS)
                                 errmsg("malformed record literal: \"%s\"", string),
                                 errdetail("Junk after right parenthesis.")));
 
-       tuple = heap_formtuple(tupdesc, values, nulls);
+       tuple = heap_form_tuple(tupdesc, values, nulls);
 
        /*
-        * We cannot return tuple->t_data because heap_formtuple allocates it as
+        * We cannot return tuple->t_data because heap_form_tuple allocates it as
         * part of a larger chunk, and our caller may expect to be able to pfree
-        * our result.  So must copy the info into a new palloc chunk.
+        * our result.  So must copy the info into a new palloc chunk.
         */
        result = (HeapTupleHeader) palloc(tuple->t_len);
        memcpy(result, tuple->t_data, tuple->t_len);
@@ -299,9 +311,11 @@ record_out(PG_FUNCTION_ARGS)
        int                     ncolumns;
        int                     i;
        Datum      *values;
-       char       *nulls;
+       bool       *nulls;
        StringInfoData buf;
 
+       check_stack_depth();            /* recurses for record-type columns */
+
        /* Extract type info from the tuple itself */
        tupType = HeapTupleHeaderGetTypeId(rec);
        tupTypmod = HeapTupleHeaderGetTypMod(rec);
@@ -324,8 +338,8 @@ record_out(PG_FUNCTION_ARGS)
        {
                fcinfo->flinfo->fn_extra =
                        MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
-                                                          sizeof(RecordIOData) - sizeof(ColumnIOData)
-                                                          ncolumns * sizeof(ColumnIOData));
+                                                          offsetof(RecordIOData, columns) +
+                                                          ncolumns * sizeof(ColumnIOData));
                my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
                my_extra->record_type = InvalidOid;
                my_extra->record_typmod = 0;
@@ -335,18 +349,18 @@ record_out(PG_FUNCTION_ARGS)
                my_extra->record_typmod != tupTypmod)
        {
                MemSet(my_extra, 0,
-                          sizeof(RecordIOData) - sizeof(ColumnIOData)
-                          ncolumns * sizeof(ColumnIOData));
+                          offsetof(RecordIOData, columns) +
+                          ncolumns * sizeof(ColumnIOData));
                my_extra->record_type = tupType;
                my_extra->record_typmod = tupTypmod;
                my_extra->ncolumns = ncolumns;
        }
 
        values = (Datum *) palloc(ncolumns * sizeof(Datum));
-       nulls = (char *) palloc(ncolumns * sizeof(char));
+       nulls = (bool *) palloc(ncolumns * sizeof(bool));
 
        /* Break down the tuple into fields */
-       heap_deformtuple(&tuple, tupdesc, values, nulls);
+       heap_deform_tuple(&tuple, tupdesc, values, nulls);
 
        /* And build the result string */
        initStringInfo(&buf);
@@ -355,21 +369,23 @@ record_out(PG_FUNCTION_ARGS)
 
        for (i = 0; i < ncolumns; i++)
        {
+               Form_pg_attribute att = TupleDescAttr(tupdesc, i);
                ColumnIOData *column_info = &my_extra->columns[i];
-               Oid                     column_type = tupdesc->attrs[i]->atttypid;
+               Oid                     column_type = att->atttypid;
+               Datum           attr;
                char       *value;
                char       *tmp;
                bool            nq;
 
                /* Ignore dropped columns in datatype */
-               if (tupdesc->attrs[i]->attisdropped)
+               if (att->attisdropped)
                        continue;
 
                if (needComma)
                        appendStringInfoChar(&buf, ',');
                needComma = true;
 
-               if (nulls[i] == 'n')
+               if (nulls[i])
                {
                        /* emit nothing... */
                        continue;
@@ -380,17 +396,16 @@ record_out(PG_FUNCTION_ARGS)
                 */
                if (column_info->column_type != column_type)
                {
-                       bool            typIsVarlena;
-
                        getTypeOutputInfo(column_type,
                                                          &column_info->typiofunc,
-                                                         &typIsVarlena);
+                                                         &column_info->typisvarlena);
                        fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
                                                  fcinfo->flinfo->fn_mcxt);
                        column_info->column_type = column_type;
                }
 
-               value = OutputFunctionCall(&column_info->proc, values[i]);
+               attr = values[i];
+               value = OutputFunctionCall(&column_info->proc, attr);
 
                /* Detect whether we need double quotes for this value */
                nq = (value[0] == '\0');        /* force quotes for empty string */
@@ -409,17 +424,17 @@ record_out(PG_FUNCTION_ARGS)
 
                /* And emit the string */
                if (nq)
-                       appendStringInfoChar(&buf, '"');
+                       appendStringInfoCharMacro(&buf, '"');
                for (tmp = value; *tmp; tmp++)
                {
                        char            ch = *tmp;
 
                        if (ch == '"' || ch == '\\')
-                               appendStringInfoChar(&buf, ch);
-                       appendStringInfoChar(&buf, ch);
+                               appendStringInfoCharMacro(&buf, ch);
+                       appendStringInfoCharMacro(&buf, ch);
                }
                if (nq)
-                       appendStringInfoChar(&buf, '"');
+                       appendStringInfoCharMacro(&buf, '"');
        }
 
        appendStringInfoChar(&buf, ')');
@@ -439,12 +454,8 @@ record_recv(PG_FUNCTION_ARGS)
 {
        StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
        Oid                     tupType = PG_GETARG_OID(1);
-
-#ifdef NOT_USED
-       int32           typmod = PG_GETARG_INT32(2);
-#endif
+       int32           tupTypmod = PG_GETARG_INT32(2);
        HeapTupleHeader result;
-       int32           tupTypmod;
        TupleDesc       tupdesc;
        HeapTuple       tuple;
        RecordIOData *my_extra;
@@ -453,19 +464,23 @@ record_recv(PG_FUNCTION_ARGS)
        int                     validcols;
        int                     i;
        Datum      *values;
-       char       *nulls;
+       bool       *nulls;
+
+       check_stack_depth();            /* recurses for record-type columns */
 
        /*
-        * Use the passed type unless it's RECORD; we can't support input of
-        * anonymous types, mainly because there's no good way to figure out which
-        * anonymous type is wanted.  Note that for RECORD, what we'll probably
-        * actually get is RECORD's typelem, ie, zero.
+        * Give a friendly error message if we did not get enough info to identify
+        * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
+        * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
+        * for typmod, since composite types and RECORD have no type modifiers at
+        * the SQL level, and thus must fail for RECORD.  However some callers can
+        * supply a valid typmod, and then we can do something useful for RECORD.
         */
-       if (tupType == InvalidOid || tupType == RECORDOID)
+       if (tupType == RECORDOID && tupTypmod < 0)
                ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                  errmsg("input of anonymous composite types is not implemented")));
-       tupTypmod = -1;                         /* for all non-anonymous types */
+                                errmsg("input of anonymous composite types is not implemented")));
+
        tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
        ncolumns = tupdesc->natts;
 
@@ -479,8 +494,8 @@ record_recv(PG_FUNCTION_ARGS)
        {
                fcinfo->flinfo->fn_extra =
                        MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
-                                                          sizeof(RecordIOData) - sizeof(ColumnIOData)
-                                                          ncolumns * sizeof(ColumnIOData));
+                                                          offsetof(RecordIOData, columns) +
+                                                          ncolumns * sizeof(ColumnIOData));
                my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
                my_extra->record_type = InvalidOid;
                my_extra->record_typmod = 0;
@@ -490,15 +505,15 @@ record_recv(PG_FUNCTION_ARGS)
                my_extra->record_typmod != tupTypmod)
        {
                MemSet(my_extra, 0,
-                          sizeof(RecordIOData) - sizeof(ColumnIOData)
-                          ncolumns * sizeof(ColumnIOData));
+                          offsetof(RecordIOData, columns) +
+                          ncolumns * sizeof(ColumnIOData));
                my_extra->record_type = tupType;
                my_extra->record_typmod = tupTypmod;
                my_extra->ncolumns = ncolumns;
        }
 
        values = (Datum *) palloc(ncolumns * sizeof(Datum));
-       nulls = (char *) palloc(ncolumns * sizeof(char));
+       nulls = (bool *) palloc(ncolumns * sizeof(bool));
 
        /* Fetch number of columns user thinks it has */
        usercols = pq_getmsgint(buf, 4);
@@ -507,7 +522,7 @@ record_recv(PG_FUNCTION_ARGS)
        validcols = 0;
        for (i = 0; i < ncolumns; i++)
        {
-               if (!tupdesc->attrs[i]->attisdropped)
+               if (!TupleDescAttr(tupdesc, i)->attisdropped)
                        validcols++;
        }
        if (usercols != validcols)
@@ -519,8 +534,9 @@ record_recv(PG_FUNCTION_ARGS)
        /* Process each column */
        for (i = 0; i < ncolumns; i++)
        {
+               Form_pg_attribute att = TupleDescAttr(tupdesc, i);
                ColumnIOData *column_info = &my_extra->columns[i];
-               Oid                     column_type = tupdesc->attrs[i]->atttypid;
+               Oid                     column_type = att->atttypid;
                Oid                     coltypoid;
                int                     itemlen;
                StringInfoData item_buf;
@@ -528,10 +544,10 @@ record_recv(PG_FUNCTION_ARGS)
                char            csave;
 
                /* Ignore dropped columns in datatype, but fill with nulls */
-               if (tupdesc->attrs[i]->attisdropped)
+               if (att->attisdropped)
                {
                        values[i] = (Datum) 0;
-                       nulls[i] = 'n';
+                       nulls[i] = true;
                        continue;
                }
 
@@ -554,7 +570,7 @@ record_recv(PG_FUNCTION_ARGS)
                {
                        /* -1 length means NULL */
                        bufptr = NULL;
-                       nulls[i] = 'n';
+                       nulls[i] = true;
                        csave = 0;                      /* keep compiler quiet */
                }
                else
@@ -576,7 +592,7 @@ record_recv(PG_FUNCTION_ARGS)
                        buf->data[buf->cursor] = '\0';
 
                        bufptr = &item_buf;
-                       nulls[i] = ' ';
+                       nulls[i] = false;
                }
 
                /* Now call the column's receiveproc */
@@ -593,7 +609,7 @@ record_recv(PG_FUNCTION_ARGS)
                values[i] = ReceiveFunctionCall(&column_info->proc,
                                                                                bufptr,
                                                                                column_info->typioparam,
-                                                                               tupdesc->attrs[i]->atttypmod);
+                                                                               att->atttypmod);
 
                if (bufptr)
                {
@@ -608,12 +624,12 @@ record_recv(PG_FUNCTION_ARGS)
                }
        }
 
-       tuple = heap_formtuple(tupdesc, values, nulls);
+       tuple = heap_form_tuple(tupdesc, values, nulls);
 
        /*
-        * We cannot return tuple->t_data because heap_formtuple allocates it as
+        * We cannot return tuple->t_data because heap_form_tuple allocates it as
         * part of a larger chunk, and our caller may expect to be able to pfree
-        * our result.  So must copy the info into a new palloc chunk.
+        * our result.  So must copy the info into a new palloc chunk.
         */
        result = (HeapTupleHeader) palloc(tuple->t_len);
        memcpy(result, tuple->t_data, tuple->t_len);
@@ -642,9 +658,11 @@ record_send(PG_FUNCTION_ARGS)
        int                     validcols;
        int                     i;
        Datum      *values;
-       char       *nulls;
+       bool       *nulls;
        StringInfoData buf;
 
+       check_stack_depth();            /* recurses for record-type columns */
+
        /* Extract type info from the tuple itself */
        tupType = HeapTupleHeaderGetTypeId(rec);
        tupTypmod = HeapTupleHeaderGetTypMod(rec);
@@ -667,8 +685,8 @@ record_send(PG_FUNCTION_ARGS)
        {
                fcinfo->flinfo->fn_extra =
                        MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
-                                                          sizeof(RecordIOData) - sizeof(ColumnIOData)
-                                                          ncolumns * sizeof(ColumnIOData));
+                                                          offsetof(RecordIOData, columns) +
+                                                          ncolumns * sizeof(ColumnIOData));
                my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
                my_extra->record_type = InvalidOid;
                my_extra->record_typmod = 0;
@@ -678,18 +696,18 @@ record_send(PG_FUNCTION_ARGS)
                my_extra->record_typmod != tupTypmod)
        {
                MemSet(my_extra, 0,
-                          sizeof(RecordIOData) - sizeof(ColumnIOData)
-                          ncolumns * sizeof(ColumnIOData));
+                          offsetof(RecordIOData, columns) +
+                          ncolumns * sizeof(ColumnIOData));
                my_extra->record_type = tupType;
                my_extra->record_typmod = tupTypmod;
                my_extra->ncolumns = ncolumns;
        }
 
        values = (Datum *) palloc(ncolumns * sizeof(Datum));
-       nulls = (char *) palloc(ncolumns * sizeof(char));
+       nulls = (bool *) palloc(ncolumns * sizeof(bool));
 
        /* Break down the tuple into fields */
-       heap_deformtuple(&tuple, tupdesc, values, nulls);
+       heap_deform_tuple(&tuple, tupdesc, values, nulls);
 
        /* And build the result string */
        pq_begintypsend(&buf);
@@ -698,27 +716,29 @@ record_send(PG_FUNCTION_ARGS)
        validcols = 0;
        for (i = 0; i < ncolumns; i++)
        {
-               if (!tupdesc->attrs[i]->attisdropped)
+               if (!TupleDescAttr(tupdesc, i)->attisdropped)
                        validcols++;
        }
-       pq_sendint(&buf, validcols, 4);
+       pq_sendint32(&buf, validcols);
 
        for (i = 0; i < ncolumns; i++)
        {
+               Form_pg_attribute att = TupleDescAttr(tupdesc, i);
                ColumnIOData *column_info = &my_extra->columns[i];
-               Oid                     column_type = tupdesc->attrs[i]->atttypid;
+               Oid                     column_type = att->atttypid;
+               Datum           attr;
                bytea      *outputbytes;
 
                /* Ignore dropped columns in datatype */
-               if (tupdesc->attrs[i]->attisdropped)
+               if (att->attisdropped)
                        continue;
 
-               pq_sendint(&buf, column_type, sizeof(Oid));
+               pq_sendint32(&buf, column_type);
 
-               if (nulls[i] == 'n')
+               if (nulls[i])
                {
                        /* emit -1 data length to signify a NULL */
-                       pq_sendint(&buf, -1, 4);
+                       pq_sendint32(&buf, -1);
                        continue;
                }
 
@@ -727,23 +747,19 @@ record_send(PG_FUNCTION_ARGS)
                 */
                if (column_info->column_type != column_type)
                {
-                       bool            typIsVarlena;
-
                        getTypeBinaryOutputInfo(column_type,
                                                                        &column_info->typiofunc,
-                                                                       &typIsVarlena);
+                                                                       &column_info->typisvarlena);
                        fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
                                                  fcinfo->flinfo->fn_mcxt);
                        column_info->column_type = column_type;
                }
 
-               outputbytes = SendFunctionCall(&column_info->proc, values[i]);
-
-               /* We assume the result will not have been toasted */
-               pq_sendint(&buf, VARSIZE(outputbytes) - VARHDRSZ, 4);
+               attr = values[i];
+               outputbytes = SendFunctionCall(&column_info->proc, attr);
+               pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
                pq_sendbytes(&buf, VARDATA(outputbytes),
                                         VARSIZE(outputbytes) - VARHDRSZ);
-               pfree(outputbytes);
        }
 
        pfree(values);
@@ -791,6 +807,8 @@ record_cmp(FunctionCallInfo fcinfo)
        int                     i2;
        int                     j;
 
+       check_stack_depth();            /* recurses for record-type columns */
+
        /* Extract type info from the tuples */
        tupType1 = HeapTupleHeaderGetTypeId(record1);
        tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
@@ -822,8 +840,8 @@ record_cmp(FunctionCallInfo fcinfo)
        {
                fcinfo->flinfo->fn_extra =
                        MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
-                                                          sizeof(RecordCompareData) - sizeof(ColumnCompareData)
-                                                          ncols * sizeof(ColumnCompareData));
+                                                          offsetof(RecordCompareData, columns) +
+                                                          ncols * sizeof(ColumnCompareData));
                my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
                my_extra->ncolumns = ncols;
                my_extra->record1_type = InvalidOid;
@@ -854,25 +872,26 @@ record_cmp(FunctionCallInfo fcinfo)
 
        /*
         * Scan corresponding columns, allowing for dropped columns in different
-        * places in the two rows.  i1 and i2 are physical column indexes,
-        * j is the logical column index.
+        * places in the two rows.  i1 and i2 are physical column indexes, j is
+        * the logical column index.
         */
        i1 = i2 = j = 0;
        while (i1 < ncolumns1 || i2 < ncolumns2)
        {
+               Form_pg_attribute att1;
+               Form_pg_attribute att2;
                TypeCacheEntry *typentry;
-               FunctionCallInfoData locfcinfo;
-               int32           cmpresult;
+               Oid                     collation;
 
                /*
                 * Skip dropped columns
                 */
-               if (i1 < ncolumns1 && tupdesc1->attrs[i1]->attisdropped)
+               if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
                {
                        i1++;
                        continue;
                }
-               if (i2 < ncolumns2 && tupdesc2->attrs[i2]->attisdropped)
+               if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
                {
                        i2++;
                        continue;
@@ -880,26 +899,36 @@ record_cmp(FunctionCallInfo fcinfo)
                if (i1 >= ncolumns1 || i2 >= ncolumns2)
                        break;                          /* we'll deal with mismatch below loop */
 
+               att1 = TupleDescAttr(tupdesc1, i1);
+               att2 = TupleDescAttr(tupdesc2, i2);
+
                /*
                 * Have two matching columns, they must be same type
                 */
-               if (tupdesc1->attrs[i1]->atttypid !=
-                       tupdesc2->attrs[i2]->atttypid)
+               if (att1->atttypid != att2->atttypid)
                        ereport(ERROR,
                                        (errcode(ERRCODE_DATATYPE_MISMATCH),
                                         errmsg("cannot compare dissimilar column types %s and %s at record column %d",
-                                                       format_type_be(tupdesc1->attrs[i1]->atttypid),
-                                                       format_type_be(tupdesc2->attrs[i2]->atttypid),
-                                                       j+1)));
+                                                       format_type_be(att1->atttypid),
+                                                       format_type_be(att2->atttypid),
+                                                       j + 1)));
+
+               /*
+                * If they're not same collation, we don't complain here, but the
+                * comparison function might.
+                */
+               collation = att1->attcollation;
+               if (collation != att2->attcollation)
+                       collation = InvalidOid;
 
                /*
                 * Lookup the comparison function if not done already
                 */
                typentry = my_extra->columns[j].typentry;
                if (typentry == NULL ||
-                       typentry->type_id != tupdesc1->attrs[i1]->atttypid)
+                       typentry->type_id != att1->atttypid)
                {
-                       typentry = lookup_type_cache(tupdesc1->attrs[i1]->atttypid,
+                       typentry = lookup_type_cache(att1->atttypid,
                                                                                 TYPECACHE_CMP_PROC_FINFO);
                        if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
                                ereport(ERROR,
@@ -914,6 +943,9 @@ record_cmp(FunctionCallInfo fcinfo)
                 */
                if (!nulls1[i1] || !nulls2[i2])
                {
+                       LOCAL_FCINFO(locfcinfo, 2);
+                       int32           cmpresult;
+
                        if (nulls1[i1])
                        {
                                /* arg1 is greater than arg2 */
@@ -928,14 +960,14 @@ record_cmp(FunctionCallInfo fcinfo)
                        }
 
                        /* Compare the pair of elements */
-                       InitFunctionCallInfoData(locfcinfo, &typentry->cmp_proc_finfo, 2,
-                                                                        NULL, NULL);
-                       locfcinfo.arg[0] = values1[i1];
-                       locfcinfo.arg[1] = values2[i2];
-                       locfcinfo.argnull[0] = false;
-                       locfcinfo.argnull[1] = false;
-                       locfcinfo.isnull = false;
-                       cmpresult = DatumGetInt32(FunctionCallInvoke(&locfcinfo));
+                       InitFunctionCallInfoData(*locfcinfo, &typentry->cmp_proc_finfo, 2,
+                                                                        collation, NULL, NULL);
+                       locfcinfo->args[0].value = values1[i1];
+                       locfcinfo->args[0].isnull = false;
+                       locfcinfo->args[1].value = values2[i2];
+                       locfcinfo->args[1].isnull = false;
+                       locfcinfo->isnull = false;
+                       cmpresult = DatumGetInt32(FunctionCallInvoke(locfcinfo));
 
                        if (cmpresult < 0)
                        {
@@ -957,8 +989,8 @@ record_cmp(FunctionCallInfo fcinfo)
 
        /*
         * If we didn't break out of the loop early, check for column count
-        * mismatch.  (We do not report such mismatch if we found unequal
-        * column values; is that a feature or a bug?)
+        * mismatch.  (We do not report such mismatch if we found unequal column
+        * values; is that a feature or a bug?)
         */
        if (result == 0)
        {
@@ -1017,6 +1049,8 @@ record_eq(PG_FUNCTION_ARGS)
        int                     i2;
        int                     j;
 
+       check_stack_depth();            /* recurses for record-type columns */
+
        /* Extract type info from the tuples */
        tupType1 = HeapTupleHeaderGetTypeId(record1);
        tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
@@ -1048,8 +1082,8 @@ record_eq(PG_FUNCTION_ARGS)
        {
                fcinfo->flinfo->fn_extra =
                        MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
-                                                          sizeof(RecordCompareData) - sizeof(ColumnCompareData)
-                                                          ncols * sizeof(ColumnCompareData));
+                                                          offsetof(RecordCompareData, columns) +
+                                                          ncols * sizeof(ColumnCompareData));
                my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
                my_extra->ncolumns = ncols;
                my_extra->record1_type = InvalidOid;
@@ -1080,25 +1114,28 @@ record_eq(PG_FUNCTION_ARGS)
 
        /*
         * Scan corresponding columns, allowing for dropped columns in different
-        * places in the two rows.  i1 and i2 are physical column indexes,
-        * j is the logical column index.
+        * places in the two rows.  i1 and i2 are physical column indexes, j is
+        * the logical column index.
         */
        i1 = i2 = j = 0;
        while (i1 < ncolumns1 || i2 < ncolumns2)
        {
+               LOCAL_FCINFO(locfcinfo, 2);
+               Form_pg_attribute att1;
+               Form_pg_attribute att2;
                TypeCacheEntry *typentry;
-               FunctionCallInfoData locfcinfo;
+               Oid                     collation;
                bool            oprresult;
 
                /*
                 * Skip dropped columns
                 */
-               if (i1 < ncolumns1 && tupdesc1->attrs[i1]->attisdropped)
+               if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
                {
                        i1++;
                        continue;
                }
-               if (i2 < ncolumns2 && tupdesc2->attrs[i2]->attisdropped)
+               if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
                {
                        i2++;
                        continue;
@@ -1106,26 +1143,36 @@ record_eq(PG_FUNCTION_ARGS)
                if (i1 >= ncolumns1 || i2 >= ncolumns2)
                        break;                          /* we'll deal with mismatch below loop */
 
+               att1 = TupleDescAttr(tupdesc1, i1);
+               att2 = TupleDescAttr(tupdesc2, i2);
+
                /*
                 * Have two matching columns, they must be same type
                 */
-               if (tupdesc1->attrs[i1]->atttypid !=
-                       tupdesc2->attrs[i2]->atttypid)
+               if (att1->atttypid != att2->atttypid)
                        ereport(ERROR,
                                        (errcode(ERRCODE_DATATYPE_MISMATCH),
                                         errmsg("cannot compare dissimilar column types %s and %s at record column %d",
-                                                       format_type_be(tupdesc1->attrs[i1]->atttypid),
-                                                       format_type_be(tupdesc2->attrs[i2]->atttypid),
-                                                       j+1)));
+                                                       format_type_be(att1->atttypid),
+                                                       format_type_be(att2->atttypid),
+                                                       j + 1)));
+
+               /*
+                * If they're not same collation, we don't complain here, but the
+                * equality function might.
+                */
+               collation = att1->attcollation;
+               if (collation != att2->attcollation)
+                       collation = InvalidOid;
 
                /*
                 * Lookup the equality function if not done already
                 */
                typentry = my_extra->columns[j].typentry;
                if (typentry == NULL ||
-                       typentry->type_id != tupdesc1->attrs[i1]->atttypid)
+                       typentry->type_id != att1->atttypid)
                {
-                       typentry = lookup_type_cache(tupdesc1->attrs[i1]->atttypid,
+                       typentry = lookup_type_cache(att1->atttypid,
                                                                                 TYPECACHE_EQ_OPR_FINFO);
                        if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
                                ereport(ERROR,
@@ -1147,14 +1194,14 @@ record_eq(PG_FUNCTION_ARGS)
                        }
 
                        /* Compare the pair of elements */
-                       InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
-                                                                        NULL, NULL);
-                       locfcinfo.arg[0] = values1[i1];
-                       locfcinfo.arg[1] = values2[i2];
-                       locfcinfo.argnull[0] = false;
-                       locfcinfo.argnull[1] = false;
-                       locfcinfo.isnull = false;
-                       oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo));
+                       InitFunctionCallInfoData(*locfcinfo, &typentry->eq_opr_finfo, 2,
+                                                                        collation, NULL, NULL);
+                       locfcinfo->args[0].value = values1[i1];
+                       locfcinfo->args[0].isnull = false;
+                       locfcinfo->args[1].value = values2[i2];
+                       locfcinfo->args[1].isnull = false;
+                       locfcinfo->isnull = false;
+                       oprresult = DatumGetBool(FunctionCallInvoke(locfcinfo));
                        if (!oprresult)
                        {
                                result = false;
@@ -1168,8 +1215,8 @@ record_eq(PG_FUNCTION_ARGS)
 
        /*
         * If we didn't break out of the loop early, check for column count
-        * mismatch.  (We do not report such mismatch if we found unequal
-        * column values; is that a feature or a bug?)
+        * mismatch.  (We do not report such mismatch if we found unequal column
+        * values; is that a feature or a bug?)
         */
        if (result)
        {
@@ -1228,3 +1275,471 @@ btrecordcmp(PG_FUNCTION_ARGS)
 {
        PG_RETURN_INT32(record_cmp(fcinfo));
 }
+
+
+/*
+ * record_image_cmp :
+ * Internal byte-oriented comparison function for records.
+ *
+ * Returns -1, 0 or 1
+ *
+ * Note: The normal concepts of "equality" do not apply here; different
+ * representation of values considered to be equal are not considered to be
+ * identical.  As an example, for the citext type 'A' and 'a' are equal, but
+ * they are not identical.
+ */
+static int
+record_image_cmp(FunctionCallInfo fcinfo)
+{
+       HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
+       HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
+       int                     result = 0;
+       Oid                     tupType1;
+       Oid                     tupType2;
+       int32           tupTypmod1;
+       int32           tupTypmod2;
+       TupleDesc       tupdesc1;
+       TupleDesc       tupdesc2;
+       HeapTupleData tuple1;
+       HeapTupleData tuple2;
+       int                     ncolumns1;
+       int                     ncolumns2;
+       RecordCompareData *my_extra;
+       int                     ncols;
+       Datum      *values1;
+       Datum      *values2;
+       bool       *nulls1;
+       bool       *nulls2;
+       int                     i1;
+       int                     i2;
+       int                     j;
+
+       /* Extract type info from the tuples */
+       tupType1 = HeapTupleHeaderGetTypeId(record1);
+       tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
+       tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
+       ncolumns1 = tupdesc1->natts;
+       tupType2 = HeapTupleHeaderGetTypeId(record2);
+       tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
+       tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
+       ncolumns2 = tupdesc2->natts;
+
+       /* Build temporary HeapTuple control structures */
+       tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
+       ItemPointerSetInvalid(&(tuple1.t_self));
+       tuple1.t_tableOid = InvalidOid;
+       tuple1.t_data = record1;
+       tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
+       ItemPointerSetInvalid(&(tuple2.t_self));
+       tuple2.t_tableOid = InvalidOid;
+       tuple2.t_data = record2;
+
+       /*
+        * We arrange to look up the needed comparison info just once per series
+        * of calls, assuming the record types don't change underneath us.
+        */
+       ncols = Max(ncolumns1, ncolumns2);
+       my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
+       if (my_extra == NULL ||
+               my_extra->ncolumns < ncols)
+       {
+               fcinfo->flinfo->fn_extra =
+                       MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+                                                          offsetof(RecordCompareData, columns) +
+                                                          ncols * sizeof(ColumnCompareData));
+               my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
+               my_extra->ncolumns = ncols;
+               my_extra->record1_type = InvalidOid;
+               my_extra->record1_typmod = 0;
+               my_extra->record2_type = InvalidOid;
+               my_extra->record2_typmod = 0;
+       }
+
+       if (my_extra->record1_type != tupType1 ||
+               my_extra->record1_typmod != tupTypmod1 ||
+               my_extra->record2_type != tupType2 ||
+               my_extra->record2_typmod != tupTypmod2)
+       {
+               MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
+               my_extra->record1_type = tupType1;
+               my_extra->record1_typmod = tupTypmod1;
+               my_extra->record2_type = tupType2;
+               my_extra->record2_typmod = tupTypmod2;
+       }
+
+       /* Break down the tuples into fields */
+       values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
+       nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
+       heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
+       values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
+       nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
+       heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
+
+       /*
+        * Scan corresponding columns, allowing for dropped columns in different
+        * places in the two rows.  i1 and i2 are physical column indexes, j is
+        * the logical column index.
+        */
+       i1 = i2 = j = 0;
+       while (i1 < ncolumns1 || i2 < ncolumns2)
+       {
+               Form_pg_attribute att1;
+               Form_pg_attribute att2;
+
+               /*
+                * Skip dropped columns
+                */
+               if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
+               {
+                       i1++;
+                       continue;
+               }
+               if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
+               {
+                       i2++;
+                       continue;
+               }
+               if (i1 >= ncolumns1 || i2 >= ncolumns2)
+                       break;                          /* we'll deal with mismatch below loop */
+
+               att1 = TupleDescAttr(tupdesc1, i1);
+               att2 = TupleDescAttr(tupdesc2, i2);
+
+               /*
+                * Have two matching columns, they must be same type
+                */
+               if (att1->atttypid != att2->atttypid)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("cannot compare dissimilar column types %s and %s at record column %d",
+                                                       format_type_be(att1->atttypid),
+                                                       format_type_be(att2->atttypid),
+                                                       j + 1)));
+
+               /*
+                * The same type should have the same length (or both should be
+                * variable).
+                */
+               Assert(att1->attlen == att2->attlen);
+
+               /*
+                * We consider two NULLs equal; NULL > not-NULL.
+                */
+               if (!nulls1[i1] || !nulls2[i2])
+               {
+                       int                     cmpresult = 0;
+
+                       if (nulls1[i1])
+                       {
+                               /* arg1 is greater than arg2 */
+                               result = 1;
+                               break;
+                       }
+                       if (nulls2[i2])
+                       {
+                               /* arg1 is less than arg2 */
+                               result = -1;
+                               break;
+                       }
+
+                       /* Compare the pair of elements */
+                       if (att1->attlen == -1)
+                       {
+                               Size            len1,
+                                                       len2;
+                               struct varlena *arg1val;
+                               struct varlena *arg2val;
+
+                               len1 = toast_raw_datum_size(values1[i1]);
+                               len2 = toast_raw_datum_size(values2[i2]);
+                               arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]);
+                               arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]);
+
+                               cmpresult = memcmp(VARDATA_ANY(arg1val),
+                                                                  VARDATA_ANY(arg2val),
+                                                                  Min(len1, len2) - VARHDRSZ);
+                               if ((cmpresult == 0) && (len1 != len2))
+                                       cmpresult = (len1 < len2) ? -1 : 1;
+
+                               if ((Pointer) arg1val != (Pointer) values1[i1])
+                                       pfree(arg1val);
+                               if ((Pointer) arg2val != (Pointer) values2[i2])
+                                       pfree(arg2val);
+                       }
+                       else if (att1->attbyval)
+                       {
+                               if (values1[i1] != values2[i2])
+                                       cmpresult = (values1[i1] < values2[i2]) ? -1 : 1;
+                       }
+                       else
+                       {
+                               cmpresult = memcmp(DatumGetPointer(values1[i1]),
+                                                                  DatumGetPointer(values2[i2]),
+                                                                  att1->attlen);
+                       }
+
+                       if (cmpresult < 0)
+                       {
+                               /* arg1 is less than arg2 */
+                               result = -1;
+                               break;
+                       }
+                       else if (cmpresult > 0)
+                       {
+                               /* arg1 is greater than arg2 */
+                               result = 1;
+                               break;
+                       }
+               }
+
+               /* equal, so continue to next column */
+               i1++, i2++, j++;
+       }
+
+       /*
+        * If we didn't break out of the loop early, check for column count
+        * mismatch.  (We do not report such mismatch if we found unequal column
+        * values; is that a feature or a bug?)
+        */
+       if (result == 0)
+       {
+               if (i1 != ncolumns1 || i2 != ncolumns2)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("cannot compare record types with different numbers of columns")));
+       }
+
+       pfree(values1);
+       pfree(nulls1);
+       pfree(values2);
+       pfree(nulls2);
+       ReleaseTupleDesc(tupdesc1);
+       ReleaseTupleDesc(tupdesc2);
+
+       /* Avoid leaking memory when handed toasted input. */
+       PG_FREE_IF_COPY(record1, 0);
+       PG_FREE_IF_COPY(record2, 1);
+
+       return result;
+}
+
+/*
+ * record_image_eq :
+ *               compares two records for identical contents, based on byte images
+ * result :
+ *               returns true if the records are identical, false otherwise.
+ *
+ * Note: we do not use record_image_cmp here, since we can avoid
+ * de-toasting for unequal lengths this way.
+ */
+Datum
+record_image_eq(PG_FUNCTION_ARGS)
+{
+       HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
+       HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
+       bool            result = true;
+       Oid                     tupType1;
+       Oid                     tupType2;
+       int32           tupTypmod1;
+       int32           tupTypmod2;
+       TupleDesc       tupdesc1;
+       TupleDesc       tupdesc2;
+       HeapTupleData tuple1;
+       HeapTupleData tuple2;
+       int                     ncolumns1;
+       int                     ncolumns2;
+       RecordCompareData *my_extra;
+       int                     ncols;
+       Datum      *values1;
+       Datum      *values2;
+       bool       *nulls1;
+       bool       *nulls2;
+       int                     i1;
+       int                     i2;
+       int                     j;
+
+       /* Extract type info from the tuples */
+       tupType1 = HeapTupleHeaderGetTypeId(record1);
+       tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
+       tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
+       ncolumns1 = tupdesc1->natts;
+       tupType2 = HeapTupleHeaderGetTypeId(record2);
+       tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
+       tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
+       ncolumns2 = tupdesc2->natts;
+
+       /* Build temporary HeapTuple control structures */
+       tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
+       ItemPointerSetInvalid(&(tuple1.t_self));
+       tuple1.t_tableOid = InvalidOid;
+       tuple1.t_data = record1;
+       tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
+       ItemPointerSetInvalid(&(tuple2.t_self));
+       tuple2.t_tableOid = InvalidOid;
+       tuple2.t_data = record2;
+
+       /*
+        * We arrange to look up the needed comparison info just once per series
+        * of calls, assuming the record types don't change underneath us.
+        */
+       ncols = Max(ncolumns1, ncolumns2);
+       my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
+       if (my_extra == NULL ||
+               my_extra->ncolumns < ncols)
+       {
+               fcinfo->flinfo->fn_extra =
+                       MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+                                                          offsetof(RecordCompareData, columns) +
+                                                          ncols * sizeof(ColumnCompareData));
+               my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
+               my_extra->ncolumns = ncols;
+               my_extra->record1_type = InvalidOid;
+               my_extra->record1_typmod = 0;
+               my_extra->record2_type = InvalidOid;
+               my_extra->record2_typmod = 0;
+       }
+
+       if (my_extra->record1_type != tupType1 ||
+               my_extra->record1_typmod != tupTypmod1 ||
+               my_extra->record2_type != tupType2 ||
+               my_extra->record2_typmod != tupTypmod2)
+       {
+               MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
+               my_extra->record1_type = tupType1;
+               my_extra->record1_typmod = tupTypmod1;
+               my_extra->record2_type = tupType2;
+               my_extra->record2_typmod = tupTypmod2;
+       }
+
+       /* Break down the tuples into fields */
+       values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
+       nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
+       heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
+       values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
+       nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
+       heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
+
+       /*
+        * Scan corresponding columns, allowing for dropped columns in different
+        * places in the two rows.  i1 and i2 are physical column indexes, j is
+        * the logical column index.
+        */
+       i1 = i2 = j = 0;
+       while (i1 < ncolumns1 || i2 < ncolumns2)
+       {
+               Form_pg_attribute att1;
+               Form_pg_attribute att2;
+
+               /*
+                * Skip dropped columns
+                */
+               if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
+               {
+                       i1++;
+                       continue;
+               }
+               if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
+               {
+                       i2++;
+                       continue;
+               }
+               if (i1 >= ncolumns1 || i2 >= ncolumns2)
+                       break;                          /* we'll deal with mismatch below loop */
+
+               att1 = TupleDescAttr(tupdesc1, i1);
+               att2 = TupleDescAttr(tupdesc2, i2);
+
+               /*
+                * Have two matching columns, they must be same type
+                */
+               if (att1->atttypid != att2->atttypid)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("cannot compare dissimilar column types %s and %s at record column %d",
+                                                       format_type_be(att1->atttypid),
+                                                       format_type_be(att2->atttypid),
+                                                       j + 1)));
+
+               /*
+                * We consider two NULLs equal; NULL > not-NULL.
+                */
+               if (!nulls1[i1] || !nulls2[i2])
+               {
+                       if (nulls1[i1] || nulls2[i2])
+                       {
+                               result = false;
+                               break;
+                       }
+
+                       /* Compare the pair of elements */
+                       result = datum_image_eq(values1[i1], values2[i2], att1->attbyval, att2->attlen);
+                       if (!result)
+                               break;
+               }
+
+               /* equal, so continue to next column */
+               i1++, i2++, j++;
+       }
+
+       /*
+        * If we didn't break out of the loop early, check for column count
+        * mismatch.  (We do not report such mismatch if we found unequal column
+        * values; is that a feature or a bug?)
+        */
+       if (result)
+       {
+               if (i1 != ncolumns1 || i2 != ncolumns2)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("cannot compare record types with different numbers of columns")));
+       }
+
+       pfree(values1);
+       pfree(nulls1);
+       pfree(values2);
+       pfree(nulls2);
+       ReleaseTupleDesc(tupdesc1);
+       ReleaseTupleDesc(tupdesc2);
+
+       /* Avoid leaking memory when handed toasted input. */
+       PG_FREE_IF_COPY(record1, 0);
+       PG_FREE_IF_COPY(record2, 1);
+
+       PG_RETURN_BOOL(result);
+}
+
+Datum
+record_image_ne(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_BOOL(!DatumGetBool(record_image_eq(fcinfo)));
+}
+
+Datum
+record_image_lt(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_BOOL(record_image_cmp(fcinfo) < 0);
+}
+
+Datum
+record_image_gt(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_BOOL(record_image_cmp(fcinfo) > 0);
+}
+
+Datum
+record_image_le(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_BOOL(record_image_cmp(fcinfo) <= 0);
+}
+
+Datum
+record_image_ge(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_BOOL(record_image_cmp(fcinfo) >= 0);
+}
+
+Datum
+btrecordimagecmp(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_INT32(record_image_cmp(fcinfo));
+}