granicus.if.org Git - postgresql/blob - src/backend/utils/adt/rowtypes.c
Split tuptoaster.c into three separate files.
[postgresql] / src / backend / utils / adt / rowtypes.c
1 /*-------------------------------------------------------------------------
2  *
3  * rowtypes.c
4  *        I/O and comparison functions for generic composite types.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/utils/adt/rowtypes.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include <ctype.h>
18
19 #include "access/detoast.h"
20 #include "access/htup_details.h"
21 #include "catalog/pg_type.h"
22 #include "funcapi.h"
23 #include "libpq/pqformat.h"
24 #include "miscadmin.h"
25 #include "utils/builtins.h"
26 #include "utils/datum.h"
27 #include "utils/lsyscache.h"
28 #include "utils/typcache.h"
29
30
31 /*
32  * structure to cache metadata needed for record I/O
    *
    * ColumnIOData is the per-column part: the I/O function looked up for one
    * column's type, plus the FmgrInfo used to call it.  The record I/O
    * routines in this file redo the lookup whenever column_type differs from
    * the column's current type OID.
33  */
34 typedef struct ColumnIOData
35 {
36         Oid                     column_type;    /* type OID the fields below were looked up for */
37         Oid                     typiofunc;      /* OID of the I/O function found for that type */
38         Oid                     typioparam;     /* filled by the input/receive lookups */
39         bool            typisvarlena;   /* filled by the output/send lookups */
40         FmgrInfo        proc;           /* fmgr_info_cxt() result for typiofunc */
41 } ColumnIOData;
42
/*
 * Per-call-site cache for the record I/O routines, kept in
 * fcinfo->flinfo->fn_extra.  It is allocated with room for ncolumns
 * ColumnIOData entries and is reset whenever the record type or typmod
 * being processed differs from the one recorded here.
 */
43 typedef struct RecordIOData
44 {
45         Oid                     record_type;    /* composite type OID the cache describes */
46         int32           record_typmod;  /* typmod that goes with record_type */
47         int                     ncolumns;       /* number of entries in columns[] */
48         ColumnIOData columns[FLEXIBLE_ARRAY_MEMBER];
49 } RecordIOData;
50
51 /*
52  * structure to cache metadata needed for record comparison
    *
    * NOTE(review): the code that fills and reads typentry is further down
    * the file; presumably there is one entry per compared column -- confirm
    * against the comparison functions.
53  */
54 typedef struct ColumnCompareData
55 {
56         TypeCacheEntry *typentry;       /* has everything we need, actually */
57 } ColumnCompareData;
58
/*
 * Cache header for record comparison, followed by a variable-length array
 * of per-column entries.
 *
 * NOTE(review): the record1 and record2 fields presumably remember the type
 * and typmod of the two inputs the cache was built for, in the same way
 * RecordIOData does for I/O -- confirm against the comparison functions.
 */
59 typedef struct RecordCompareData
60 {
61         int                     ncolumns;               /* allocated length of columns[] */
62         Oid                     record1_type;
63         int32           record1_typmod;
64         Oid                     record2_type;
65         int32           record2_typmod;
66         ColumnCompareData columns[FLEXIBLE_ARRAY_MEMBER];
67 } RecordCompareData;
68
69
70 /*
71  * record_in            - input routine for any composite type.
    *
    * fmgr arguments: 0 = literal to parse (cstring), 1 = composite type OID,
    * 2 = typmod.  Returns a freshly palloc'd HeapTupleHeader.
    *
    * Syntax accepted by the scanner below: optional whitespace, a left
    * parenthesis, one field per non-dropped column separated by commas, a
    * right parenthesis, optional whitespace.  A completely empty field is
    * NULL.  Inside a field a backslash makes the next character literal,
    * double quotes switch quoted mode on and off (commas and parentheses are
    * plain data while quoted), and a doubled double quote inside quotes
    * stands for one double quote.
    *
    * Errors: ERRCODE_FEATURE_NOT_SUPPORTED for RECORD with a negative typmod,
    * ERRCODE_INVALID_TEXT_REPRESENTATION for a malformed literal.
72  */
73 Datum
74 record_in(PG_FUNCTION_ARGS)
75 {
76         char       *string = PG_GETARG_CSTRING(0);
77         Oid                     tupType = PG_GETARG_OID(1);
78         int32           tupTypmod = PG_GETARG_INT32(2);
79         HeapTupleHeader result;
80         TupleDesc       tupdesc;
81         HeapTuple       tuple;
82         RecordIOData *my_extra;
83         bool            needComma = false;
84         int                     ncolumns;
85         int                     i;
86         char       *ptr;
87         Datum      *values;
88         bool       *nulls;
89         StringInfoData buf;
90
91         check_stack_depth();            /* recurses for record-type columns */
92
93         /*
94          * Give a friendly error message if we did not get enough info to identify
95          * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
96          * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
97          * for typmod, since composite types and RECORD have no type modifiers at
98          * the SQL level, and thus must fail for RECORD.  However some callers can
99          * supply a valid typmod, and then we can do something useful for RECORD.
100          */
101         if (tupType == RECORDOID && tupTypmod < 0)
102                 ereport(ERROR,
103                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
104                                  errmsg("input of anonymous composite types is not implemented")));
105
106         /*
107          * This comes from the composite type's pg_type.oid and stores system oids
108          * in user tables, specifically DatumTupleFields. This oid must be
109          * preserved by binary upgrades.
110          */
            /* The descriptor is handed back with ReleaseTupleDesc before returning. */
111         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
112         ncolumns = tupdesc->natts;
113
114         /*
115          * We arrange to look up the needed I/O info just once per series of
116          * calls, assuming the record type doesn't change underneath us.
117          */
118         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
119         if (my_extra == NULL ||
120                 my_extra->ncolumns != ncolumns)
121         {
                    /* No cache yet, or it was sized for a different column count. */
122                 fcinfo->flinfo->fn_extra =
123                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
124                                                            offsetof(RecordIOData, columns) +
125                                                            ncolumns * sizeof(ColumnIOData));
126                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
                    /* Mark it as not set up, so the test below resets it for any valid type. */
127                 my_extra->record_type = InvalidOid;
128                 my_extra->record_typmod = 0;
129         }
130
131         if (my_extra->record_type != tupType ||
132                 my_extra->record_typmod != tupTypmod)
133         {
                    /*
                     * Zeroing clears every cached column_type; a column whose type OID
                     * is nonzero is then looked up again in the loop below.
                     */
134                 MemSet(my_extra, 0,
135                            offsetof(RecordIOData, columns) +
136                            ncolumns * sizeof(ColumnIOData));
137                 my_extra->record_type = tupType;
138                 my_extra->record_typmod = tupTypmod;
139                 my_extra->ncolumns = ncolumns;
140         }
141
            /* One slot per attribute of the descriptor, dropped columns included. */
