granicus.if.org Git - postgresql/blob - src/backend/utils/adt/rowtypes.c
Allow record_in() and record_recv() to work for transient record types.
[postgresql] / src / backend / utils / adt / rowtypes.c
1 /*-------------------------------------------------------------------------
2  *
3  * rowtypes.c
4  *        I/O and comparison functions for generic composite types.
5  *
6  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/utils/adt/rowtypes.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include <ctype.h>
18
19 #include "access/htup_details.h"
20 #include "access/tuptoaster.h"
21 #include "catalog/pg_type.h"
22 #include "funcapi.h"
23 #include "libpq/pqformat.h"
24 #include "utils/builtins.h"
25 #include "utils/lsyscache.h"
26 #include "utils/typcache.h"
27
28
29 /*
30  * structure to cache metadata needed for record I/O
31  */
32 typedef struct ColumnIOData
33 {
34         Oid                     column_type;
35         Oid                     typiofunc;
36         Oid                     typioparam;
37         bool            typisvarlena;
38         FmgrInfo        proc;
39 } ColumnIOData;
40
41 typedef struct RecordIOData
42 {
43         Oid                     record_type;
44         int32           record_typmod;
45         int                     ncolumns;
46         ColumnIOData columns[FLEXIBLE_ARRAY_MEMBER];
47 } RecordIOData;
48
49 /*
50  * structure to cache metadata needed for record comparison
51  */
52 typedef struct ColumnCompareData
53 {
54         TypeCacheEntry *typentry;       /* has everything we need, actually */
55 } ColumnCompareData;
56
57 typedef struct RecordCompareData
58 {
59         int                     ncolumns;               /* allocated length of columns[] */
60         Oid                     record1_type;
61         int32           record1_typmod;
62         Oid                     record2_type;
63         int32           record2_typmod;
64         ColumnCompareData columns[FLEXIBLE_ARRAY_MEMBER];
65 } RecordCompareData;
66
67
68 /*
69  * record_in            - input routine for any composite type.
70  */
71 Datum
72 record_in(PG_FUNCTION_ARGS)
73 {
74         char       *string = PG_GETARG_CSTRING(0);
75         Oid                     tupType = PG_GETARG_OID(1);
76         int32           tupTypmod = PG_GETARG_INT32(2);
77         HeapTupleHeader result;
78         TupleDesc       tupdesc;
79         HeapTuple       tuple;
80         RecordIOData *my_extra;
81         bool            needComma = false;
82         int                     ncolumns;
83         int                     i;
84         char       *ptr;
85         Datum      *values;
86         bool       *nulls;
87         StringInfoData buf;
88
89         /*
90          * Give a friendly error message if we did not get enough info to identify
91          * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
92          * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
93          * for typmod, since composite types and RECORD have no type modifiers at
94          * the SQL level, and thus must fail for RECORD.  However some callers can
95          * supply a valid typmod, and then we can do something useful for RECORD.
96          */
97         if (tupType == RECORDOID && tupTypmod < 0)
98                 ereport(ERROR,
99                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
100                    errmsg("input of anonymous composite types is not implemented")));
101
102         /*
103          * This comes from the composite type's pg_type.oid and stores system oids
104          * in user tables, specifically DatumTupleFields. This oid must be
105          * preserved by binary upgrades.
106          */
107         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
108         ncolumns = tupdesc->natts;
109
110         /*
111          * We arrange to look up the needed I/O info just once per series of
112          * calls, assuming the record type doesn't change underneath us.
113          */
114         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
115         if (my_extra == NULL ||
116                 my_extra->ncolumns != ncolumns)
117         {
118                 fcinfo->flinfo->fn_extra =
119                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
120                                                            offsetof(RecordIOData, columns) +
121                                                            ncolumns * sizeof(ColumnIOData));
122                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
123                 my_extra->record_type = InvalidOid;
124                 my_extra->record_typmod = 0;
125         }
126
127         if (my_extra->record_type != tupType ||
128                 my_extra->record_typmod != tupTypmod)
129         {
130                 MemSet(my_extra, 0,
131                            offsetof(RecordIOData, columns) +
132                            ncolumns * sizeof(ColumnIOData));
133                 my_extra->record_type = tupType;
134                 my_extra->record_typmod = tupTypmod;
135                 my_extra->ncolumns = ncolumns;
136         }
137
138         values = (Datum *) palloc(ncolumns * sizeof(Datum));
139         nulls = (bool *) palloc(ncolumns * sizeof(bool));
140
141         /*
142          * Scan the string.  We use "buf" to accumulate the de-quoted data for
143          * each column, which is then fed to the appropriate input converter.
144          */
145         ptr = string;
146         /* Allow leading whitespace */
147         while (*ptr && isspace((unsigned char) *ptr))
148                 ptr++;
149         if (*ptr++ != '(')
150                 ereport(ERROR,
151                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
152                                  errmsg("malformed record literal: \"%s\"", string),
153                                  errdetail("Missing left parenthesis.")));
154
155         initStringInfo(&buf);
156
157         for (i = 0; i < ncolumns; i++)
158         {
159                 ColumnIOData *column_info = &my_extra->columns[i];
160                 Oid                     column_type = tupdesc->attrs[i]->atttypid;
161                 char       *column_data;
162
163                 /* Ignore dropped columns in datatype, but fill with nulls */
164                 if (tupdesc->attrs[i]->attisdropped)
165                 {
166                         values[i] = (Datum) 0;
167                         nulls[i] = true;
168                         continue;
169                 }
170
171                 if (needComma)
172                 {
173                         /* Skip comma that separates prior field from this one */
174                         if (*ptr == ',')
175                                 ptr++;
176                         else
177                                 /* *ptr must be ')' */
178                                 ereport(ERROR,
179                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
180                                                  errmsg("malformed record literal: \"%s\"", string),
181                                                  errdetail("Too few columns.")));
182                 }
183
184                 /* Check for null: completely empty input means null */
185                 if (*ptr == ',' || *ptr == ')')
186                 {
187                         column_data = NULL;
188                         nulls[i] = true;
189                 }
190                 else
191                 {
192                         /* Extract string for this column */
193                         bool            inquote = false;
194
195                         resetStringInfo(&buf);
196                         while (inquote || !(*ptr == ',' || *ptr == ')'))
197                         {
198                                 char            ch = *ptr++;
199
200                                 if (ch == '\0')
201                                         ereport(ERROR,
202                                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
203                                                          errmsg("malformed record literal: \"%s\"",
204                                                                         string),
205                                                          errdetail("Unexpected end of input.")));
206                                 if (ch == '\\')
207                                 {
208                                         if (*ptr == '\0')
209                                                 ereport(ERROR,
210                                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
211                                                                  errmsg("malformed record literal: \"%s\"",
212                                                                                 string),
213                                                                  errdetail("Unexpected end of input.")));
214                                         appendStringInfoChar(&buf, *ptr++);
215                                 }
216                                 else if (ch == '\"')
217                                 {
218                                         if (!inquote)
219                                                 inquote = true;
220                                         else if (*ptr == '\"')
221                                         {
222                                                 /* doubled quote within quote sequence */
223                                                 appendStringInfoChar(&buf, *ptr++);
224                                         }
225                                         else
226                                                 inquote = false;
227                                 }
228                                 else
229                                         appendStringInfoChar(&buf, ch);
230                         }
231
232                         column_data = buf.data;
233                         nulls[i] = false;
234                 }
235
236                 /*
237                  * Convert the column value
238                  */
239                 if (column_info->column_type != column_type)
240                 {
241                         getTypeInputInfo(column_type,
242                                                          &column_info->typiofunc,
243                                                          &column_info->typioparam);
244                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
245                                                   fcinfo->flinfo->fn_mcxt);
246                         column_info->column_type = column_type;
247                 }
248
249                 values[i] = InputFunctionCall(&column_info->proc,
250                                                                           column_data,
251                                                                           column_info->typioparam,
252                                                                           tupdesc->attrs[i]->atttypmod);
253
254                 /*
255                  * Prep for next column
256                  */
257                 needComma = true;
258         }
259
260         if (*ptr++ != ')')
261                 ereport(ERROR,
262                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
263                                  errmsg("malformed record literal: \"%s\"", string),
264                                  errdetail("Too many columns.")));
265         /* Allow trailing whitespace */
266         while (*ptr && isspace((unsigned char) *ptr))
267                 ptr++;
268         if (*ptr)
269                 ereport(ERROR,
270                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
271                                  errmsg("malformed record literal: \"%s\"", string),
272                                  errdetail("Junk after right parenthesis.")));
273
274         tuple = heap_form_tuple(tupdesc, values, nulls);
275
276         /*
277          * We cannot return tuple->t_data because heap_form_tuple allocates it as
278          * part of a larger chunk, and our caller may expect to be able to pfree
279          * our result.  So must copy the info into a new palloc chunk.
280          */
281         result = (HeapTupleHeader) palloc(tuple->t_len);
282         memcpy(result, tuple->t_data, tuple->t_len);
283
284         heap_freetuple(tuple);
285         pfree(buf.data);
286         pfree(values);
287         pfree(nulls);
288         ReleaseTupleDesc(tupdesc);
289
290         PG_RETURN_HEAPTUPLEHEADER(result);
291 }
292
293 /*
294  * record_out           - output routine for any composite type.
295  */
296 Datum
297 record_out(PG_FUNCTION_ARGS)
298 {
299         HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
300         Oid                     tupType;
301         int32           tupTypmod;
302         TupleDesc       tupdesc;
303         HeapTupleData tuple;
304         RecordIOData *my_extra;
305         bool            needComma = false;
306         int                     ncolumns;
307         int                     i;
308         Datum      *values;
309         bool       *nulls;
310         StringInfoData buf;
311
312         /* Extract type info from the tuple itself */
313         tupType = HeapTupleHeaderGetTypeId(rec);
314         tupTypmod = HeapTupleHeaderGetTypMod(rec);
315         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
316         ncolumns = tupdesc->natts;
317
318         /* Build a temporary HeapTuple control structure */
319         tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
320         ItemPointerSetInvalid(&(tuple.t_self));
321         tuple.t_tableOid = InvalidOid;
322         tuple.t_data = rec;
323
324         /*
325          * We arrange to look up the needed I/O info just once per series of
326          * calls, assuming the record type doesn't change underneath us.
327          */
328         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
329         if (my_extra == NULL ||
330                 my_extra->ncolumns != ncolumns)
331         {
332                 fcinfo->flinfo->fn_extra =
333                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
334                                                            offsetof(RecordIOData, columns) +
335                                                            ncolumns * sizeof(ColumnIOData));
336                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
337                 my_extra->record_type = InvalidOid;
338                 my_extra->record_typmod = 0;
339         }
340
341         if (my_extra->record_type != tupType ||
342                 my_extra->record_typmod != tupTypmod)
343         {
344                 MemSet(my_extra, 0,
345                            offsetof(RecordIOData, columns) +
346                            ncolumns * sizeof(ColumnIOData));
347                 my_extra->record_type = tupType;
348                 my_extra->record_typmod = tupTypmod;
349                 my_extra->ncolumns = ncolumns;
350         }
351
352         values = (Datum *) palloc(ncolumns * sizeof(Datum));
353         nulls = (bool *) palloc(ncolumns * sizeof(bool));
354
355         /* Break down the tuple into fields */
356         heap_deform_tuple(&tuple, tupdesc, values, nulls);
357
358         /* And build the result string */
359         initStringInfo(&buf);
360
361         appendStringInfoChar(&buf, '(');
362
363         for (i = 0; i < ncolumns; i++)
364         {
365                 ColumnIOData *column_info = &my_extra->columns[i];
366                 Oid                     column_type = tupdesc->attrs[i]->atttypid;
367                 Datum           attr;
368                 char       *value;
369                 char       *tmp;
370                 bool            nq;
371
372                 /* Ignore dropped columns in datatype */
373                 if (tupdesc->attrs[i]->attisdropped)
374                         continue;
375
376                 if (needComma)
377                         appendStringInfoChar(&buf, ',');
378                 needComma = true;
379
380                 if (nulls[i])
381                 {
382                         /* emit nothing... */
383                         continue;
384                 }
385
386                 /*
387                  * Convert the column value to text
388                  */
389                 if (column_info->column_type != column_type)
390                 {
391                         getTypeOutputInfo(column_type,
392                                                           &column_info->typiofunc,
393                                                           &column_info->typisvarlena);
394                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
395                                                   fcinfo->flinfo->fn_mcxt);
396                         column_info->column_type = column_type;
397                 }
398
399                 attr = values[i];
400                 value = OutputFunctionCall(&column_info->proc, attr);
401
402                 /* Detect whether we need double quotes for this value */
403                 nq = (value[0] == '\0');        /* force quotes for empty string */
404                 for (tmp = value; *tmp; tmp++)
405                 {
406                         char            ch = *tmp;
407
408                         if (ch == '"' || ch == '\\' ||
409                                 ch == '(' || ch == ')' || ch == ',' ||
410                                 isspace((unsigned char) ch))
411                         {
412                                 nq = true;
413                                 break;
414                         }
415                 }
416
417                 /* And emit the string */
418                 if (nq)
419                         appendStringInfoCharMacro(&buf, '"');
420                 for (tmp = value; *tmp; tmp++)
421                 {
422                         char            ch = *tmp;
423
424                         if (ch == '"' || ch == '\\')
425                                 appendStringInfoCharMacro(&buf, ch);
426                         appendStringInfoCharMacro(&buf, ch);
427                 }
428                 if (nq)
429                         appendStringInfoCharMacro(&buf, '"');
430         }
431
432         appendStringInfoChar(&buf, ')');
433
434         pfree(values);
435         pfree(nulls);
436         ReleaseTupleDesc(tupdesc);
437
438         PG_RETURN_CSTRING(buf.data);
439 }
440
441 /*
442  * record_recv          - binary input routine for any composite type.
443  */
444 Datum
445 record_recv(PG_FUNCTION_ARGS)
446 {
447         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
448         Oid                     tupType = PG_GETARG_OID(1);
449         int32           tupTypmod = PG_GETARG_INT32(2);
450         HeapTupleHeader result;
451         TupleDesc       tupdesc;
452         HeapTuple       tuple;
453         RecordIOData *my_extra;
454         int                     ncolumns;
455         int                     usercols;
456         int                     validcols;
457         int                     i;
458         Datum      *values;
459         bool       *nulls;
460
461         /*
462          * Give a friendly error message if we did not get enough info to identify
463          * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
464          * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
465          * for typmod, since composite types and RECORD have no type modifiers at
466          * the SQL level, and thus must fail for RECORD.  However some callers can
467          * supply a valid typmod, and then we can do something useful for RECORD.
468          */
469         if (tupType == RECORDOID && tupTypmod < 0)
470                 ereport(ERROR,
471                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
472                    errmsg("input of anonymous composite types is not implemented")));
473
474         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
475         ncolumns = tupdesc->natts;
476
477         /*
478          * We arrange to look up the needed I/O info just once per series of
479          * calls, assuming the record type doesn't change underneath us.
480          */
481         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
482         if (my_extra == NULL ||
483                 my_extra->ncolumns != ncolumns)
484         {
485                 fcinfo->flinfo->fn_extra =
486                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
487                                                            offsetof(RecordIOData, columns) +
488                                                            ncolumns * sizeof(ColumnIOData));
489                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
490                 my_extra->record_type = InvalidOid;
491                 my_extra->record_typmod = 0;
492         }
493
494         if (my_extra->record_type != tupType ||
495                 my_extra->record_typmod != tupTypmod)
496         {
497                 MemSet(my_extra, 0,
498                            offsetof(RecordIOData, columns) +
499                            ncolumns * sizeof(ColumnIOData));
500                 my_extra->record_type = tupType;
501                 my_extra->record_typmod = tupTypmod;
502                 my_extra->ncolumns = ncolumns;
503         }
504
505         values = (Datum *) palloc(ncolumns * sizeof(Datum));
506         nulls = (bool *) palloc(ncolumns * sizeof(bool));
507
508         /* Fetch number of columns user thinks it has */
509         usercols = pq_getmsgint(buf, 4);
510
511         /* Need to scan to count nondeleted columns */
512         validcols = 0;
513         for (i = 0; i < ncolumns; i++)
514         {
515                 if (!tupdesc->attrs[i]->attisdropped)
516                         validcols++;
517         }
518         if (usercols != validcols)
519                 ereport(ERROR,
520                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
521                                  errmsg("wrong number of columns: %d, expected %d",
522                                                 usercols, validcols)));
523
524         /* Process each column */
525         for (i = 0; i < ncolumns; i++)
526         {
527                 ColumnIOData *column_info = &my_extra->columns[i];
528                 Oid                     column_type = tupdesc->attrs[i]->atttypid;
529                 Oid                     coltypoid;
530                 int                     itemlen;
531                 StringInfoData item_buf;
532                 StringInfo      bufptr;
533                 char            csave;
534
535                 /* Ignore dropped columns in datatype, but fill with nulls */
536                 if (tupdesc->attrs[i]->attisdropped)
537                 {
538                         values[i] = (Datum) 0;
539                         nulls[i] = true;
540                         continue;
541                 }
542
543                 /* Verify column datatype */
544                 coltypoid = pq_getmsgint(buf, sizeof(Oid));
545                 if (coltypoid != column_type)
546                         ereport(ERROR,
547                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
548                                          errmsg("wrong data type: %u, expected %u",
549                                                         coltypoid, column_type)));
550
551                 /* Get and check the item length */
552                 itemlen = pq_getmsgint(buf, 4);
553                 if (itemlen < -1 || itemlen > (buf->len - buf->cursor))
554                         ereport(ERROR,
555                                         (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
556                                          errmsg("insufficient data left in message")));
557
558                 if (itemlen == -1)
559                 {
560                         /* -1 length means NULL */
561                         bufptr = NULL;
562                         nulls[i] = true;
563                         csave = 0;                      /* keep compiler quiet */
564                 }
565                 else
566                 {
567                         /*
568                          * Rather than copying data around, we just set up a phony
569                          * StringInfo pointing to the correct portion of the input buffer.
570                          * We assume we can scribble on the input buffer so as to maintain
571                          * the convention that StringInfos have a trailing null.
572                          */
573                         item_buf.data = &buf->data[buf->cursor];
574                         item_buf.maxlen = itemlen + 1;
575                         item_buf.len = itemlen;
576                         item_buf.cursor = 0;
577
578                         buf->cursor += itemlen;
579
580                         csave = buf->data[buf->cursor];
581                         buf->data[buf->cursor] = '\0';
582
583                         bufptr = &item_buf;
584                         nulls[i] = false;
585                 }
586
587                 /* Now call the column's receiveproc */
588                 if (column_info->column_type != column_type)
589                 {
590                         getTypeBinaryInputInfo(column_type,
591                                                                    &column_info->typiofunc,
592                                                                    &column_info->typioparam);
593                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
594                                                   fcinfo->flinfo->fn_mcxt);
595                         column_info->column_type = column_type;
596                 }
597
598                 values[i] = ReceiveFunctionCall(&column_info->proc,
599                                                                                 bufptr,
600                                                                                 column_info->typioparam,
601                                                                                 tupdesc->attrs[i]->atttypmod);
602
603                 if (bufptr)
604                 {
605                         /* Trouble if it didn't eat the whole buffer */
606                         if (item_buf.cursor != itemlen)
607                                 ereport(ERROR,
608                                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
609                                                  errmsg("improper binary format in record column %d",
610                                                                 i + 1)));
611
612                         buf->data[buf->cursor] = csave;
613                 }
614         }
615
616         tuple = heap_form_tuple(tupdesc, values, nulls);
617
618         /*
619          * We cannot return tuple->t_data because heap_form_tuple allocates it as
620          * part of a larger chunk, and our caller may expect to be able to pfree
621          * our result.  So must copy the info into a new palloc chunk.
622          */
623         result = (HeapTupleHeader) palloc(tuple->t_len);
624         memcpy(result, tuple->t_data, tuple->t_len);
625
626         heap_freetuple(tuple);
627         pfree(values);
628         pfree(nulls);
629         ReleaseTupleDesc(tupdesc);
630
631         PG_RETURN_HEAPTUPLEHEADER(result);
632 }
633
634 /*
635  * record_send          - binary output routine for any composite type.
636  */
637 Datum
638 record_send(PG_FUNCTION_ARGS)
639 {
640         HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
641         Oid                     tupType;
642         int32           tupTypmod;
643         TupleDesc       tupdesc;
644         HeapTupleData tuple;
645         RecordIOData *my_extra;
646         int                     ncolumns;
647         int                     validcols;
648         int                     i;
649         Datum      *values;
650         bool       *nulls;
651         StringInfoData buf;
652
653         /* Extract type info from the tuple itself */
654         tupType = HeapTupleHeaderGetTypeId(rec);
655         tupTypmod = HeapTupleHeaderGetTypMod(rec);
656         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
657         ncolumns = tupdesc->natts;
658
659         /* Build a temporary HeapTuple control structure */
660         tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
661         ItemPointerSetInvalid(&(tuple.t_self));
662         tuple.t_tableOid = InvalidOid;
663         tuple.t_data = rec;
664
665         /*
666          * We arrange to look up the needed I/O info just once per series of
667          * calls, assuming the record type doesn't change underneath us.
668          */
669         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
670         if (my_extra == NULL ||
671                 my_extra->ncolumns != ncolumns)
672         {
673                 fcinfo->flinfo->fn_extra =
674                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
675                                                            offsetof(RecordIOData, columns) +
676                                                            ncolumns * sizeof(ColumnIOData));
677                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
678                 my_extra->record_type = InvalidOid;
679                 my_extra->record_typmod = 0;
680         }
681
682         if (my_extra->record_type != tupType ||
683                 my_extra->record_typmod != tupTypmod)
684         {
685                 MemSet(my_extra, 0,
686                            offsetof(RecordIOData, columns) +
687                            ncolumns * sizeof(ColumnIOData));
688                 my_extra->record_type = tupType;
689                 my_extra->record_typmod = tupTypmod;
690                 my_extra->ncolumns = ncolumns;
691         }
692
693         values = (Datum *) palloc(ncolumns * sizeof(Datum));
694         nulls = (bool *) palloc(ncolumns * sizeof(bool));
695
696         /* Break down the tuple into fields */
697         heap_deform_tuple(&tuple, tupdesc, values, nulls);
698
699         /* And build the result string */
700         pq_begintypsend(&buf);
701
702         /* Need to scan to count nondeleted columns */
703         validcols = 0;
704         for (i = 0; i < ncolumns; i++)
705         {
706                 if (!tupdesc->attrs[i]->attisdropped)
707                         validcols++;
708         }
709         pq_sendint(&buf, validcols, 4);
710
711         for (i = 0; i < ncolumns; i++)
712         {
713                 ColumnIOData *column_info = &my_extra->columns[i];
714                 Oid                     column_type = tupdesc->attrs[i]->atttypid;
715                 Datum           attr;
716                 bytea      *outputbytes;
717
718                 /* Ignore dropped columns in datatype */
719                 if (tupdesc->attrs[i]->attisdropped)
720                         continue;
721
722                 pq_sendint(&buf, column_type, sizeof(Oid));
723
724                 if (nulls[i])
725                 {
726                         /* emit -1 data length to signify a NULL */
727                         pq_sendint(&buf, -1, 4);
728                         continue;
729                 }
730
731                 /*
732                  * Convert the column value to binary
733                  */
734                 if (column_info->column_type != column_type)
735                 {
736                         getTypeBinaryOutputInfo(column_type,
737                                                                         &column_info->typiofunc,
738                                                                         &column_info->typisvarlena);
739                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
740                                                   fcinfo->flinfo->fn_mcxt);
741                         column_info->column_type = column_type;
742                 }
743
744                 attr = values[i];
745                 outputbytes = SendFunctionCall(&column_info->proc, attr);
746                 pq_sendint(&buf, VARSIZE(outputbytes) - VARHDRSZ, 4);
747                 pq_sendbytes(&buf, VARDATA(outputbytes),
748                                          VARSIZE(outputbytes) - VARHDRSZ);
749         }
750
751         pfree(values);
752         pfree(nulls);
753         ReleaseTupleDesc(tupdesc);
754
755         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
756 }
757
758
759 /*
760  * record_cmp()
761  * Internal comparison function for records.
762  *
763  * Returns -1, 0 or 1
764  *
765  * Do not assume that the two inputs are exactly the same record type;
766  * for instance we might be comparing an anonymous ROW() construct against a
767  * named composite type.  We will compare as long as they have the same number
768  * of non-dropped columns of the same types.
769  */
770 static int
771 record_cmp(FunctionCallInfo fcinfo)
772 {
773         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
774         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
775         int                     result = 0;
776         Oid                     tupType1;
777         Oid                     tupType2;
778         int32           tupTypmod1;
779         int32           tupTypmod2;
780         TupleDesc       tupdesc1;
781         TupleDesc       tupdesc2;
782         HeapTupleData tuple1;
783         HeapTupleData tuple2;
784         int                     ncolumns1;
785         int                     ncolumns2;
786         RecordCompareData *my_extra;
787         int                     ncols;
788         Datum      *values1;
789         Datum      *values2;
790         bool       *nulls1;
791         bool       *nulls2;
792         int                     i1;
793         int                     i2;
794         int                     j;
795
796         /* Extract type info from the tuples */
797         tupType1 = HeapTupleHeaderGetTypeId(record1);
798         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
799         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
800         ncolumns1 = tupdesc1->natts;
801         tupType2 = HeapTupleHeaderGetTypeId(record2);
802         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
803         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
804         ncolumns2 = tupdesc2->natts;
805
806         /* Build temporary HeapTuple control structures */
807         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
808         ItemPointerSetInvalid(&(tuple1.t_self));
809         tuple1.t_tableOid = InvalidOid;
810         tuple1.t_data = record1;
811         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
812         ItemPointerSetInvalid(&(tuple2.t_self));
813         tuple2.t_tableOid = InvalidOid;
814         tuple2.t_data = record2;
815
816         /*
817          * We arrange to look up the needed comparison info just once per series
818          * of calls, assuming the record types don't change underneath us.
819          */
820         ncols = Max(ncolumns1, ncolumns2);
821         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
822         if (my_extra == NULL ||
823                 my_extra->ncolumns < ncols)
824         {
825                 fcinfo->flinfo->fn_extra =
826                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
827                                                            offsetof(RecordCompareData, columns) +
828                                                            ncols * sizeof(ColumnCompareData));
829                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
830                 my_extra->ncolumns = ncols;
831                 my_extra->record1_type = InvalidOid;
832                 my_extra->record1_typmod = 0;
833                 my_extra->record2_type = InvalidOid;
834                 my_extra->record2_typmod = 0;
835         }
836
837         if (my_extra->record1_type != tupType1 ||
838                 my_extra->record1_typmod != tupTypmod1 ||
839                 my_extra->record2_type != tupType2 ||
840                 my_extra->record2_typmod != tupTypmod2)
841         {
842                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
843                 my_extra->record1_type = tupType1;
844                 my_extra->record1_typmod = tupTypmod1;
845                 my_extra->record2_type = tupType2;
846                 my_extra->record2_typmod = tupTypmod2;
847         }
848
849         /* Break down the tuples into fields */
850         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
851         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
852         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
853         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
854         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
855         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
856
857         /*
858          * Scan corresponding columns, allowing for dropped columns in different
859          * places in the two rows.  i1 and i2 are physical column indexes, j is
860          * the logical column index.
861          */
862         i1 = i2 = j = 0;
863         while (i1 < ncolumns1 || i2 < ncolumns2)
864         {
865                 TypeCacheEntry *typentry;
866                 Oid                     collation;
867
868                 /*
869                  * Skip dropped columns
870                  */
871                 if (i1 < ncolumns1 && tupdesc1->attrs[i1]->attisdropped)
872                 {
873                         i1++;
874                         continue;
875                 }
876                 if (i2 < ncolumns2 && tupdesc2->attrs[i2]->attisdropped)
877                 {
878                         i2++;
879                         continue;
880                 }
881                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
882                         break;                          /* we'll deal with mismatch below loop */
883
884                 /*
885                  * Have two matching columns, they must be same type
886                  */
887                 if (tupdesc1->attrs[i1]->atttypid !=
888                         tupdesc2->attrs[i2]->atttypid)
889                         ereport(ERROR,
890                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
891                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
892                                                         format_type_be(tupdesc1->attrs[i1]->atttypid),
893                                                         format_type_be(tupdesc2->attrs[i2]->atttypid),
894                                                         j + 1)));
895
896                 /*
897                  * If they're not same collation, we don't complain here, but the
898                  * comparison function might.
899                  */
900                 collation = tupdesc1->attrs[i1]->attcollation;
901                 if (collation != tupdesc2->attrs[i2]->attcollation)
902                         collation = InvalidOid;
903
904                 /*
905                  * Lookup the comparison function if not done already
906                  */
907                 typentry = my_extra->columns[j].typentry;
908                 if (typentry == NULL ||
909                         typentry->type_id != tupdesc1->attrs[i1]->atttypid)
910                 {
911                         typentry = lookup_type_cache(tupdesc1->attrs[i1]->atttypid,
912                                                                                  TYPECACHE_CMP_PROC_FINFO);
913                         if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
914                                 ereport(ERROR,
915                                                 (errcode(ERRCODE_UNDEFINED_FUNCTION),
916                                 errmsg("could not identify a comparison function for type %s",
917                                            format_type_be(typentry->type_id))));
918                         my_extra->columns[j].typentry = typentry;
919                 }
920
921                 /*
922                  * We consider two NULLs equal; NULL > not-NULL.
923                  */
924                 if (!nulls1[i1] || !nulls2[i2])
925                 {
926                         FunctionCallInfoData locfcinfo;
927                         int32           cmpresult;
928
929                         if (nulls1[i1])
930                         {
931                                 /* arg1 is greater than arg2 */
932                                 result = 1;
933                                 break;
934                         }
935                         if (nulls2[i2])
936                         {
937                                 /* arg1 is less than arg2 */
938                                 result = -1;
939                                 break;
940                         }
941
942                         /* Compare the pair of elements */
943                         InitFunctionCallInfoData(locfcinfo, &typentry->cmp_proc_finfo, 2,
944                                                                          collation, NULL, NULL);
945                         locfcinfo.arg[0] = values1[i1];
946                         locfcinfo.arg[1] = values2[i2];
947                         locfcinfo.argnull[0] = false;
948                         locfcinfo.argnull[1] = false;
949                         locfcinfo.isnull = false;
950                         cmpresult = DatumGetInt32(FunctionCallInvoke(&locfcinfo));
951
952                         if (cmpresult < 0)
953                         {
954                                 /* arg1 is less than arg2 */
955                                 result = -1;
956                                 break;
957                         }
958                         else if (cmpresult > 0)
959                         {
960                                 /* arg1 is greater than arg2 */
961                                 result = 1;
962                                 break;
963                         }
964                 }
965
966                 /* equal, so continue to next column */
967                 i1++, i2++, j++;
968         }
969
970         /*
971          * If we didn't break out of the loop early, check for column count
972          * mismatch.  (We do not report such mismatch if we found unequal column
973          * values; is that a feature or a bug?)
974          */
975         if (result == 0)
976         {
977                 if (i1 != ncolumns1 || i2 != ncolumns2)
978                         ereport(ERROR,
979                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
980                                          errmsg("cannot compare record types with different numbers of columns")));
981         }
982
983         pfree(values1);
984         pfree(nulls1);
985         pfree(values2);
986         pfree(nulls2);
987         ReleaseTupleDesc(tupdesc1);
988         ReleaseTupleDesc(tupdesc2);
989
990         /* Avoid leaking memory when handed toasted input. */
991         PG_FREE_IF_COPY(record1, 0);
992         PG_FREE_IF_COPY(record2, 1);
993
994         return result;
995 }
996
997 /*
998  * record_eq :
999  *                compares two records for equality
1000  * result :
1001  *                returns true if the records are equal, false otherwise.
1002  *
1003  * Note: we do not use record_cmp here, since equality may be meaningful in
1004  * datatypes that don't have a total ordering (and hence no btree support).
1005  */
1006 Datum
1007 record_eq(PG_FUNCTION_ARGS)
1008 {
1009         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1010         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1011         bool            result = true;
1012         Oid                     tupType1;
1013         Oid                     tupType2;
1014         int32           tupTypmod1;
1015         int32           tupTypmod2;
1016         TupleDesc       tupdesc1;
1017         TupleDesc       tupdesc2;
1018         HeapTupleData tuple1;
1019         HeapTupleData tuple2;
1020         int                     ncolumns1;
1021         int                     ncolumns2;
1022         RecordCompareData *my_extra;
1023         int                     ncols;
1024         Datum      *values1;
1025         Datum      *values2;
1026         bool       *nulls1;
1027         bool       *nulls2;
1028         int                     i1;
1029         int                     i2;
1030         int                     j;
1031
1032         /* Extract type info from the tuples */
1033         tupType1 = HeapTupleHeaderGetTypeId(record1);
1034         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1035         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1036         ncolumns1 = tupdesc1->natts;
1037         tupType2 = HeapTupleHeaderGetTypeId(record2);
1038         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1039         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1040         ncolumns2 = tupdesc2->natts;
1041
1042         /* Build temporary HeapTuple control structures */
1043         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1044         ItemPointerSetInvalid(&(tuple1.t_self));
1045         tuple1.t_tableOid = InvalidOid;
1046         tuple1.t_data = record1;
1047         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1048         ItemPointerSetInvalid(&(tuple2.t_self));
1049         tuple2.t_tableOid = InvalidOid;
1050         tuple2.t_data = record2;
1051
1052         /*
1053          * We arrange to look up the needed comparison info just once per series
1054          * of calls, assuming the record types don't change underneath us.
1055          */
1056         ncols = Max(ncolumns1, ncolumns2);
1057         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1058         if (my_extra == NULL ||
1059                 my_extra->ncolumns < ncols)
1060         {
1061                 fcinfo->flinfo->fn_extra =
1062                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1063                                                            offsetof(RecordCompareData, columns) +
1064                                                            ncols * sizeof(ColumnCompareData));
1065                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1066                 my_extra->ncolumns = ncols;
1067                 my_extra->record1_type = InvalidOid;
1068                 my_extra->record1_typmod = 0;
1069                 my_extra->record2_type = InvalidOid;
1070                 my_extra->record2_typmod = 0;
1071         }
1072
1073         if (my_extra->record1_type != tupType1 ||
1074                 my_extra->record1_typmod != tupTypmod1 ||
1075                 my_extra->record2_type != tupType2 ||
1076                 my_extra->record2_typmod != tupTypmod2)
1077         {
1078                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1079                 my_extra->record1_type = tupType1;
1080                 my_extra->record1_typmod = tupTypmod1;
1081                 my_extra->record2_type = tupType2;
1082                 my_extra->record2_typmod = tupTypmod2;
1083         }
1084
1085         /* Break down the tuples into fields */
1086         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1087         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1088         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1089         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1090         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1091         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1092
1093         /*
1094          * Scan corresponding columns, allowing for dropped columns in different
1095          * places in the two rows.  i1 and i2 are physical column indexes, j is
1096          * the logical column index.
1097          */
1098         i1 = i2 = j = 0;
1099         while (i1 < ncolumns1 || i2 < ncolumns2)
1100         {
1101                 TypeCacheEntry *typentry;
1102                 Oid                     collation;
1103                 FunctionCallInfoData locfcinfo;
1104                 bool            oprresult;
1105
1106                 /*
1107                  * Skip dropped columns
1108                  */
1109                 if (i1 < ncolumns1 && tupdesc1->attrs[i1]->attisdropped)
1110                 {
1111                         i1++;
1112                         continue;
1113                 }
1114                 if (i2 < ncolumns2 && tupdesc2->attrs[i2]->attisdropped)
1115                 {
1116                         i2++;
1117                         continue;
1118                 }
1119                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
1120                         break;                          /* we'll deal with mismatch below loop */
1121
1122                 /*
1123                  * Have two matching columns, they must be same type
1124                  */
1125                 if (tupdesc1->attrs[i1]->atttypid !=
1126                         tupdesc2->attrs[i2]->atttypid)
1127                         ereport(ERROR,
1128                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1129                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1130                                                         format_type_be(tupdesc1->attrs[i1]->atttypid),
1131                                                         format_type_be(tupdesc2->attrs[i2]->atttypid),
1132                                                         j + 1)));
1133
1134                 /*
1135                  * If they're not same collation, we don't complain here, but the
1136                  * equality function might.
1137                  */
1138                 collation = tupdesc1->attrs[i1]->attcollation;
1139                 if (collation != tupdesc2->attrs[i2]->attcollation)
1140                         collation = InvalidOid;
1141
1142                 /*
1143                  * Lookup the equality function if not done already
1144                  */
1145                 typentry = my_extra->columns[j].typentry;
1146                 if (typentry == NULL ||
1147                         typentry->type_id != tupdesc1->attrs[i1]->atttypid)
1148                 {
1149                         typentry = lookup_type_cache(tupdesc1->attrs[i1]->atttypid,
1150                                                                                  TYPECACHE_EQ_OPR_FINFO);
1151                         if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
1152                                 ereport(ERROR,
1153                                                 (errcode(ERRCODE_UNDEFINED_FUNCTION),
1154                                 errmsg("could not identify an equality operator for type %s",
1155                                            format_type_be(typentry->type_id))));
1156                         my_extra->columns[j].typentry = typentry;
1157                 }
1158
1159                 /*
1160                  * We consider two NULLs equal; NULL > not-NULL.
1161                  */
1162                 if (!nulls1[i1] || !nulls2[i2])
1163                 {
1164                         if (nulls1[i1] || nulls2[i2])
1165                         {
1166                                 result = false;
1167                                 break;
1168                         }
1169
1170                         /* Compare the pair of elements */
1171                         InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
1172                                                                          collation, NULL, NULL);
1173                         locfcinfo.arg[0] = values1[i1];
1174                         locfcinfo.arg[1] = values2[i2];
1175                         locfcinfo.argnull[0] = false;
1176                         locfcinfo.argnull[1] = false;
1177                         locfcinfo.isnull = false;
1178                         oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo));
1179                         if (!oprresult)
1180                         {
1181                                 result = false;
1182                                 break;
1183                         }
1184                 }
1185
1186                 /* equal, so continue to next column */
1187                 i1++, i2++, j++;
1188         }
1189
1190         /*
1191          * If we didn't break out of the loop early, check for column count
1192          * mismatch.  (We do not report such mismatch if we found unequal column
1193          * values; is that a feature or a bug?)
1194          */
1195         if (result)
1196         {
1197                 if (i1 != ncolumns1 || i2 != ncolumns2)
1198                         ereport(ERROR,
1199                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1200                                          errmsg("cannot compare record types with different numbers of columns")));
1201         }
1202
1203         pfree(values1);
1204         pfree(nulls1);
1205         pfree(values2);
1206         pfree(nulls2);
1207         ReleaseTupleDesc(tupdesc1);
1208         ReleaseTupleDesc(tupdesc2);
1209
1210         /* Avoid leaking memory when handed toasted input. */
1211         PG_FREE_IF_COPY(record1, 0);
1212         PG_FREE_IF_COPY(record2, 1);
1213
1214         PG_RETURN_BOOL(result);
1215 }
1216
1217 Datum
1218 record_ne(PG_FUNCTION_ARGS)
1219 {
1220         PG_RETURN_BOOL(!DatumGetBool(record_eq(fcinfo)));
1221 }
1222
1223 Datum
1224 record_lt(PG_FUNCTION_ARGS)
1225 {
1226         PG_RETURN_BOOL(record_cmp(fcinfo) < 0);
1227 }
1228
1229 Datum
1230 record_gt(PG_FUNCTION_ARGS)
1231 {
1232         PG_RETURN_BOOL(record_cmp(fcinfo) > 0);
1233 }
1234
1235 Datum
1236 record_le(PG_FUNCTION_ARGS)
1237 {
1238         PG_RETURN_BOOL(record_cmp(fcinfo) <= 0);
1239 }
1240
1241 Datum
1242 record_ge(PG_FUNCTION_ARGS)
1243 {
1244         PG_RETURN_BOOL(record_cmp(fcinfo) >= 0);
1245 }
1246
1247 Datum
1248 btrecordcmp(PG_FUNCTION_ARGS)
1249 {
1250         PG_RETURN_INT32(record_cmp(fcinfo));
1251 }
1252
1253
1254 /*
1255  * record_image_cmp :
1256  * Internal byte-oriented comparison function for records.
1257  *
1258  * Returns -1, 0 or 1
1259  *
1260  * Note: The normal concepts of "equality" do not apply here; different
1261  * representation of values considered to be equal are not considered to be
1262  * identical.  As an example, for the citext type 'A' and 'a' are equal, but
1263  * they are not identical.
1264  */
1265 static int
1266 record_image_cmp(FunctionCallInfo fcinfo)
1267 {
1268         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1269         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1270         int                     result = 0;
1271         Oid                     tupType1;
1272         Oid                     tupType2;
1273         int32           tupTypmod1;
1274         int32           tupTypmod2;
1275         TupleDesc       tupdesc1;
1276         TupleDesc       tupdesc2;
1277         HeapTupleData tuple1;
1278         HeapTupleData tuple2;
1279         int                     ncolumns1;
1280         int                     ncolumns2;
1281         RecordCompareData *my_extra;
1282         int                     ncols;
1283         Datum      *values1;
1284         Datum      *values2;
1285         bool       *nulls1;
1286         bool       *nulls2;
1287         int                     i1;
1288         int                     i2;
1289         int                     j;
1290
1291         /* Extract type info from the tuples */
1292         tupType1 = HeapTupleHeaderGetTypeId(record1);
1293         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1294         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1295         ncolumns1 = tupdesc1->natts;
1296         tupType2 = HeapTupleHeaderGetTypeId(record2);
1297         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1298         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1299         ncolumns2 = tupdesc2->natts;
1300
1301         /* Build temporary HeapTuple control structures */
1302         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1303         ItemPointerSetInvalid(&(tuple1.t_self));
1304         tuple1.t_tableOid = InvalidOid;
1305         tuple1.t_data = record1;
1306         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1307         ItemPointerSetInvalid(&(tuple2.t_self));
1308         tuple2.t_tableOid = InvalidOid;
1309         tuple2.t_data = record2;
1310
1311         /*
1312          * We arrange to look up the needed comparison info just once per series
1313          * of calls, assuming the record types don't change underneath us.
1314          */
1315         ncols = Max(ncolumns1, ncolumns2);
1316         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1317         if (my_extra == NULL ||
1318                 my_extra->ncolumns < ncols)
1319         {
1320                 fcinfo->flinfo->fn_extra =
1321                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1322                                                            offsetof(RecordCompareData, columns) +
1323                                                            ncols * sizeof(ColumnCompareData));
1324                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1325                 my_extra->ncolumns = ncols;
1326                 my_extra->record1_type = InvalidOid;
1327                 my_extra->record1_typmod = 0;
1328                 my_extra->record2_type = InvalidOid;
1329                 my_extra->record2_typmod = 0;
1330         }
1331
1332         if (my_extra->record1_type != tupType1 ||
1333                 my_extra->record1_typmod != tupTypmod1 ||
1334                 my_extra->record2_type != tupType2 ||
1335                 my_extra->record2_typmod != tupTypmod2)
1336         {
1337                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1338                 my_extra->record1_type = tupType1;
1339                 my_extra->record1_typmod = tupTypmod1;
1340                 my_extra->record2_type = tupType2;
1341                 my_extra->record2_typmod = tupTypmod2;
1342         }
1343
1344         /* Break down the tuples into fields */
1345         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1346         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1347         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1348         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1349         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1350         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1351
1352         /*
1353          * Scan corresponding columns, allowing for dropped columns in different
1354          * places in the two rows.  i1 and i2 are physical column indexes, j is
1355          * the logical column index.
1356          */
1357         i1 = i2 = j = 0;
1358         while (i1 < ncolumns1 || i2 < ncolumns2)
1359         {
1360                 /*
1361                  * Skip dropped columns
1362                  */
1363                 if (i1 < ncolumns1 && tupdesc1->attrs[i1]->attisdropped)
1364                 {
1365                         i1++;
1366                         continue;
1367                 }
1368                 if (i2 < ncolumns2 && tupdesc2->attrs[i2]->attisdropped)
1369                 {
1370                         i2++;
1371                         continue;
1372                 }
1373                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
1374                         break;                          /* we'll deal with mismatch below loop */
1375
1376                 /*
1377                  * Have two matching columns, they must be same type
1378                  */
1379                 if (tupdesc1->attrs[i1]->atttypid !=
1380                         tupdesc2->attrs[i2]->atttypid)
1381                         ereport(ERROR,
1382                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1383                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1384                                                         format_type_be(tupdesc1->attrs[i1]->atttypid),
1385                                                         format_type_be(tupdesc2->attrs[i2]->atttypid),
1386                                                         j + 1)));
1387
1388                 /*
1389                  * The same type should have the same length (or both should be
1390                  * variable).
1391                  */
1392                 Assert(tupdesc1->attrs[i1]->attlen ==
1393                            tupdesc2->attrs[i2]->attlen);
1394
1395                 /*
1396                  * We consider two NULLs equal; NULL > not-NULL.
1397                  */
1398                 if (!nulls1[i1] || !nulls2[i2])
1399                 {
1400                         int                     cmpresult = 0;
1401
1402                         if (nulls1[i1])
1403                         {
1404                                 /* arg1 is greater than arg2 */
1405                                 result = 1;
1406                                 break;
1407                         }
1408                         if (nulls2[i2])
1409                         {
1410                                 /* arg1 is less than arg2 */
1411                                 result = -1;
1412                                 break;
1413                         }
1414
1415                         /* Compare the pair of elements */
1416                         if (tupdesc1->attrs[i1]->attlen == -1)
1417                         {
1418                                 Size            len1,
1419                                                         len2;
1420                                 struct varlena *arg1val;
1421                                 struct varlena *arg2val;
1422
1423                                 len1 = toast_raw_datum_size(values1[i1]);
1424                                 len2 = toast_raw_datum_size(values2[i2]);
1425                                 arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]);
1426                                 arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]);
1427
1428                                 cmpresult = memcmp(VARDATA_ANY(arg1val),
1429                                                                    VARDATA_ANY(arg2val),
1430                                                                    Min(len1, len2) - VARHDRSZ);
1431                                 if ((cmpresult == 0) && (len1 != len2))
1432                                         cmpresult = (len1 < len2) ? -1 : 1;
1433
1434                                 if ((Pointer) arg1val != (Pointer) values1[i1])
1435                                         pfree(arg1val);
1436                                 if ((Pointer) arg2val != (Pointer) values2[i2])
1437                                         pfree(arg2val);
1438                         }
1439                         else if (tupdesc1->attrs[i1]->attbyval)
1440                         {
1441                                 switch (tupdesc1->attrs[i1]->attlen)
1442                                 {
1443                                         case 1:
1444                                                 if (GET_1_BYTE(values1[i1]) !=
1445                                                         GET_1_BYTE(values2[i2]))
1446                                                 {
1447                                                         cmpresult = (GET_1_BYTE(values1[i1]) <
1448                                                                                  GET_1_BYTE(values2[i2])) ? -1 : 1;
1449                                                 }
1450                                                 break;
1451                                         case 2:
1452                                                 if (GET_2_BYTES(values1[i1]) !=
1453                                                         GET_2_BYTES(values2[i2]))
1454                                                 {
1455                                                         cmpresult = (GET_2_BYTES(values1[i1]) <
1456                                                                                  GET_2_BYTES(values2[i2])) ? -1 : 1;
1457                                                 }
1458                                                 break;
1459                                         case 4:
1460                                                 if (GET_4_BYTES(values1[i1]) !=
1461                                                         GET_4_BYTES(values2[i2]))
1462                                                 {
1463                                                         cmpresult = (GET_4_BYTES(values1[i1]) <
1464                                                                                  GET_4_BYTES(values2[i2])) ? -1 : 1;
1465                                                 }
1466                                                 break;
1467 #if SIZEOF_DATUM == 8
1468                                         case 8:
1469                                                 if (GET_8_BYTES(values1[i1]) !=
1470                                                         GET_8_BYTES(values2[i2]))
1471                                                 {
1472                                                         cmpresult = (GET_8_BYTES(values1[i1]) <
1473                                                                                  GET_8_BYTES(values2[i2])) ? -1 : 1;
1474                                                 }
1475                                                 break;
1476 #endif
1477                                         default:
1478                                                 Assert(false);  /* cannot happen */
1479                                 }
1480                         }
1481                         else
1482                         {
1483                                 cmpresult = memcmp(DatumGetPointer(values1[i1]),
1484                                                                    DatumGetPointer(values2[i2]),
1485                                                                    tupdesc1->attrs[i1]->attlen);
1486                         }
1487
1488                         if (cmpresult < 0)
1489                         {
1490                                 /* arg1 is less than arg2 */
1491                                 result = -1;
1492                                 break;
1493                         }
1494                         else if (cmpresult > 0)
1495                         {
1496                                 /* arg1 is greater than arg2 */
1497                                 result = 1;
1498                                 break;
1499                         }
1500                 }
1501
1502                 /* equal, so continue to next column */
1503                 i1++, i2++, j++;
1504         }
1505
1506         /*
1507          * If we didn't break out of the loop early, check for column count
1508          * mismatch.  (We do not report such mismatch if we found unequal column
1509          * values; is that a feature or a bug?)
1510          */
1511         if (result == 0)
1512         {
1513                 if (i1 != ncolumns1 || i2 != ncolumns2)
1514                         ereport(ERROR,
1515                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1516                                          errmsg("cannot compare record types with different numbers of columns")));
1517         }
1518
1519         pfree(values1);
1520         pfree(nulls1);
1521         pfree(values2);
1522         pfree(nulls2);
1523         ReleaseTupleDesc(tupdesc1);
1524         ReleaseTupleDesc(tupdesc2);
1525
1526         /* Avoid leaking memory when handed toasted input. */
1527         PG_FREE_IF_COPY(record1, 0);
1528         PG_FREE_IF_COPY(record2, 1);
1529
1530         return result;
1531 }
1532
1533 /*
1534  * record_image_eq :
1535  *                compares two records for identical contents, based on byte images
1536  * result :
1537  *                returns true if the records are identical, false otherwise.
1538  *
1539  * Note: we do not use record_image_cmp here, since we can avoid
1540  * de-toasting for unequal lengths this way.
1541  */
1542 Datum
1543 record_image_eq(PG_FUNCTION_ARGS)
1544 {
1545         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1546         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1547         bool            result = true;
1548         Oid                     tupType1;
1549         Oid                     tupType2;
1550         int32           tupTypmod1;
1551         int32           tupTypmod2;
1552         TupleDesc       tupdesc1;
1553         TupleDesc       tupdesc2;
1554         HeapTupleData tuple1;
1555         HeapTupleData tuple2;
1556         int                     ncolumns1;
1557         int                     ncolumns2;
1558         RecordCompareData *my_extra;
1559         int                     ncols;
1560         Datum      *values1;
1561         Datum      *values2;
1562         bool       *nulls1;
1563         bool       *nulls2;
1564         int                     i1;
1565         int                     i2;
1566         int                     j;
1567
1568         /* Extract type info from the tuples */
1569         tupType1 = HeapTupleHeaderGetTypeId(record1);
1570         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1571         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1572         ncolumns1 = tupdesc1->natts;
1573         tupType2 = HeapTupleHeaderGetTypeId(record2);
1574         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1575         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1576         ncolumns2 = tupdesc2->natts;
1577
1578         /* Build temporary HeapTuple control structures */
1579         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1580         ItemPointerSetInvalid(&(tuple1.t_self));
1581         tuple1.t_tableOid = InvalidOid;
1582         tuple1.t_data = record1;
1583         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1584         ItemPointerSetInvalid(&(tuple2.t_self));
1585         tuple2.t_tableOid = InvalidOid;
1586         tuple2.t_data = record2;
1587
1588         /*
1589          * We arrange to look up the needed comparison info just once per series
1590          * of calls, assuming the record types don't change underneath us.
1591          */
1592         ncols = Max(ncolumns1, ncolumns2);
1593         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1594         if (my_extra == NULL ||
1595                 my_extra->ncolumns < ncols)
1596         {
1597                 fcinfo->flinfo->fn_extra =
1598                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1599                                                            offsetof(RecordCompareData, columns) +
1600                                                            ncols * sizeof(ColumnCompareData));
1601                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1602                 my_extra->ncolumns = ncols;
1603                 my_extra->record1_type = InvalidOid;
1604                 my_extra->record1_typmod = 0;
1605                 my_extra->record2_type = InvalidOid;
1606                 my_extra->record2_typmod = 0;
1607         }
1608
1609         if (my_extra->record1_type != tupType1 ||
1610                 my_extra->record1_typmod != tupTypmod1 ||
1611                 my_extra->record2_type != tupType2 ||
1612                 my_extra->record2_typmod != tupTypmod2)
1613         {
1614                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1615                 my_extra->record1_type = tupType1;
1616                 my_extra->record1_typmod = tupTypmod1;
1617                 my_extra->record2_type = tupType2;
1618                 my_extra->record2_typmod = tupTypmod2;
1619         }
1620
1621         /* Break down the tuples into fields */
1622         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1623         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1624         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1625         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1626         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1627         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1628
1629         /*
1630          * Scan corresponding columns, allowing for dropped columns in different
1631          * places in the two rows.  i1 and i2 are physical column indexes, j is
1632          * the logical column index.
1633          */
1634         i1 = i2 = j = 0;
1635         while (i1 < ncolumns1 || i2 < ncolumns2)
1636         {
1637                 /*
1638                  * Skip dropped columns
1639                  */
1640                 if (i1 < ncolumns1 && tupdesc1->attrs[i1]->attisdropped)
1641                 {
1642                         i1++;
1643                         continue;
1644                 }
1645                 if (i2 < ncolumns2 && tupdesc2->attrs[i2]->attisdropped)
1646                 {
1647                         i2++;
1648                         continue;
1649                 }
1650                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
1651                         break;                          /* we'll deal with mismatch below loop */
1652
1653                 /*
1654                  * Have two matching columns, they must be same type
1655                  */
1656                 if (tupdesc1->attrs[i1]->atttypid !=
1657                         tupdesc2->attrs[i2]->atttypid)
1658                         ereport(ERROR,
1659                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1660                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1661                                                         format_type_be(tupdesc1->attrs[i1]->atttypid),
1662                                                         format_type_be(tupdesc2->attrs[i2]->atttypid),
1663                                                         j + 1)));
1664
1665                 /*
1666                  * We consider two NULLs equal; NULL > not-NULL.
1667                  */
1668                 if (!nulls1[i1] || !nulls2[i2])
1669                 {
1670                         if (nulls1[i1] || nulls2[i2])
1671                         {
1672                                 result = false;
1673                                 break;
1674                         }
1675
1676                         /* Compare the pair of elements */
1677                         if (tupdesc1->attrs[i1]->attlen == -1)
1678                         {
1679                                 Size            len1,
1680                                                         len2;
1681
1682                                 len1 = toast_raw_datum_size(values1[i1]);
1683                                 len2 = toast_raw_datum_size(values2[i2]);
1684                                 /* No need to de-toast if lengths don't match. */
1685                                 if (len1 != len2)
1686                                         result = false;
1687                                 else
1688                                 {
1689                                         struct varlena *arg1val;
1690                                         struct varlena *arg2val;
1691
1692                                         arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]);
1693                                         arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]);
1694
1695                                         result = (memcmp(VARDATA_ANY(arg1val),
1696                                                                          VARDATA_ANY(arg2val),
1697                                                                          len1 - VARHDRSZ) == 0);
1698
1699                                         /* Only free memory if it's a copy made here. */
1700                                         if ((Pointer) arg1val != (Pointer) values1[i1])
1701                                                 pfree(arg1val);
1702                                         if ((Pointer) arg2val != (Pointer) values2[i2])
1703                                                 pfree(arg2val);
1704                                 }
1705                         }
1706                         else if (tupdesc1->attrs[i1]->attbyval)
1707                         {
1708                                 switch (tupdesc1->attrs[i1]->attlen)
1709                                 {
1710                                         case 1:
1711                                                 result = (GET_1_BYTE(values1[i1]) ==
1712                                                                   GET_1_BYTE(values2[i2]));
1713                                                 break;
1714                                         case 2:
1715                                                 result = (GET_2_BYTES(values1[i1]) ==
1716                                                                   GET_2_BYTES(values2[i2]));
1717                                                 break;
1718                                         case 4:
1719                                                 result = (GET_4_BYTES(values1[i1]) ==
1720                                                                   GET_4_BYTES(values2[i2]));
1721                                                 break;
1722 #if SIZEOF_DATUM == 8
1723                                         case 8:
1724                                                 result = (GET_8_BYTES(values1[i1]) ==
1725                                                                   GET_8_BYTES(values2[i2]));
1726                                                 break;
1727 #endif
1728                                         default:
1729                                                 Assert(false);  /* cannot happen */
1730                                 }
1731                         }
1732                         else
1733                         {
1734                                 result = (memcmp(DatumGetPointer(values1[i1]),
1735                                                                  DatumGetPointer(values2[i2]),
1736                                                                  tupdesc1->attrs[i1]->attlen) == 0);
1737                         }
1738                         if (!result)
1739                                 break;
1740                 }
1741
1742                 /* equal, so continue to next column */
1743                 i1++, i2++, j++;
1744         }
1745
1746         /*
1747          * If we didn't break out of the loop early, check for column count
1748          * mismatch.  (We do not report such mismatch if we found unequal column
1749          * values; is that a feature or a bug?)
1750          */
1751         if (result)
1752         {
1753                 if (i1 != ncolumns1 || i2 != ncolumns2)
1754                         ereport(ERROR,
1755                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1756                                          errmsg("cannot compare record types with different numbers of columns")));
1757         }
1758
1759         pfree(values1);
1760         pfree(nulls1);
1761         pfree(values2);
1762         pfree(nulls2);
1763         ReleaseTupleDesc(tupdesc1);
1764         ReleaseTupleDesc(tupdesc2);
1765
1766         /* Avoid leaking memory when handed toasted input. */
1767         PG_FREE_IF_COPY(record1, 0);
1768         PG_FREE_IF_COPY(record2, 1);
1769
1770         PG_RETURN_BOOL(result);
1771 }
1772
1773 Datum
1774 record_image_ne(PG_FUNCTION_ARGS)
1775 {
1776         PG_RETURN_BOOL(!DatumGetBool(record_image_eq(fcinfo)));
1777 }
1778
1779 Datum
1780 record_image_lt(PG_FUNCTION_ARGS)
1781 {
1782         PG_RETURN_BOOL(record_image_cmp(fcinfo) < 0);
1783 }
1784
1785 Datum
1786 record_image_gt(PG_FUNCTION_ARGS)
1787 {
1788         PG_RETURN_BOOL(record_image_cmp(fcinfo) > 0);
1789 }
1790
1791 Datum
1792 record_image_le(PG_FUNCTION_ARGS)
1793 {
1794         PG_RETURN_BOOL(record_image_cmp(fcinfo) <= 0);
1795 }
1796
1797 Datum
1798 record_image_ge(PG_FUNCTION_ARGS)
1799 {
1800         PG_RETURN_BOOL(record_image_cmp(fcinfo) >= 0);
1801 }
1802
1803 Datum
1804 btrecordimagecmp(PG_FUNCTION_ARGS)
1805 {
1806         PG_RETURN_INT32(record_image_cmp(fcinfo));
1807 }