142         values = (Datum *) palloc(ncolumns * sizeof(Datum));
143         nulls = (bool *) palloc(ncolumns * sizeof(bool));
144
145         /*
146          * Scan the string.  We use "buf" to accumulate the de-quoted data for
147          * each column, which is then fed to the appropriate input converter.
148          */
149         ptr = string;
150         /* Allow leading whitespace */
151         while (*ptr && isspace((unsigned char) *ptr))
152                 ptr++;
153         if (*ptr++ != '(')
154                 ereport(ERROR,
155                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
156                                  errmsg("malformed record literal: \"%s\"", string),
157                                  errdetail("Missing left parenthesis.")));
158
159         initStringInfo(&buf);
160
161         for (i = 0; i < ncolumns; i++)
162         {
163                 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
164                 ColumnIOData *column_info = &my_extra->columns[i];
165                 Oid                     column_type = att->atttypid;
166                 char       *column_data;
167
168                 /* Ignore dropped columns in datatype, but fill with nulls */
169                 if (att->attisdropped)
170                 {
171                         values[i] = (Datum) 0;
172                         nulls[i] = true;
173                         continue;
174                 }
175
                    /* needComma becomes true once the first live column has been read. */
176                 if (needComma)
177                 {
178                         /* Skip comma that separates prior field from this one */
179                         if (*ptr == ',')
180                                 ptr++;
181                         else
182                                 /* *ptr must be ')' */
183                                 ereport(ERROR,
184                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
185                                                  errmsg("malformed record literal: \"%s\"", string),
186                                                  errdetail("Too few columns.")));
187                 }
188
189                 /* Check for null: completely empty input means null */
190                 if (*ptr == ',' || *ptr == ')')
191                 {
192                         column_data = NULL;
193                         nulls[i] = true;
194                 }
195                 else
196                 {
197                         /* Extract string for this column */
198                         bool            inquote = false;
199
                            /* buf is reused for every column; the loop stops at an unquoted comma or right parenthesis. */
200                         resetStringInfo(&buf);
201                         while (inquote || !(*ptr == ',' || *ptr == ')'))
202                         {
203                                 char            ch = *ptr++;
204
205                                 if (ch == '\0')
206                                         ereport(ERROR,
207                                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
208                                                          errmsg("malformed record literal: \"%s\"",
209                                                                         string),
210                                                          errdetail("Unexpected end of input.")));
211                                 if (ch == '\\')
212                                 {
213                                         if (*ptr == '\0')
214                                                 ereport(ERROR,
215                                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
216                                                                  errmsg("malformed record literal: \"%s\"",
217                                                                                 string),
218                                                                  errdetail("Unexpected end of input.")));
219                                         appendStringInfoChar(&buf, *ptr++);
220                                 }
221                                 else if (ch == '"')
222                                 {
223                                         if (!inquote)
224                                                 inquote = true;
225                                         else if (*ptr == '"')
226                                         {
227                                                 /* doubled quote within quote sequence */
228                                                 appendStringInfoChar(&buf, *ptr++);
229                                         }
230                                         else
231                                                 inquote = false;
232                                 }
233                                 else
234                                         appendStringInfoChar(&buf, ch);
235                         }
236
237                         column_data = buf.data;
238                         nulls[i] = false;
239                 }
240
241                 /*
242                  * Convert the column value
                     *
                     * The input function is looked up only when this cache slot was
                     * filled for a different type.  It is called for NULL fields as
                     * well, with column_data == NULL.
243                  */
244                 if (column_info->column_type != column_type)
245                 {
246                         getTypeInputInfo(column_type,
247                                                          &column_info->typiofunc,
248                                                          &column_info->typioparam);
249                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
250                                                   fcinfo->flinfo->fn_mcxt);
251                         column_info->column_type = column_type;
252                 }
253
254                 values[i] = InputFunctionCall(&column_info->proc,
255                                                                           column_data,
256                                                                           column_info->typioparam,
257                                                                           att->atttypmod);
258
259                 /*
260                  * Prep for next column
261                  */
262                 needComma = true;
263         }
264
            /* Every column is consumed; only the right parenthesis may follow. */
265         if (*ptr++ != ')')
266                 ereport(ERROR,
267                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
268                                  errmsg("malformed record literal: \"%s\"", string),
269                                  errdetail("Too many columns.")));
270         /* Allow trailing whitespace */
271         while (*ptr && isspace((unsigned char) *ptr))
272                 ptr++;
273         if (*ptr)
274                 ereport(ERROR,
275                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
276                                  errmsg("malformed record literal: \"%s\"", string),
277                                  errdetail("Junk after right parenthesis.")));
278
279         tuple = heap_form_tuple(tupdesc, values, nulls);
280
281         /*
282          * We cannot return tuple->t_data because heap_form_tuple allocates it as
283          * part of a larger chunk, and our caller may expect to be able to pfree
284          * our result.  So must copy the info into a new palloc chunk.
285          */
286         result = (HeapTupleHeader) palloc(tuple->t_len);
287         memcpy(result, tuple->t_data, tuple->t_len);
288
            /* Free the working storage and give back the tuple descriptor. */
289         heap_freetuple(tuple);
290         pfree(buf.data);
291         pfree(values);
292         pfree(nulls);
293         ReleaseTupleDesc(tupdesc);
294
295         PG_RETURN_HEAPTUPLEHEADER(result);
296 }
297
298 /*
299  * record_out           - output routine for any composite type.
     *
     * fmgr argument 0 is the composite value; its type OID and typmod are read
     * from the tuple header itself.  Returns the text form as a cstring.
     *
     * Output layout, as built below: a left parenthesis, one field per
     * non-dropped column separated by commas, a right parenthesis.  A NULL
     * column contributes nothing between its commas.  A non-null field is the
     * column type's output-function text; it is wrapped in double quotes when
     * it is empty or contains a double quote, backslash, parenthesis, comma or
     * whitespace, and every double quote or backslash in it is written twice.
300  */
301 Datum
302 record_out(PG_FUNCTION_ARGS)
303 {
304         HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
305         Oid                     tupType;
306         int32           tupTypmod;
307         TupleDesc       tupdesc;
308         HeapTupleData tuple;
309         RecordIOData *my_extra;
310         bool            needComma = false;
311         int                     ncolumns;
312         int                     i;
313         Datum      *values;
314         bool       *nulls;
315         StringInfoData buf;
316
317         check_stack_depth();            /* recurses for record-type columns */
318
319         /* Extract type info from the tuple itself */
320         tupType = HeapTupleHeaderGetTypeId(rec);
321         tupTypmod = HeapTupleHeaderGetTypMod(rec);
322         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
323         ncolumns = tupdesc->natts;
324
325         /* Build a temporary HeapTuple control structure */
             /* (it only wraps rec so that heap_deform_tuple can be applied to it) */
326         tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
327         ItemPointerSetInvalid(&(tuple.t_self));
328         tuple.t_tableOid = InvalidOid;
329         tuple.t_data = rec;
330
331         /*
332          * We arrange to look up the needed I/O info just once per series of
333          * calls, assuming the record type doesn't change underneath us.
334          */
335         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
336         if (my_extra == NULL ||
337                 my_extra->ncolumns != ncolumns)
338         {
                     /* No cache yet, or it was sized for a different column count. */
339                 fcinfo->flinfo->fn_extra =
340                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
341                                                            offsetof(RecordIOData, columns) +
342                                                            ncolumns * sizeof(ColumnIOData));
343                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
344                 my_extra->record_type = InvalidOid;
345                 my_extra->record_typmod = 0;
346         }
347
348         if (my_extra->record_type != tupType ||
349                 my_extra->record_typmod != tupTypmod)
350         {
                     /* Zeroing clears every cached column_type, forcing fresh lookups below. */
351                 MemSet(my_extra, 0,
352                            offsetof(RecordIOData, columns) +
353                            ncolumns * sizeof(ColumnIOData));
354                 my_extra->record_type = tupType;
355                 my_extra->record_typmod = tupTypmod;
356                 my_extra->ncolumns = ncolumns;
357         }
358
359         values = (Datum *) palloc(ncolumns * sizeof(Datum));
360         nulls = (bool *) palloc(ncolumns * sizeof(bool));
361
362         /* Break down the tuple into fields */
363         heap_deform_tuple(&tuple, tupdesc, values, nulls);
364
365         /* And build the result string */
366         initStringInfo(&buf);
367
368         appendStringInfoChar(&buf, '(');
369
370         for (i = 0; i < ncolumns; i++)
371         {
372                 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
373                 ColumnIOData *column_info = &my_extra->columns[i];
374                 Oid                     column_type = att->atttypid;
375                 Datum           attr;
376                 char       *value;
377                 char       *tmp;
378                 bool            nq;
379
380                 /* Ignore dropped columns in datatype */
381                 if (att->attisdropped)
382                         continue;
383
                     /*
                      * The separator is written before the NULL test, so a NULL column
                      * still gets its comma and shows up as an empty field.
                      */
384                 if (needComma)
385                         appendStringInfoChar(&buf, ',');
386                 needComma = true;
387
388                 if (nulls[i])
389                 {
390                         /* emit nothing... */
391                         continue;
392                 }
393
394                 /*
395                  * Convert the column value to text
                      *
                      * The output function is looked up only when this cache slot was
                      * filled for a different type.
396                  */
397                 if (column_info->column_type != column_type)
398                 {
399                         getTypeOutputInfo(column_type,
400                                                           &column_info->typiofunc,
401                                                           &column_info->typisvarlena);
402                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
403                                                   fcinfo->flinfo->fn_mcxt);
404                         column_info->column_type = column_type;
405                 }
406
407                 attr = values[i];
408                 value = OutputFunctionCall(&column_info->proc, attr);
409
410                 /* Detect whether we need double quotes for this value */
411                 nq = (value[0] == '\0');        /* force quotes for empty string */
412                 for (tmp = value; *tmp; tmp++)
413                 {
414                         char            ch = *tmp;
415
416                         if (ch == '"' || ch == '\\' ||
417                                 ch == '(' || ch == ')' || ch == ',' ||
418                                 isspace((unsigned char) ch))
419                         {
420                                 nq = true;
421                                 break;
422                         }
423                 }
424
425                 /* And emit the string */
426                 if (nq)
427                         appendStringInfoCharMacro(&buf, '"');
428                 for (tmp = value; *tmp; tmp++)
429                 {
430                         char            ch = *tmp;
431
432                         if (ch == '"' || ch == '\\')
433                                 appendStringInfoCharMacro(&buf, ch);
434                         appendStringInfoCharMacro(&buf, ch);
435                 }
436                 if (nq)
437                         appendStringInfoCharMacro(&buf, '"');
438         }
439
440         appendStringInfoChar(&buf, ')');
441
             /* buf.data is the return value, so only the work arrays are freed here. */
442         pfree(values);
443         pfree(nulls);
444         ReleaseTupleDesc(tupdesc);
445
446         PG_RETURN_CSTRING(buf.data);
447 }
448
449 /*
450  * record_recv          - binary input routine for any composite type.
     *
     * fmgr arguments: 0 = StringInfo holding the message, 1 = composite type
     * OID, 2 = typmod.  Returns a freshly palloc'd HeapTupleHeader.
     *
     * Message layout consumed below: a 4-byte column count, which must equal
     * the number of non-dropped columns; then for each non-dropped column a
     * type OID (sizeof(Oid) bytes) that must match the column's type, a
     * 4-byte length where -1 means NULL, and that many bytes of data for the
     * column type's receive function.
     *
     * Errors: ERRCODE_FEATURE_NOT_SUPPORTED for RECORD with a negative typmod,
     * ERRCODE_DATATYPE_MISMATCH for a wrong column count or column type,
     * ERRCODE_INVALID_BINARY_REPRESENTATION for a bad length or when a receive
     * function does not consume its whole item.
451  */
452 Datum
453 record_recv(PG_FUNCTION_ARGS)
454 {
455         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
456         Oid                     tupType = PG_GETARG_OID(1);
457         int32           tupTypmod = PG_GETARG_INT32(2);
458         HeapTupleHeader result;
459         TupleDesc       tupdesc;
460         HeapTuple       tuple;
461         RecordIOData *my_extra;
462         int                     ncolumns;
463         int                     usercols;
464         int                     validcols;
465         int                     i;
466         Datum      *values;
467         bool       *nulls;
468
469         check_stack_depth();            /* recurses for record-type columns */
470
471         /*
472          * Give a friendly error message if we did not get enough info to identify
473          * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
474          * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
475          * for typmod, since composite types and RECORD have no type modifiers at
476          * the SQL level, and thus must fail for RECORD.  However some callers can
477          * supply a valid typmod, and then we can do something useful for RECORD.
478          */
479         if (tupType == RECORDOID && tupTypmod < 0)
480                 ereport(ERROR,
481                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
482                                  errmsg("input of anonymous composite types is not implemented")));
483
484         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
485         ncolumns = tupdesc->natts;
486
487         /*
488          * We arrange to look up the needed I/O info just once per series of
489          * calls, assuming the record type doesn't change underneath us.
490          */
491         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
492         if (my_extra == NULL ||
493                 my_extra->ncolumns != ncolumns)
494         {
                     /* No cache yet, or it was sized for a different column count. */
495                 fcinfo->flinfo->fn_extra =
496                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
497                                                            offsetof(RecordIOData, columns) +
498                                                            ncolumns * sizeof(ColumnIOData));
499                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
500                 my_extra->record_type = InvalidOid;
501                 my_extra->record_typmod = 0;
502         }
503
504         if (my_extra->record_type != tupType ||
505                 my_extra->record_typmod != tupTypmod)
506         {
                     /* Zeroing clears every cached column_type, forcing fresh lookups below. */
507                 MemSet(my_extra, 0,
508                            offsetof(RecordIOData, columns) +
509                            ncolumns * sizeof(ColumnIOData));
510                 my_extra->record_type = tupType;
511                 my_extra->record_typmod = tupTypmod;
512                 my_extra->ncolumns = ncolumns;
513         }
514
515         values = (Datum *) palloc(ncolumns * sizeof(Datum));
516         nulls = (bool *) palloc(ncolumns * sizeof(bool));
517
518         /* Fetch number of columns user thinks it has */
519         usercols = pq_getmsgint(buf, 4);
520
521         /* Need to scan to count nondeleted columns */
522         validcols = 0;
523         for (i = 0; i < ncolumns; i++)
524         {
525                 if (!TupleDescAttr(tupdesc, i)->attisdropped)
526                         validcols++;
527         }
528         if (usercols != validcols)
529                 ereport(ERROR,
530                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
531                                  errmsg("wrong number of columns: %d, expected %d",
532                                                 usercols, validcols)));
533
534         /* Process each column */
535         for (i = 0; i < ncolumns; i++)
536         {
537                 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
538                 ColumnIOData *column_info = &my_extra->columns[i];
539                 Oid                     column_type = att->atttypid;
540                 Oid                     coltypoid;
541                 int                     itemlen;
542                 StringInfoData item_buf;
543                 StringInfo      bufptr;
544                 char            csave;
545
546                 /* Ignore dropped columns in datatype, but fill with nulls */
                     /* (nothing is read from the message for a dropped column) */
547                 if (att->attisdropped)
548                 {
549                         values[i] = (Datum) 0;
550                         nulls[i] = true;
551                         continue;
552                 }
553
554                 /* Verify column datatype */
555                 coltypoid = pq_getmsgint(buf, sizeof(Oid));
556                 if (coltypoid != column_type)
557                         ereport(ERROR,
558                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
559                                          errmsg("wrong data type: %u, expected %u",
560                                                         coltypoid, column_type)));
561
562                 /* Get and check the item length */
                     /* (-1 is the only negative value accepted; anything longer than the unread remainder is rejected) */
563                 itemlen = pq_getmsgint(buf, 4);
564                 if (itemlen < -1 || itemlen > (buf->len - buf->cursor))
565                         ereport(ERROR,
566                                         (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
567                                          errmsg("insufficient data left in message")));
568
569                 if (itemlen == -1)
570                 {
571                         /* -1 length means NULL */
572                         bufptr = NULL;
573                         nulls[i] = true;
574                         csave = 0;                      /* keep compiler quiet */
575                 }
576                 else
577                 {
578                         /*
579                          * Rather than copying data around, we just set up a phony
580                          * StringInfo pointing to the correct portion of the input buffer.
581                          * We assume we can scribble on the input buffer so as to maintain
582                          * the convention that StringInfos have a trailing null.
583                          */
584                         item_buf.data = &buf->data[buf->cursor];
585                         item_buf.maxlen = itemlen + 1;
586                         item_buf.len = itemlen;
587                         item_buf.cursor = 0;
588
                             /* Step the outer cursor past this item before the column is parsed. */
589                         buf->cursor += itemlen;
590
                             /* Save the byte just after the item; it is put back once the receive call is done. */
591                         csave = buf->data[buf->cursor];
592                         buf->data[buf->cursor] = '\0';
593
594                         bufptr = &item_buf;
595                         nulls[i] = false;
596                 }
597
598                 /* Now call the column's receiveproc */
                     /* (looked up only on a cache miss; also called for NULL, with bufptr == NULL) */
599                 if (column_info->column_type != column_type)
600                 {
601                         getTypeBinaryInputInfo(column_type,
602                                                                    &column_info->typiofunc,
603                                                                    &column_info->typioparam);
604                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
605                                                   fcinfo->flinfo->fn_mcxt);
606                         column_info->column_type = column_type;
607                 }
608
609                 values[i] = ReceiveFunctionCall(&column_info->proc,
610                                                                                 bufptr,
611                                                                                 column_info->typioparam,
612                                                                                 att->atttypmod);
613
614                 if (bufptr)
615                 {
616                         /* Trouble if it didn't eat the whole buffer */
617                         if (item_buf.cursor != itemlen)
618                                 ereport(ERROR,
619                                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
620                                                  errmsg("improper binary format in record column %d",
621                                                                 i + 1)));
622
                             /* Undo the temporary terminator written above. */
623                         buf->data[buf->cursor] = csave;
624                 }
625         }
626
627         tuple = heap_form_tuple(tupdesc, values, nulls);
628
629         /*
630          * We cannot return tuple->t_data because heap_form_tuple allocates it as
631          * part of a larger chunk, and our caller may expect to be able to pfree
632          * our result.  So must copy the info into a new palloc chunk.
633          */
634         result = (HeapTupleHeader) palloc(tuple->t_len);
635         memcpy(result, tuple->t_data, tuple->t_len);
636
637         heap_freetuple(tuple);
638         pfree(values);
639         pfree(nulls);
640         ReleaseTupleDesc(tupdesc);
641
642         PG_RETURN_HEAPTUPLEHEADER(result);
643 }
644
645 /*
646  * record_send          - binary output routine for any composite type.
     *
     * fmgr argument 0 is the composite value; its type OID and typmod are read
     * from the tuple header itself.  Returns the bytea produced by
     * pq_endtypsend.
     *
     * Message layout produced below: an int32 count of non-dropped columns;
     * then for each non-dropped column its type OID as an int32, followed by
     * either an int32 -1 for NULL, or an int32 byte count and that many bytes
     * taken from the column type's send function result (varlena header
     * excluded).  Dropped columns are not represented at all.
647  */
648 Datum
649 record_send(PG_FUNCTION_ARGS)
650 {
651         HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
652         Oid                     tupType;
653         int32           tupTypmod;
654         TupleDesc       tupdesc;
655         HeapTupleData tuple;
656         RecordIOData *my_extra;
657         int                     ncolumns;
658         int                     validcols;
659         int                     i;
660         Datum      *values;
661         bool       *nulls;
662         StringInfoData buf;
663
664         check_stack_depth();            /* recurses for record-type columns */
665
666         /* Extract type info from the tuple itself */
667         tupType = HeapTupleHeaderGetTypeId(rec);
668         tupTypmod = HeapTupleHeaderGetTypMod(rec);
669         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
670         ncolumns = tupdesc->natts;
671
672         /* Build a temporary HeapTuple control structure */
             /* (it only wraps rec so that heap_deform_tuple can be applied to it) */
673         tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
674         ItemPointerSetInvalid(&(tuple.t_self));
675         tuple.t_tableOid = InvalidOid;
676         tuple.t_data = rec;
677
678         /*
679          * We arrange to look up the needed I/O info just once per series of
680          * calls, assuming the record type doesn't change underneath us.
681          */
682         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
683         if (my_extra == NULL ||
684                 my_extra->ncolumns != ncolumns)
685         {
                     /* No cache yet, or it was sized for a different column count. */
686                 fcinfo->flinfo->fn_extra =
687                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
688                                                            offsetof(RecordIOData, columns) +
689                                                            ncolumns * sizeof(ColumnIOData));
690                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
691                 my_extra->record_type = InvalidOid;
692                 my_extra->record_typmod = 0;
693         }
694
695         if (my_extra->record_type != tupType ||
696                 my_extra->record_typmod != tupTypmod)
697         {
                     /* Zeroing clears every cached column_type, forcing fresh lookups below. */
698                 MemSet(my_extra, 0,
699                            offsetof(RecordIOData, columns) +
700                            ncolumns * sizeof(ColumnIOData));
701                 my_extra->record_type = tupType;
702                 my_extra->record_typmod = tupTypmod;
703                 my_extra->ncolumns = ncolumns;
704         }
705
706         values = (Datum *) palloc(ncolumns * sizeof(Datum));
707         nulls = (bool *) palloc(ncolumns * sizeof(bool));
708
709         /* Break down the tuple into fields */
710         heap_deform_tuple(&tuple, tupdesc, values, nulls);
711
712         /* And build the result string */
713         pq_begintypsend(&buf);
714
715         /* Need to scan to count nondeleted columns */
716         validcols = 0;
717         for (i = 0; i < ncolumns; i++)
718         {
719                 if (!TupleDescAttr(tupdesc, i)->attisdropped)
720                         validcols++;
721         }
722         pq_sendint32(&buf, validcols);
723
724         for (i = 0; i < ncolumns; i++)
725         {
726                 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
727                 ColumnIOData *column_info = &my_extra->columns[i];
728                 Oid                     column_type = att->atttypid;
729                 Datum           attr;
730                 bytea      *outputbytes;
731
732                 /* Ignore dropped columns in datatype */
733                 if (att->attisdropped)
734                         continue;
735
                     /* The type OID is written for every live column, NULL or not. */
736                 pq_sendint32(&buf, column_type);
737
738                 if (nulls[i])
739                 {
740                         /* emit -1 data length to signify a NULL */
741                         pq_sendint32(&buf, -1);
742                         continue;
743                 }
744
745                 /*
746                  * Convert the column value to binary
                      *
                      * The send function is looked up only when this cache slot was
                      * filled for a different type.
747                  */
748                 if (column_info->column_type != column_type)
749                 {
750                         getTypeBinaryOutputInfo(column_type,
751                                                                         &column_info->typiofunc,
752                                                                         &column_info->typisvarlena);
753                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
754                                                   fcinfo->flinfo->fn_mcxt);
755                         column_info->column_type = column_type;
756                 }
757
758                 attr = values[i];
759                 outputbytes = SendFunctionCall(&column_info->proc, attr);
                     /* Length word first, then the payload; both leave out the varlena header. */
760                 pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
761                 pq_sendbytes(&buf, VARDATA(outputbytes),
762                                          VARSIZE(outputbytes) - VARHDRSZ);
763         }
764
765         pfree(values);
766         pfree(nulls);
767         ReleaseTupleDesc(tupdesc);
768
769         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
770 }
771
772
773 /*
774  * record_cmp()
775  * Internal comparison function for records.
776  *
777  * Returns -1, 0 or 1
778  *
779  * Do not assume that the two inputs are exactly the same record type;
780  * for instance we might be comparing an anonymous ROW() construct against a
781  * named composite type.  We will compare as long as they have the same number
782  * of non-dropped columns of the same types.
783  */
784 static int
785 record_cmp(FunctionCallInfo fcinfo)
786 {
787         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
788         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
789         int                     result = 0;
790         Oid                     tupType1;
791         Oid                     tupType2;
792         int32           tupTypmod1;
793         int32           tupTypmod2;
794         TupleDesc       tupdesc1;
795         TupleDesc       tupdesc2;
796         HeapTupleData tuple1;
797         HeapTupleData tuple2;
798         int                     ncolumns1;
799         int                     ncolumns2;
800         RecordCompareData *my_extra;
801         int                     ncols;
802         Datum      *values1;
803         Datum      *values2;
804         bool       *nulls1;
805         bool       *nulls2;
806         int                     i1;
807         int                     i2;
808         int                     j;
809
810         check_stack_depth();            /* recurses for record-type columns */
811
812         /* Extract type info from the tuples */
813         tupType1 = HeapTupleHeaderGetTypeId(record1);
814         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
815         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
816         ncolumns1 = tupdesc1->natts;
817         tupType2 = HeapTupleHeaderGetTypeId(record2);
818         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
819         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
820         ncolumns2 = tupdesc2->natts;
821
822         /* Build temporary HeapTuple control structures */
823         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
824         ItemPointerSetInvalid(&(tuple1.t_self));
825         tuple1.t_tableOid = InvalidOid;
826         tuple1.t_data = record1;
827         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
828         ItemPointerSetInvalid(&(tuple2.t_self));
829         tuple2.t_tableOid = InvalidOid;
830         tuple2.t_data = record2;
831
832         /*
833          * We arrange to look up the needed comparison info just once per series
834          * of calls, assuming the record types don't change underneath us.
835          */
836         ncols = Max(ncolumns1, ncolumns2);
837         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
838         if (my_extra == NULL ||
839                 my_extra->ncolumns < ncols)
840         {
841                 fcinfo->flinfo->fn_extra =
842                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
843                                                            offsetof(RecordCompareData, columns) +
844                                                            ncols * sizeof(ColumnCompareData));
845                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
846                 my_extra->ncolumns = ncols;
847                 my_extra->record1_type = InvalidOid;
848                 my_extra->record1_typmod = 0;
849                 my_extra->record2_type = InvalidOid;
850                 my_extra->record2_typmod = 0;
851         }
852
853         if (my_extra->record1_type != tupType1 ||
854                 my_extra->record1_typmod != tupTypmod1 ||
855                 my_extra->record2_type != tupType2 ||
856                 my_extra->record2_typmod != tupTypmod2)
857         {
858                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
859                 my_extra->record1_type = tupType1;
860                 my_extra->record1_typmod = tupTypmod1;
861                 my_extra->record2_type = tupType2;
862                 my_extra->record2_typmod = tupTypmod2;
863         }
864
865         /* Break down the tuples into fields */
866         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
867         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
868         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
869         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
870         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
871         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
872
873         /*
874          * Scan corresponding columns, allowing for dropped columns in different
875          * places in the two rows.  i1 and i2 are physical column indexes, j is
876          * the logical column index.
877          */
878         i1 = i2 = j = 0;
879         while (i1 < ncolumns1 || i2 < ncolumns2)
880         {
881                 Form_pg_attribute att1;
882                 Form_pg_attribute att2;
883                 TypeCacheEntry *typentry;
884                 Oid                     collation;
885
886                 /*
887                  * Skip dropped columns
888                  */
889                 if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
890                 {
891                         i1++;
892                         continue;
893                 }
894                 if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
895                 {
896                         i2++;
897                         continue;
898                 }
899                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
900                         break;                          /* we'll deal with mismatch below loop */
901
902                 att1 = TupleDescAttr(tupdesc1, i1);
903                 att2 = TupleDescAttr(tupdesc2, i2);
904
905                 /*
906                  * Have two matching columns, they must be same type
907                  */
908                 if (att1->atttypid != att2->atttypid)
909                         ereport(ERROR,
910                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
911                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
912                                                         format_type_be(att1->atttypid),
913                                                         format_type_be(att2->atttypid),
914                                                         j + 1)));
915
916                 /*
917                  * If they're not same collation, we don't complain here, but the
918                  * comparison function might.
919                  */
920                 collation = att1->attcollation;
921                 if (collation != att2->attcollation)
922                         collation = InvalidOid;
923
924                 /*
925                  * Lookup the comparison function if not done already
926                  */
927                 typentry = my_extra->columns[j].typentry;
928                 if (typentry == NULL ||
929                         typentry->type_id != att1->atttypid)
930                 {
931                         typentry = lookup_type_cache(att1->atttypid,
932                                                                                  TYPECACHE_CMP_PROC_FINFO);
933                         if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
934                                 ereport(ERROR,
935                                                 (errcode(ERRCODE_UNDEFINED_FUNCTION),
936                                                  errmsg("could not identify a comparison function for type %s",
937                                                                 format_type_be(typentry->type_id))));
938                         my_extra->columns[j].typentry = typentry;
939                 }
940
941                 /*
942                  * We consider two NULLs equal; NULL > not-NULL.
943                  */
944                 if (!nulls1[i1] || !nulls2[i2])
945                 {
946                         LOCAL_FCINFO(locfcinfo, 2);
947                         int32           cmpresult;
948
949                         if (nulls1[i1])
950                         {
951                                 /* arg1 is greater than arg2 */
952                                 result = 1;
953                                 break;
954                         }
955                         if (nulls2[i2])
956                         {
957                                 /* arg1 is less than arg2 */
958                                 result = -1;
959                                 break;
960                         }
961
962                         /* Compare the pair of elements */
963                         InitFunctionCallInfoData(*locfcinfo, &typentry->cmp_proc_finfo, 2,
964                                                                          collation, NULL, NULL);
965                         locfcinfo->args[0].value = values1[i1];
966                         locfcinfo->args[0].isnull = false;
967                         locfcinfo->args[1].value = values2[i2];
968                         locfcinfo->args[1].isnull = false;
969                         locfcinfo->isnull = false;
970                         cmpresult = DatumGetInt32(FunctionCallInvoke(locfcinfo));
971
972                         if (cmpresult < 0)
973                         {
974                                 /* arg1 is less than arg2 */
975                                 result = -1;
976                                 break;
977                         }
978                         else if (cmpresult > 0)
979                         {
980                                 /* arg1 is greater than arg2 */
981                                 result = 1;
982                                 break;
983                         }
984                 }
985
986                 /* equal, so continue to next column */
987                 i1++, i2++, j++;
988         }
989
990         /*
991          * If we didn't break out of the loop early, check for column count
992          * mismatch.  (We do not report such mismatch if we found unequal column
993          * values; is that a feature or a bug?)
994          */
995         if (result == 0)
996         {
997                 if (i1 != ncolumns1 || i2 != ncolumns2)
998                         ereport(ERROR,
999                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1000                                          errmsg("cannot compare record types with different numbers of columns")));
1001         }
1002
1003         pfree(values1);
1004         pfree(nulls1);
1005         pfree(values2);
1006         pfree(nulls2);
1007         ReleaseTupleDesc(tupdesc1);
1008         ReleaseTupleDesc(tupdesc2);
1009
1010         /* Avoid leaking memory when handed toasted input. */
1011         PG_FREE_IF_COPY(record1, 0);
1012         PG_FREE_IF_COPY(record2, 1);
1013
1014         return result;
1015 }
1016
1017 /*
1018  * record_eq :
1019  *                compares two records for equality
1020  * result :
1021  *                returns true if the records are equal, false otherwise.
1022  *
1023  * Note: we do not use record_cmp here, since equality may be meaningful in
1024  * datatypes that don't have a total ordering (and hence no btree support).
1025  */
1026 Datum
1027 record_eq(PG_FUNCTION_ARGS)
1028 {
1029         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1030         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1031         bool            result = true;
1032         Oid                     tupType1;
1033         Oid                     tupType2;
1034         int32           tupTypmod1;
1035         int32           tupTypmod2;
1036         TupleDesc       tupdesc1;
1037         TupleDesc       tupdesc2;
1038         HeapTupleData tuple1;
1039         HeapTupleData tuple2;
1040         int                     ncolumns1;
1041         int                     ncolumns2;
1042         RecordCompareData *my_extra;
1043         int                     ncols;
1044         Datum      *values1;
1045         Datum      *values2;
1046         bool       *nulls1;
1047         bool       *nulls2;
1048         int                     i1;
1049         int                     i2;
1050         int                     j;
1051
1052         check_stack_depth();            /* recurses for record-type columns */
1053
1054         /* Extract type info from the tuples */
1055         tupType1 = HeapTupleHeaderGetTypeId(record1);
1056         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1057         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1058         ncolumns1 = tupdesc1->natts;
1059         tupType2 = HeapTupleHeaderGetTypeId(record2);
1060         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1061         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1062         ncolumns2 = tupdesc2->natts;
1063
1064         /* Build temporary HeapTuple control structures */
1065         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1066         ItemPointerSetInvalid(&(tuple1.t_self));
1067         tuple1.t_tableOid = InvalidOid;
1068         tuple1.t_data = record1;
1069         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1070         ItemPointerSetInvalid(&(tuple2.t_self));
1071         tuple2.t_tableOid = InvalidOid;
1072         tuple2.t_data = record2;
1073
1074         /*
1075          * We arrange to look up the needed comparison info just once per series
1076          * of calls, assuming the record types don't change underneath us.
1077          */
1078         ncols = Max(ncolumns1, ncolumns2);
1079         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1080         if (my_extra == NULL ||
1081                 my_extra->ncolumns < ncols)
1082         {
1083                 fcinfo->flinfo->fn_extra =
1084                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1085                                                            offsetof(RecordCompareData, columns) +
1086                                                            ncols * sizeof(ColumnCompareData));
1087                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1088                 my_extra->ncolumns = ncols;
1089                 my_extra->record1_type = InvalidOid;
1090                 my_extra->record1_typmod = 0;
1091                 my_extra->record2_type = InvalidOid;
1092                 my_extra->record2_typmod = 0;
1093         }
1094
1095         if (my_extra->record1_type != tupType1 ||
1096                 my_extra->record1_typmod != tupTypmod1 ||
1097                 my_extra->record2_type != tupType2 ||
1098                 my_extra->record2_typmod != tupTypmod2)
1099         {
1100                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1101                 my_extra->record1_type = tupType1;
1102                 my_extra->record1_typmod = tupTypmod1;
1103                 my_extra->record2_type = tupType2;
1104                 my_extra->record2_typmod = tupTypmod2;
1105         }
1106
1107         /* Break down the tuples into fields */
1108         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1109         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1110         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1111         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1112         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1113         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1114
1115         /*
1116          * Scan corresponding columns, allowing for dropped columns in different
1117          * places in the two rows.  i1 and i2 are physical column indexes, j is
1118          * the logical column index.
1119          */
1120         i1 = i2 = j = 0;
1121         while (i1 < ncolumns1 || i2 < ncolumns2)
1122         {
1123                 LOCAL_FCINFO(locfcinfo, 2);
1124                 Form_pg_attribute att1;
1125                 Form_pg_attribute att2;
1126                 TypeCacheEntry *typentry;
1127                 Oid                     collation;
1128                 bool            oprresult;
1129
1130                 /*
1131                  * Skip dropped columns
1132                  */
1133                 if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1134                 {
1135                         i1++;
1136                         continue;
1137                 }
1138                 if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1139                 {
1140                         i2++;
1141                         continue;
1142                 }
1143                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
1144                         break;                          /* we'll deal with mismatch below loop */
1145
1146                 att1 = TupleDescAttr(tupdesc1, i1);
1147                 att2 = TupleDescAttr(tupdesc2, i2);
1148
1149                 /*
1150                  * Have two matching columns, they must be same type
1151                  */
1152                 if (att1->atttypid != att2->atttypid)
1153                         ereport(ERROR,
1154                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1155                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1156                                                         format_type_be(att1->atttypid),
1157                                                         format_type_be(att2->atttypid),
1158                                                         j + 1)));
1159
1160                 /*
1161                  * If they're not same collation, we don't complain here, but the
1162                  * equality function might.
1163                  */
1164                 collation = att1->attcollation;
1165                 if (collation != att2->attcollation)
1166                         collation = InvalidOid;
1167
1168                 /*
1169                  * Lookup the equality function if not done already
1170                  */
1171                 typentry = my_extra->columns[j].typentry;
1172                 if (typentry == NULL ||
1173                         typentry->type_id != att1->atttypid)
1174                 {
1175                         typentry = lookup_type_cache(att1->atttypid,
1176                                                                                  TYPECACHE_EQ_OPR_FINFO);
1177                         if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
1178                                 ereport(ERROR,
1179                                                 (errcode(ERRCODE_UNDEFINED_FUNCTION),
1180                                                  errmsg("could not identify an equality operator for type %s",
1181                                                                 format_type_be(typentry->type_id))));
1182                         my_extra->columns[j].typentry = typentry;
1183                 }
1184
1185                 /*
1186                  * We consider two NULLs equal; NULL > not-NULL.
1187                  */
1188                 if (!nulls1[i1] || !nulls2[i2])
1189                 {
1190                         if (nulls1[i1] || nulls2[i2])
1191                         {
1192                                 result = false;
1193                                 break;
1194                         }
1195
1196                         /* Compare the pair of elements */
1197                         InitFunctionCallInfoData(*locfcinfo, &typentry->eq_opr_finfo, 2,
1198                                                                          collation, NULL, NULL);
1199                         locfcinfo->args[0].value = values1[i1];
1200                         locfcinfo->args[0].isnull = false;
1201                         locfcinfo->args[1].value = values2[i2];
1202                         locfcinfo->args[1].isnull = false;
1203                         locfcinfo->isnull = false;
1204                         oprresult = DatumGetBool(FunctionCallInvoke(locfcinfo));
1205                         if (!oprresult)
1206                         {
1207                                 result = false;
1208                                 break;
1209                         }
1210                 }
1211
1212                 /* equal, so continue to next column */
1213                 i1++, i2++, j++;
1214         }
1215
1216         /*
1217          * If we didn't break out of the loop early, check for column count
1218          * mismatch.  (We do not report such mismatch if we found unequal column
1219          * values; is that a feature or a bug?)
1220          */
1221         if (result)
1222         {
1223                 if (i1 != ncolumns1 || i2 != ncolumns2)
1224                         ereport(ERROR,
1225                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1226                                          errmsg("cannot compare record types with different numbers of columns")));
1227         }
1228
1229         pfree(values1);
1230         pfree(nulls1);
1231         pfree(values2);
1232         pfree(nulls2);
1233         ReleaseTupleDesc(tupdesc1);
1234         ReleaseTupleDesc(tupdesc2);
1235
1236         /* Avoid leaking memory when handed toasted input. */
1237         PG_FREE_IF_COPY(record1, 0);
1238         PG_FREE_IF_COPY(record2, 1);
1239
1240         PG_RETURN_BOOL(result);
1241 }
1242
1243 Datum
1244 record_ne(PG_FUNCTION_ARGS)
1245 {
1246         PG_RETURN_BOOL(!DatumGetBool(record_eq(fcinfo)));
1247 }
1248
1249 Datum
1250 record_lt(PG_FUNCTION_ARGS)
1251 {
1252         PG_RETURN_BOOL(record_cmp(fcinfo) < 0);
1253 }
1254
1255 Datum
1256 record_gt(PG_FUNCTION_ARGS)
1257 {
1258         PG_RETURN_BOOL(record_cmp(fcinfo) > 0);
1259 }
1260
1261 Datum
1262 record_le(PG_FUNCTION_ARGS)
1263 {
1264         PG_RETURN_BOOL(record_cmp(fcinfo) <= 0);
1265 }
1266
1267 Datum
1268 record_ge(PG_FUNCTION_ARGS)
1269 {
1270         PG_RETURN_BOOL(record_cmp(fcinfo) >= 0);
1271 }
1272
1273 Datum
1274 btrecordcmp(PG_FUNCTION_ARGS)
1275 {
1276         PG_RETURN_INT32(record_cmp(fcinfo));
1277 }
1278
1279
1280 /*
1281  * record_image_cmp :
1282  * Internal byte-oriented comparison function for records.
1283  *
1284  * Returns -1, 0 or 1
1285  *
1286  * Note: The normal concepts of "equality" do not apply here; different
1287  * representation of values considered to be equal are not considered to be
1288  * identical.  As an example, for the citext type 'A' and 'a' are equal, but
1289  * they are not identical.
1290  */
1291 static int
1292 record_image_cmp(FunctionCallInfo fcinfo)
1293 {
1294         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1295         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1296         int                     result = 0;
1297         Oid                     tupType1;
1298         Oid                     tupType2;
1299         int32           tupTypmod1;
1300         int32           tupTypmod2;
1301         TupleDesc       tupdesc1;
1302         TupleDesc       tupdesc2;
1303         HeapTupleData tuple1;
1304         HeapTupleData tuple2;
1305         int                     ncolumns1;
1306         int                     ncolumns2;
1307         RecordCompareData *my_extra;
1308         int                     ncols;
1309         Datum      *values1;
1310         Datum      *values2;
1311         bool       *nulls1;
1312         bool       *nulls2;
1313         int                     i1;
1314         int                     i2;
1315         int                     j;
1316
1317         /* Extract type info from the tuples */
1318         tupType1 = HeapTupleHeaderGetTypeId(record1);
1319         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1320         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1321         ncolumns1 = tupdesc1->natts;
1322         tupType2 = HeapTupleHeaderGetTypeId(record2);
1323         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1324         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1325         ncolumns2 = tupdesc2->natts;
1326
1327         /* Build temporary HeapTuple control structures */
1328         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1329         ItemPointerSetInvalid(&(tuple1.t_self));
1330         tuple1.t_tableOid = InvalidOid;
1331         tuple1.t_data = record1;
1332         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1333         ItemPointerSetInvalid(&(tuple2.t_self));
1334         tuple2.t_tableOid = InvalidOid;
1335         tuple2.t_data = record2;
1336
1337         /*
1338          * We arrange to look up the needed comparison info just once per series
1339          * of calls, assuming the record types don't change underneath us.
1340          */
1341         ncols = Max(ncolumns1, ncolumns2);
1342         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1343         if (my_extra == NULL ||
1344                 my_extra->ncolumns < ncols)
1345         {
1346                 fcinfo->flinfo->fn_extra =
1347                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1348                                                            offsetof(RecordCompareData, columns) +
1349                                                            ncols * sizeof(ColumnCompareData));
1350                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1351                 my_extra->ncolumns = ncols;
1352                 my_extra->record1_type = InvalidOid;
1353                 my_extra->record1_typmod = 0;
1354                 my_extra->record2_type = InvalidOid;
1355                 my_extra->record2_typmod = 0;
1356         }
1357
1358         if (my_extra->record1_type != tupType1 ||
1359                 my_extra->record1_typmod != tupTypmod1 ||
1360                 my_extra->record2_type != tupType2 ||
1361                 my_extra->record2_typmod != tupTypmod2)
1362         {
1363                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1364                 my_extra->record1_type = tupType1;
1365                 my_extra->record1_typmod = tupTypmod1;
1366                 my_extra->record2_type = tupType2;
1367                 my_extra->record2_typmod = tupTypmod2;
1368         }
1369
1370         /* Break down the tuples into fields */
1371         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1372         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1373         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1374         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1375         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1376         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1377
1378         /*
1379          * Scan corresponding columns, allowing for dropped columns in different
1380          * places in the two rows.  i1 and i2 are physical column indexes, j is
1381          * the logical column index.
1382          */
1383         i1 = i2 = j = 0;
1384         while (i1 < ncolumns1 || i2 < ncolumns2)
1385         {
1386                 Form_pg_attribute att1;
1387                 Form_pg_attribute att2;
1388
1389                 /*
1390                  * Skip dropped columns
1391                  */
1392                 if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1393                 {
1394                         i1++;
1395                         continue;
1396                 }
1397                 if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1398                 {
1399                         i2++;
1400                         continue;
1401                 }
1402                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
1403                         break;                          /* we'll deal with mismatch below loop */
1404
1405                 att1 = TupleDescAttr(tupdesc1, i1);
1406                 att2 = TupleDescAttr(tupdesc2, i2);
1407
1408                 /*
1409                  * Have two matching columns, they must be same type
1410                  */
1411                 if (att1->atttypid != att2->atttypid)
1412                         ereport(ERROR,
1413                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1414                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1415                                                         format_type_be(att1->atttypid),
1416                                                         format_type_be(att2->atttypid),
1417                                                         j + 1)));
1418
1419                 /*
1420                  * The same type should have the same length (or both should be
1421                  * variable).
1422                  */
1423                 Assert(att1->attlen == att2->attlen);
1424
1425                 /*
1426                  * We consider two NULLs equal; NULL > not-NULL.
1427                  */
1428                 if (!nulls1[i1] || !nulls2[i2])
1429                 {
1430                         int                     cmpresult = 0;
1431
1432                         if (nulls1[i1])
1433                         {
1434                                 /* arg1 is greater than arg2 */
1435                                 result = 1;
1436                                 break;
1437                         }
1438                         if (nulls2[i2])
1439                         {
1440                                 /* arg1 is less than arg2 */
1441                                 result = -1;
1442                                 break;
1443                         }
1444
1445                         /* Compare the pair of elements */
1446                         if (att1->attlen == -1)
1447                         {
1448                                 Size            len1,
1449                                                         len2;
1450                                 struct varlena *arg1val;
1451                                 struct varlena *arg2val;
1452
1453                                 len1 = toast_raw_datum_size(values1[i1]);
1454                                 len2 = toast_raw_datum_size(values2[i2]);
1455                                 arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]);
1456                                 arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]);
1457
1458                                 cmpresult = memcmp(VARDATA_ANY(arg1val),
1459                                                                    VARDATA_ANY(arg2val),
1460                                                                    Min(len1, len2) - VARHDRSZ);
1461                                 if ((cmpresult == 0) && (len1 != len2))
1462                                         cmpresult = (len1 < len2) ? -1 : 1;
1463
1464                                 if ((Pointer) arg1val != (Pointer) values1[i1])
1465                                         pfree(arg1val);
1466                                 if ((Pointer) arg2val != (Pointer) values2[i2])
1467                                         pfree(arg2val);
1468                         }
1469                         else if (att1->attbyval)
1470                         {
1471                                 if (values1[i1] != values2[i2])
1472                                         cmpresult = (values1[i1] < values2[i2]) ? -1 : 1;
1473                         }
1474                         else
1475                         {
1476                                 cmpresult = memcmp(DatumGetPointer(values1[i1]),
1477                                                                    DatumGetPointer(values2[i2]),
1478                                                                    att1->attlen);
1479                         }
1480
1481                         if (cmpresult < 0)
1482                         {
1483                                 /* arg1 is less than arg2 */
1484                                 result = -1;
1485                                 break;
1486                         }
1487                         else if (cmpresult > 0)
1488                         {
1489                                 /* arg1 is greater than arg2 */
1490                                 result = 1;
1491                                 break;
1492                         }
1493                 }
1494
1495                 /* equal, so continue to next column */
1496                 i1++, i2++, j++;
1497         }
1498
1499         /*
1500          * If we didn't break out of the loop early, check for column count
1501          * mismatch.  (We do not report such mismatch if we found unequal column
1502          * values; is that a feature or a bug?)
1503          */
1504         if (result == 0)
1505         {
1506                 if (i1 != ncolumns1 || i2 != ncolumns2)
1507                         ereport(ERROR,
1508                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1509                                          errmsg("cannot compare record types with different numbers of columns")));
1510         }
1511
1512         pfree(values1);
1513         pfree(nulls1);
1514         pfree(values2);
1515         pfree(nulls2);
1516         ReleaseTupleDesc(tupdesc1);
1517         ReleaseTupleDesc(tupdesc2);
1518
1519         /* Avoid leaking memory when handed toasted input. */
1520         PG_FREE_IF_COPY(record1, 0);
1521         PG_FREE_IF_COPY(record2, 1);
1522
1523         return result;
1524 }
1525
1526 /*
1527  * record_image_eq :
1528  *                compares two records for identical contents, based on byte images
1529  * result :
1530  *                returns true if the records are identical, false otherwise.
1531  *
1532  * Note: we do not use record_image_cmp here, since we can avoid
1533  * de-toasting for unequal lengths this way.
1534  */
1535 Datum
1536 record_image_eq(PG_FUNCTION_ARGS)
1537 {
1538         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1539         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1540         bool            result = true;
1541         Oid                     tupType1;
1542         Oid                     tupType2;
1543         int32           tupTypmod1;
1544         int32           tupTypmod2;
1545         TupleDesc       tupdesc1;
1546         TupleDesc       tupdesc2;
1547         HeapTupleData tuple1;
1548         HeapTupleData tuple2;
1549         int                     ncolumns1;
1550         int                     ncolumns2;
1551         RecordCompareData *my_extra;
1552         int                     ncols;
1553         Datum      *values1;
1554         Datum      *values2;
1555         bool       *nulls1;
1556         bool       *nulls2;
1557         int                     i1;
1558         int                     i2;
1559         int                     j;
1560
1561         /* Extract type info from the tuples */
1562         tupType1 = HeapTupleHeaderGetTypeId(record1);
1563         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1564         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1565         ncolumns1 = tupdesc1->natts;
1566         tupType2 = HeapTupleHeaderGetTypeId(record2);
1567         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1568         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1569         ncolumns2 = tupdesc2->natts;
1570
1571         /* Build temporary HeapTuple control structures */
1572         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1573         ItemPointerSetInvalid(&(tuple1.t_self));
1574         tuple1.t_tableOid = InvalidOid;
1575         tuple1.t_data = record1;
1576         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1577         ItemPointerSetInvalid(&(tuple2.t_self));
1578         tuple2.t_tableOid = InvalidOid;
1579         tuple2.t_data = record2;
1580
1581         /*
1582          * We arrange to look up the needed comparison info just once per series
1583          * of calls, assuming the record types don't change underneath us.
1584          */
1585         ncols = Max(ncolumns1, ncolumns2);
1586         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1587         if (my_extra == NULL ||
1588                 my_extra->ncolumns < ncols)
1589         {
1590                 fcinfo->flinfo->fn_extra =
1591                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1592                                                            offsetof(RecordCompareData, columns) +
1593                                                            ncols * sizeof(ColumnCompareData));
1594                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1595                 my_extra->ncolumns = ncols;
1596                 my_extra->record1_type = InvalidOid;
1597                 my_extra->record1_typmod = 0;
1598                 my_extra->record2_type = InvalidOid;
1599                 my_extra->record2_typmod = 0;
1600         }
1601
1602         if (my_extra->record1_type != tupType1 ||
1603                 my_extra->record1_typmod != tupTypmod1 ||
1604                 my_extra->record2_type != tupType2 ||
1605                 my_extra->record2_typmod != tupTypmod2)
1606         {
1607                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1608                 my_extra->record1_type = tupType1;
1609                 my_extra->record1_typmod = tupTypmod1;
1610                 my_extra->record2_type = tupType2;
1611                 my_extra->record2_typmod = tupTypmod2;
1612         }
1613
1614         /* Break down the tuples into fields */
1615         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1616         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1617         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1618         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1619         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1620         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1621
1622         /*
1623          * Scan corresponding columns, allowing for dropped columns in different
1624          * places in the two rows.  i1 and i2 are physical column indexes, j is
1625          * the logical column index.
1626          */
1627         i1 = i2 = j = 0;
1628         while (i1 < ncolumns1 || i2 < ncolumns2)
1629         {
1630                 Form_pg_attribute att1;
1631                 Form_pg_attribute att2;
1632
1633                 /*
1634                  * Skip dropped columns
1635                  */
1636                 if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1637                 {
1638                         i1++;
1639                         continue;
1640                 }
1641                 if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1642                 {
1643                         i2++;
1644                         continue;
1645                 }
1646                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
1647                         break;                          /* we'll deal with mismatch below loop */
1648
1649                 att1 = TupleDescAttr(tupdesc1, i1);
1650                 att2 = TupleDescAttr(tupdesc2, i2);
1651
1652                 /*
1653                  * Have two matching columns, they must be same type
1654                  */
1655                 if (att1->atttypid != att2->atttypid)
1656                         ereport(ERROR,
1657                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1658                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1659                                                         format_type_be(att1->atttypid),
1660                                                         format_type_be(att2->atttypid),
1661                                                         j + 1)));
1662
1663                 /*
1664                  * We consider two NULLs equal; NULL > not-NULL.
1665                  */
1666                 if (!nulls1[i1] || !nulls2[i2])
1667                 {
1668                         if (nulls1[i1] || nulls2[i2])
1669                         {
1670                                 result = false;
1671                                 break;
1672                         }
1673
1674                         /* Compare the pair of elements */
1675                         result = datum_image_eq(values1[i1], values2[i2], att1->attbyval, att2->attlen);
1676                         if (!result)
1677                                 break;
1678                 }
1679
1680                 /* equal, so continue to next column */
1681                 i1++, i2++, j++;
1682         }
1683
1684         /*
1685          * If we didn't break out of the loop early, check for column count
1686          * mismatch.  (We do not report such mismatch if we found unequal column
1687          * values; is that a feature or a bug?)
1688          */
1689         if (result)
1690         {
1691                 if (i1 != ncolumns1 || i2 != ncolumns2)
1692                         ereport(ERROR,
1693                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1694                                          errmsg("cannot compare record types with different numbers of columns")));
1695         }
1696
1697         pfree(values1);
1698         pfree(nulls1);
1699         pfree(values2);
1700         pfree(nulls2);
1701         ReleaseTupleDesc(tupdesc1);
1702         ReleaseTupleDesc(tupdesc2);
1703
1704         /* Avoid leaking memory when handed toasted input. */
1705         PG_FREE_IF_COPY(record1, 0);
1706         PG_FREE_IF_COPY(record2, 1);
1707
1708         PG_RETURN_BOOL(result);
1709 }
1710
1711 Datum
1712 record_image_ne(PG_FUNCTION_ARGS)
1713 {
1714         PG_RETURN_BOOL(!DatumGetBool(record_image_eq(fcinfo)));
1715 }
1716
1717 Datum
1718 record_image_lt(PG_FUNCTION_ARGS)
1719 {
1720         PG_RETURN_BOOL(record_image_cmp(fcinfo) < 0);
1721 }
1722
1723 Datum
1724 record_image_gt(PG_FUNCTION_ARGS)
1725 {
1726         PG_RETURN_BOOL(record_image_cmp(fcinfo) > 0);
1727 }
1728
1729 Datum
1730 record_image_le(PG_FUNCTION_ARGS)
1731 {
1732         PG_RETURN_BOOL(record_image_cmp(fcinfo) <= 0);
1733 }
1734
1735 Datum
1736 record_image_ge(PG_FUNCTION_ARGS)
1737 {
1738         PG_RETURN_BOOL(record_image_cmp(fcinfo) >= 0);
1739 }
1740
1741 Datum
1742 btrecordimagecmp(PG_FUNCTION_ARGS)
1743 {
1744         PG_RETURN_INT32(record_image_cmp(fcinfo));
1745 }