]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/rowtypes.c
Replace remaining uses of pq_sendint with pq_sendint{8,16,32}.
[postgresql] / src / backend / utils / adt / rowtypes.c
1 /*-------------------------------------------------------------------------
2  *
3  * rowtypes.c
4  *        I/O and comparison functions for generic composite types.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/utils/adt/rowtypes.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include <ctype.h>
18
19 #include "access/htup_details.h"
20 #include "access/tuptoaster.h"
21 #include "catalog/pg_type.h"
22 #include "funcapi.h"
23 #include "libpq/pqformat.h"
24 #include "miscadmin.h"
25 #include "utils/builtins.h"
26 #include "utils/lsyscache.h"
27 #include "utils/typcache.h"
28
29
30 /*
31  * structure to cache metadata needed for record I/O
32  */
33 typedef struct ColumnIOData
34 {
35         Oid                     column_type;
36         Oid                     typiofunc;
37         Oid                     typioparam;
38         bool            typisvarlena;
39         FmgrInfo        proc;
40 } ColumnIOData;
41
42 typedef struct RecordIOData
43 {
44         Oid                     record_type;
45         int32           record_typmod;
46         int                     ncolumns;
47         ColumnIOData columns[FLEXIBLE_ARRAY_MEMBER];
48 } RecordIOData;
49
50 /*
51  * structure to cache metadata needed for record comparison
52  */
53 typedef struct ColumnCompareData
54 {
55         TypeCacheEntry *typentry;       /* has everything we need, actually */
56 } ColumnCompareData;
57
58 typedef struct RecordCompareData
59 {
60         int                     ncolumns;               /* allocated length of columns[] */
61         Oid                     record1_type;
62         int32           record1_typmod;
63         Oid                     record2_type;
64         int32           record2_typmod;
65         ColumnCompareData columns[FLEXIBLE_ARRAY_MEMBER];
66 } RecordCompareData;
67
68
69 /*
70  * record_in            - input routine for any composite type.
71  */
72 Datum
73 record_in(PG_FUNCTION_ARGS)
74 {
75         char       *string = PG_GETARG_CSTRING(0);
76         Oid                     tupType = PG_GETARG_OID(1);
77         int32           tupTypmod = PG_GETARG_INT32(2);
78         HeapTupleHeader result;
79         TupleDesc       tupdesc;
80         HeapTuple       tuple;
81         RecordIOData *my_extra;
82         bool            needComma = false;
83         int                     ncolumns;
84         int                     i;
85         char       *ptr;
86         Datum      *values;
87         bool       *nulls;
88         StringInfoData buf;
89
90         check_stack_depth();            /* recurses for record-type columns */
91
92         /*
93          * Give a friendly error message if we did not get enough info to identify
94          * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
95          * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
96          * for typmod, since composite types and RECORD have no type modifiers at
97          * the SQL level, and thus must fail for RECORD.  However some callers can
98          * supply a valid typmod, and then we can do something useful for RECORD.
99          */
100         if (tupType == RECORDOID && tupTypmod < 0)
101                 ereport(ERROR,
102                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
103                                  errmsg("input of anonymous composite types is not implemented")));
104
105         /*
106          * This comes from the composite type's pg_type.oid and stores system oids
107          * in user tables, specifically DatumTupleFields. This oid must be
108          * preserved by binary upgrades.
109          */
110         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
111         ncolumns = tupdesc->natts;
112
113         /*
114          * We arrange to look up the needed I/O info just once per series of
115          * calls, assuming the record type doesn't change underneath us.
116          */
117         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
118         if (my_extra == NULL ||
119                 my_extra->ncolumns != ncolumns)
120         {
121                 fcinfo->flinfo->fn_extra =
122                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
123                                                            offsetof(RecordIOData, columns) +
124                                                            ncolumns * sizeof(ColumnIOData));
125                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
126                 my_extra->record_type = InvalidOid;
127                 my_extra->record_typmod = 0;
128         }
129
130         if (my_extra->record_type != tupType ||
131                 my_extra->record_typmod != tupTypmod)
132         {
133                 MemSet(my_extra, 0,
134                            offsetof(RecordIOData, columns) +
135                            ncolumns * sizeof(ColumnIOData));
136                 my_extra->record_type = tupType;
137                 my_extra->record_typmod = tupTypmod;
138                 my_extra->ncolumns = ncolumns;
139         }
140
141         values = (Datum *) palloc(ncolumns * sizeof(Datum));
142         nulls = (bool *) palloc(ncolumns * sizeof(bool));
143
144         /*
145          * Scan the string.  We use "buf" to accumulate the de-quoted data for
146          * each column, which is then fed to the appropriate input converter.
147          */
148         ptr = string;
149         /* Allow leading whitespace */
150         while (*ptr && isspace((unsigned char) *ptr))
151                 ptr++;
152         if (*ptr++ != '(')
153                 ereport(ERROR,
154                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
155                                  errmsg("malformed record literal: \"%s\"", string),
156                                  errdetail("Missing left parenthesis.")));
157
158         initStringInfo(&buf);
159
160         for (i = 0; i < ncolumns; i++)
161         {
162                 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
163                 ColumnIOData *column_info = &my_extra->columns[i];
164                 Oid                     column_type = att->atttypid;
165                 char       *column_data;
166
167                 /* Ignore dropped columns in datatype, but fill with nulls */
168                 if (att->attisdropped)
169                 {
170                         values[i] = (Datum) 0;
171                         nulls[i] = true;
172                         continue;
173                 }
174
175                 if (needComma)
176                 {
177                         /* Skip comma that separates prior field from this one */
178                         if (*ptr == ',')
179                                 ptr++;
180                         else
181                                 /* *ptr must be ')' */
182                                 ereport(ERROR,
183                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
184                                                  errmsg("malformed record literal: \"%s\"", string),
185                                                  errdetail("Too few columns.")));
186                 }
187
188                 /* Check for null: completely empty input means null */
189                 if (*ptr == ',' || *ptr == ')')
190                 {
191                         column_data = NULL;
192                         nulls[i] = true;
193                 }
194                 else
195                 {
196                         /* Extract string for this column */
197                         bool            inquote = false;
198
199                         resetStringInfo(&buf);
200                         while (inquote || !(*ptr == ',' || *ptr == ')'))
201                         {
202                                 char            ch = *ptr++;
203
204                                 if (ch == '\0')
205                                         ereport(ERROR,
206                                                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
207                                                          errmsg("malformed record literal: \"%s\"",
208                                                                         string),
209                                                          errdetail("Unexpected end of input.")));
210                                 if (ch == '\\')
211                                 {
212                                         if (*ptr == '\0')
213                                                 ereport(ERROR,
214                                                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
215                                                                  errmsg("malformed record literal: \"%s\"",
216                                                                                 string),
217                                                                  errdetail("Unexpected end of input.")));
218                                         appendStringInfoChar(&buf, *ptr++);
219                                 }
220                                 else if (ch == '"')
221                                 {
222                                         if (!inquote)
223                                                 inquote = true;
224                                         else if (*ptr == '"')
225                                         {
226                                                 /* doubled quote within quote sequence */
227                                                 appendStringInfoChar(&buf, *ptr++);
228                                         }
229                                         else
230                                                 inquote = false;
231                                 }
232                                 else
233                                         appendStringInfoChar(&buf, ch);
234                         }
235
236                         column_data = buf.data;
237                         nulls[i] = false;
238                 }
239
240                 /*
241                  * Convert the column value
242                  */
243                 if (column_info->column_type != column_type)
244                 {
245                         getTypeInputInfo(column_type,
246                                                          &column_info->typiofunc,
247                                                          &column_info->typioparam);
248                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
249                                                   fcinfo->flinfo->fn_mcxt);
250                         column_info->column_type = column_type;
251                 }
252
253                 values[i] = InputFunctionCall(&column_info->proc,
254                                                                           column_data,
255                                                                           column_info->typioparam,
256                                                                           att->atttypmod);
257
258                 /*
259                  * Prep for next column
260                  */
261                 needComma = true;
262         }
263
264         if (*ptr++ != ')')
265                 ereport(ERROR,
266                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
267                                  errmsg("malformed record literal: \"%s\"", string),
268                                  errdetail("Too many columns.")));
269         /* Allow trailing whitespace */
270         while (*ptr && isspace((unsigned char) *ptr))
271                 ptr++;
272         if (*ptr)
273                 ereport(ERROR,
274                                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
275                                  errmsg("malformed record literal: \"%s\"", string),
276                                  errdetail("Junk after right parenthesis.")));
277
278         tuple = heap_form_tuple(tupdesc, values, nulls);
279
280         /*
281          * We cannot return tuple->t_data because heap_form_tuple allocates it as
282          * part of a larger chunk, and our caller may expect to be able to pfree
283          * our result.  So must copy the info into a new palloc chunk.
284          */
285         result = (HeapTupleHeader) palloc(tuple->t_len);
286         memcpy(result, tuple->t_data, tuple->t_len);
287
288         heap_freetuple(tuple);
289         pfree(buf.data);
290         pfree(values);
291         pfree(nulls);
292         ReleaseTupleDesc(tupdesc);
293
294         PG_RETURN_HEAPTUPLEHEADER(result);
295 }
296
297 /*
298  * record_out           - output routine for any composite type.
299  */
300 Datum
301 record_out(PG_FUNCTION_ARGS)
302 {
303         HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
304         Oid                     tupType;
305         int32           tupTypmod;
306         TupleDesc       tupdesc;
307         HeapTupleData tuple;
308         RecordIOData *my_extra;
309         bool            needComma = false;
310         int                     ncolumns;
311         int                     i;
312         Datum      *values;
313         bool       *nulls;
314         StringInfoData buf;
315
316         check_stack_depth();            /* recurses for record-type columns */
317
318         /* Extract type info from the tuple itself */
319         tupType = HeapTupleHeaderGetTypeId(rec);
320         tupTypmod = HeapTupleHeaderGetTypMod(rec);
321         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
322         ncolumns = tupdesc->natts;
323
324         /* Build a temporary HeapTuple control structure */
325         tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
326         ItemPointerSetInvalid(&(tuple.t_self));
327         tuple.t_tableOid = InvalidOid;
328         tuple.t_data = rec;
329
330         /*
331          * We arrange to look up the needed I/O info just once per series of
332          * calls, assuming the record type doesn't change underneath us.
333          */
334         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
335         if (my_extra == NULL ||
336                 my_extra->ncolumns != ncolumns)
337         {
338                 fcinfo->flinfo->fn_extra =
339                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
340                                                            offsetof(RecordIOData, columns) +
341                                                            ncolumns * sizeof(ColumnIOData));
342                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
343                 my_extra->record_type = InvalidOid;
344                 my_extra->record_typmod = 0;
345         }
346
347         if (my_extra->record_type != tupType ||
348                 my_extra->record_typmod != tupTypmod)
349         {
350                 MemSet(my_extra, 0,
351                            offsetof(RecordIOData, columns) +
352                            ncolumns * sizeof(ColumnIOData));
353                 my_extra->record_type = tupType;
354                 my_extra->record_typmod = tupTypmod;
355                 my_extra->ncolumns = ncolumns;
356         }
357
358         values = (Datum *) palloc(ncolumns * sizeof(Datum));
359         nulls = (bool *) palloc(ncolumns * sizeof(bool));
360
361         /* Break down the tuple into fields */
362         heap_deform_tuple(&tuple, tupdesc, values, nulls);
363
364         /* And build the result string */
365         initStringInfo(&buf);
366
367         appendStringInfoChar(&buf, '(');
368
369         for (i = 0; i < ncolumns; i++)
370         {
371                 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
372                 ColumnIOData *column_info = &my_extra->columns[i];
373                 Oid                     column_type = att->atttypid;
374                 Datum           attr;
375                 char       *value;
376                 char       *tmp;
377                 bool            nq;
378
379                 /* Ignore dropped columns in datatype */
380                 if (att->attisdropped)
381                         continue;
382
383                 if (needComma)
384                         appendStringInfoChar(&buf, ',');
385                 needComma = true;
386
387                 if (nulls[i])
388                 {
389                         /* emit nothing... */
390                         continue;
391                 }
392
393                 /*
394                  * Convert the column value to text
395                  */
396                 if (column_info->column_type != column_type)
397                 {
398                         getTypeOutputInfo(column_type,
399                                                           &column_info->typiofunc,
400                                                           &column_info->typisvarlena);
401                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
402                                                   fcinfo->flinfo->fn_mcxt);
403                         column_info->column_type = column_type;
404                 }
405
406                 attr = values[i];
407                 value = OutputFunctionCall(&column_info->proc, attr);
408
409                 /* Detect whether we need double quotes for this value */
410                 nq = (value[0] == '\0');        /* force quotes for empty string */
411                 for (tmp = value; *tmp; tmp++)
412                 {
413                         char            ch = *tmp;
414
415                         if (ch == '"' || ch == '\\' ||
416                                 ch == '(' || ch == ')' || ch == ',' ||
417                                 isspace((unsigned char) ch))
418                         {
419                                 nq = true;
420                                 break;
421                         }
422                 }
423
424                 /* And emit the string */
425                 if (nq)
426                         appendStringInfoCharMacro(&buf, '"');
427                 for (tmp = value; *tmp; tmp++)
428                 {
429                         char            ch = *tmp;
430
431                         if (ch == '"' || ch == '\\')
432                                 appendStringInfoCharMacro(&buf, ch);
433                         appendStringInfoCharMacro(&buf, ch);
434                 }
435                 if (nq)
436                         appendStringInfoCharMacro(&buf, '"');
437         }
438
439         appendStringInfoChar(&buf, ')');
440
441         pfree(values);
442         pfree(nulls);
443         ReleaseTupleDesc(tupdesc);
444
445         PG_RETURN_CSTRING(buf.data);
446 }
447
448 /*
449  * record_recv          - binary input routine for any composite type.
450  */
451 Datum
452 record_recv(PG_FUNCTION_ARGS)
453 {
454         StringInfo      buf = (StringInfo) PG_GETARG_POINTER(0);
455         Oid                     tupType = PG_GETARG_OID(1);
456         int32           tupTypmod = PG_GETARG_INT32(2);
457         HeapTupleHeader result;
458         TupleDesc       tupdesc;
459         HeapTuple       tuple;
460         RecordIOData *my_extra;
461         int                     ncolumns;
462         int                     usercols;
463         int                     validcols;
464         int                     i;
465         Datum      *values;
466         bool       *nulls;
467
468         check_stack_depth();            /* recurses for record-type columns */
469
470         /*
471          * Give a friendly error message if we did not get enough info to identify
472          * the target record type.  (lookup_rowtype_tupdesc would fail anyway, but
473          * with a non-user-friendly message.)  In ordinary SQL usage, we'll get -1
474          * for typmod, since composite types and RECORD have no type modifiers at
475          * the SQL level, and thus must fail for RECORD.  However some callers can
476          * supply a valid typmod, and then we can do something useful for RECORD.
477          */
478         if (tupType == RECORDOID && tupTypmod < 0)
479                 ereport(ERROR,
480                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
481                                  errmsg("input of anonymous composite types is not implemented")));
482
483         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
484         ncolumns = tupdesc->natts;
485
486         /*
487          * We arrange to look up the needed I/O info just once per series of
488          * calls, assuming the record type doesn't change underneath us.
489          */
490         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
491         if (my_extra == NULL ||
492                 my_extra->ncolumns != ncolumns)
493         {
494                 fcinfo->flinfo->fn_extra =
495                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
496                                                            offsetof(RecordIOData, columns) +
497                                                            ncolumns * sizeof(ColumnIOData));
498                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
499                 my_extra->record_type = InvalidOid;
500                 my_extra->record_typmod = 0;
501         }
502
503         if (my_extra->record_type != tupType ||
504                 my_extra->record_typmod != tupTypmod)
505         {
506                 MemSet(my_extra, 0,
507                            offsetof(RecordIOData, columns) +
508                            ncolumns * sizeof(ColumnIOData));
509                 my_extra->record_type = tupType;
510                 my_extra->record_typmod = tupTypmod;
511                 my_extra->ncolumns = ncolumns;
512         }
513
514         values = (Datum *) palloc(ncolumns * sizeof(Datum));
515         nulls = (bool *) palloc(ncolumns * sizeof(bool));
516
517         /* Fetch number of columns user thinks it has */
518         usercols = pq_getmsgint(buf, 4);
519
520         /* Need to scan to count nondeleted columns */
521         validcols = 0;
522         for (i = 0; i < ncolumns; i++)
523         {
524                 if (!TupleDescAttr(tupdesc, i)->attisdropped)
525                         validcols++;
526         }
527         if (usercols != validcols)
528                 ereport(ERROR,
529                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
530                                  errmsg("wrong number of columns: %d, expected %d",
531                                                 usercols, validcols)));
532
533         /* Process each column */
534         for (i = 0; i < ncolumns; i++)
535         {
536                 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
537                 ColumnIOData *column_info = &my_extra->columns[i];
538                 Oid                     column_type = att->atttypid;
539                 Oid                     coltypoid;
540                 int                     itemlen;
541                 StringInfoData item_buf;
542                 StringInfo      bufptr;
543                 char            csave;
544
545                 /* Ignore dropped columns in datatype, but fill with nulls */
546                 if (att->attisdropped)
547                 {
548                         values[i] = (Datum) 0;
549                         nulls[i] = true;
550                         continue;
551                 }
552
553                 /* Verify column datatype */
554                 coltypoid = pq_getmsgint(buf, sizeof(Oid));
555                 if (coltypoid != column_type)
556                         ereport(ERROR,
557                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
558                                          errmsg("wrong data type: %u, expected %u",
559                                                         coltypoid, column_type)));
560
561                 /* Get and check the item length */
562                 itemlen = pq_getmsgint(buf, 4);
563                 if (itemlen < -1 || itemlen > (buf->len - buf->cursor))
564                         ereport(ERROR,
565                                         (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
566                                          errmsg("insufficient data left in message")));
567
568                 if (itemlen == -1)
569                 {
570                         /* -1 length means NULL */
571                         bufptr = NULL;
572                         nulls[i] = true;
573                         csave = 0;                      /* keep compiler quiet */
574                 }
575                 else
576                 {
577                         /*
578                          * Rather than copying data around, we just set up a phony
579                          * StringInfo pointing to the correct portion of the input buffer.
580                          * We assume we can scribble on the input buffer so as to maintain
581                          * the convention that StringInfos have a trailing null.
582                          */
583                         item_buf.data = &buf->data[buf->cursor];
584                         item_buf.maxlen = itemlen + 1;
585                         item_buf.len = itemlen;
586                         item_buf.cursor = 0;
587
588                         buf->cursor += itemlen;
589
590                         csave = buf->data[buf->cursor];
591                         buf->data[buf->cursor] = '\0';
592
593                         bufptr = &item_buf;
594                         nulls[i] = false;
595                 }
596
597                 /* Now call the column's receiveproc */
598                 if (column_info->column_type != column_type)
599                 {
600                         getTypeBinaryInputInfo(column_type,
601                                                                    &column_info->typiofunc,
602                                                                    &column_info->typioparam);
603                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
604                                                   fcinfo->flinfo->fn_mcxt);
605                         column_info->column_type = column_type;
606                 }
607
608                 values[i] = ReceiveFunctionCall(&column_info->proc,
609                                                                                 bufptr,
610                                                                                 column_info->typioparam,
611                                                                                 att->atttypmod);
612
613                 if (bufptr)
614                 {
615                         /* Trouble if it didn't eat the whole buffer */
616                         if (item_buf.cursor != itemlen)
617                                 ereport(ERROR,
618                                                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
619                                                  errmsg("improper binary format in record column %d",
620                                                                 i + 1)));
621
622                         buf->data[buf->cursor] = csave;
623                 }
624         }
625
626         tuple = heap_form_tuple(tupdesc, values, nulls);
627
628         /*
629          * We cannot return tuple->t_data because heap_form_tuple allocates it as
630          * part of a larger chunk, and our caller may expect to be able to pfree
631          * our result.  So must copy the info into a new palloc chunk.
632          */
633         result = (HeapTupleHeader) palloc(tuple->t_len);
634         memcpy(result, tuple->t_data, tuple->t_len);
635
636         heap_freetuple(tuple);
637         pfree(values);
638         pfree(nulls);
639         ReleaseTupleDesc(tupdesc);
640
641         PG_RETURN_HEAPTUPLEHEADER(result);
642 }
643
644 /*
645  * record_send          - binary output routine for any composite type.
646  */
647 Datum
648 record_send(PG_FUNCTION_ARGS)
649 {
650         HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
651         Oid                     tupType;
652         int32           tupTypmod;
653         TupleDesc       tupdesc;
654         HeapTupleData tuple;
655         RecordIOData *my_extra;
656         int                     ncolumns;
657         int                     validcols;
658         int                     i;
659         Datum      *values;
660         bool       *nulls;
661         StringInfoData buf;
662
663         check_stack_depth();            /* recurses for record-type columns */
664
665         /* Extract type info from the tuple itself */
666         tupType = HeapTupleHeaderGetTypeId(rec);
667         tupTypmod = HeapTupleHeaderGetTypMod(rec);
668         tupdesc = lookup_rowtype_tupdesc(tupType, tupTypmod);
669         ncolumns = tupdesc->natts;
670
671         /* Build a temporary HeapTuple control structure */
672         tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
673         ItemPointerSetInvalid(&(tuple.t_self));
674         tuple.t_tableOid = InvalidOid;
675         tuple.t_data = rec;
676
677         /*
678          * We arrange to look up the needed I/O info just once per series of
679          * calls, assuming the record type doesn't change underneath us.
680          */
681         my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
682         if (my_extra == NULL ||
683                 my_extra->ncolumns != ncolumns)
684         {
685                 fcinfo->flinfo->fn_extra =
686                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
687                                                            offsetof(RecordIOData, columns) +
688                                                            ncolumns * sizeof(ColumnIOData));
689                 my_extra = (RecordIOData *) fcinfo->flinfo->fn_extra;
690                 my_extra->record_type = InvalidOid;
691                 my_extra->record_typmod = 0;
692         }
693
694         if (my_extra->record_type != tupType ||
695                 my_extra->record_typmod != tupTypmod)
696         {
697                 MemSet(my_extra, 0,
698                            offsetof(RecordIOData, columns) +
699                            ncolumns * sizeof(ColumnIOData));
700                 my_extra->record_type = tupType;
701                 my_extra->record_typmod = tupTypmod;
702                 my_extra->ncolumns = ncolumns;
703         }
704
705         values = (Datum *) palloc(ncolumns * sizeof(Datum));
706         nulls = (bool *) palloc(ncolumns * sizeof(bool));
707
708         /* Break down the tuple into fields */
709         heap_deform_tuple(&tuple, tupdesc, values, nulls);
710
711         /* And build the result string */
712         pq_begintypsend(&buf);
713
714         /* Need to scan to count nondeleted columns */
715         validcols = 0;
716         for (i = 0; i < ncolumns; i++)
717         {
718                 if (!TupleDescAttr(tupdesc, i)->attisdropped)
719                         validcols++;
720         }
721         pq_sendint32(&buf, validcols);
722
723         for (i = 0; i < ncolumns; i++)
724         {
725                 Form_pg_attribute att = TupleDescAttr(tupdesc, i);
726                 ColumnIOData *column_info = &my_extra->columns[i];
727                 Oid                     column_type = att->atttypid;
728                 Datum           attr;
729                 bytea      *outputbytes;
730
731                 /* Ignore dropped columns in datatype */
732                 if (att->attisdropped)
733                         continue;
734
735                 pq_sendint32(&buf, column_type);
736
737                 if (nulls[i])
738                 {
739                         /* emit -1 data length to signify a NULL */
740                         pq_sendint32(&buf, -1);
741                         continue;
742                 }
743
744                 /*
745                  * Convert the column value to binary
746                  */
747                 if (column_info->column_type != column_type)
748                 {
749                         getTypeBinaryOutputInfo(column_type,
750                                                                         &column_info->typiofunc,
751                                                                         &column_info->typisvarlena);
752                         fmgr_info_cxt(column_info->typiofunc, &column_info->proc,
753                                                   fcinfo->flinfo->fn_mcxt);
754                         column_info->column_type = column_type;
755                 }
756
757                 attr = values[i];
758                 outputbytes = SendFunctionCall(&column_info->proc, attr);
759                 pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
760                 pq_sendbytes(&buf, VARDATA(outputbytes),
761                                          VARSIZE(outputbytes) - VARHDRSZ);
762         }
763
764         pfree(values);
765         pfree(nulls);
766         ReleaseTupleDesc(tupdesc);
767
768         PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
769 }
770
771
772 /*
773  * record_cmp()
774  * Internal comparison function for records.
775  *
776  * Returns -1, 0 or 1
777  *
778  * Do not assume that the two inputs are exactly the same record type;
779  * for instance we might be comparing an anonymous ROW() construct against a
780  * named composite type.  We will compare as long as they have the same number
781  * of non-dropped columns of the same types.
782  */
783 static int
784 record_cmp(FunctionCallInfo fcinfo)
785 {
786         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
787         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
788         int                     result = 0;
789         Oid                     tupType1;
790         Oid                     tupType2;
791         int32           tupTypmod1;
792         int32           tupTypmod2;
793         TupleDesc       tupdesc1;
794         TupleDesc       tupdesc2;
795         HeapTupleData tuple1;
796         HeapTupleData tuple2;
797         int                     ncolumns1;
798         int                     ncolumns2;
799         RecordCompareData *my_extra;
800         int                     ncols;
801         Datum      *values1;
802         Datum      *values2;
803         bool       *nulls1;
804         bool       *nulls2;
805         int                     i1;
806         int                     i2;
807         int                     j;
808
809         check_stack_depth();            /* recurses for record-type columns */
810
811         /* Extract type info from the tuples */
812         tupType1 = HeapTupleHeaderGetTypeId(record1);
813         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
814         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
815         ncolumns1 = tupdesc1->natts;
816         tupType2 = HeapTupleHeaderGetTypeId(record2);
817         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
818         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
819         ncolumns2 = tupdesc2->natts;
820
821         /* Build temporary HeapTuple control structures */
822         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
823         ItemPointerSetInvalid(&(tuple1.t_self));
824         tuple1.t_tableOid = InvalidOid;
825         tuple1.t_data = record1;
826         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
827         ItemPointerSetInvalid(&(tuple2.t_self));
828         tuple2.t_tableOid = InvalidOid;
829         tuple2.t_data = record2;
830
831         /*
832          * We arrange to look up the needed comparison info just once per series
833          * of calls, assuming the record types don't change underneath us.
834          */
835         ncols = Max(ncolumns1, ncolumns2);
836         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
837         if (my_extra == NULL ||
838                 my_extra->ncolumns < ncols)
839         {
840                 fcinfo->flinfo->fn_extra =
841                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
842                                                            offsetof(RecordCompareData, columns) +
843                                                            ncols * sizeof(ColumnCompareData));
844                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
845                 my_extra->ncolumns = ncols;
846                 my_extra->record1_type = InvalidOid;
847                 my_extra->record1_typmod = 0;
848                 my_extra->record2_type = InvalidOid;
849                 my_extra->record2_typmod = 0;
850         }
851
852         if (my_extra->record1_type != tupType1 ||
853                 my_extra->record1_typmod != tupTypmod1 ||
854                 my_extra->record2_type != tupType2 ||
855                 my_extra->record2_typmod != tupTypmod2)
856         {
857                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
858                 my_extra->record1_type = tupType1;
859                 my_extra->record1_typmod = tupTypmod1;
860                 my_extra->record2_type = tupType2;
861                 my_extra->record2_typmod = tupTypmod2;
862         }
863
864         /* Break down the tuples into fields */
865         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
866         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
867         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
868         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
869         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
870         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
871
872         /*
873          * Scan corresponding columns, allowing for dropped columns in different
874          * places in the two rows.  i1 and i2 are physical column indexes, j is
875          * the logical column index.
876          */
877         i1 = i2 = j = 0;
878         while (i1 < ncolumns1 || i2 < ncolumns2)
879         {
880                 Form_pg_attribute att1;
881                 Form_pg_attribute att2;
882                 TypeCacheEntry *typentry;
883                 Oid                     collation;
884
885                 /*
886                  * Skip dropped columns
887                  */
888                 if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
889                 {
890                         i1++;
891                         continue;
892                 }
893                 if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
894                 {
895                         i2++;
896                         continue;
897                 }
898                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
899                         break;                          /* we'll deal with mismatch below loop */
900
901                 att1 = TupleDescAttr(tupdesc1, i1);
902                 att2 = TupleDescAttr(tupdesc2, i2);
903
904                 /*
905                  * Have two matching columns, they must be same type
906                  */
907                 if (att1->atttypid != att2->atttypid)
908                         ereport(ERROR,
909                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
910                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
911                                                         format_type_be(att1->atttypid),
912                                                         format_type_be(att2->atttypid),
913                                                         j + 1)));
914
915                 /*
916                  * If they're not same collation, we don't complain here, but the
917                  * comparison function might.
918                  */
919                 collation = att1->attcollation;
920                 if (collation != att2->attcollation)
921                         collation = InvalidOid;
922
923                 /*
924                  * Lookup the comparison function if not done already
925                  */
926                 typentry = my_extra->columns[j].typentry;
927                 if (typentry == NULL ||
928                         typentry->type_id != att1->atttypid)
929                 {
930                         typentry = lookup_type_cache(att1->atttypid,
931                                                                                  TYPECACHE_CMP_PROC_FINFO);
932                         if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
933                                 ereport(ERROR,
934                                                 (errcode(ERRCODE_UNDEFINED_FUNCTION),
935                                                  errmsg("could not identify a comparison function for type %s",
936                                                                 format_type_be(typentry->type_id))));
937                         my_extra->columns[j].typentry = typentry;
938                 }
939
940                 /*
941                  * We consider two NULLs equal; NULL > not-NULL.
942                  */
943                 if (!nulls1[i1] || !nulls2[i2])
944                 {
945                         FunctionCallInfoData locfcinfo;
946                         int32           cmpresult;
947
948                         if (nulls1[i1])
949                         {
950                                 /* arg1 is greater than arg2 */
951                                 result = 1;
952                                 break;
953                         }
954                         if (nulls2[i2])
955                         {
956                                 /* arg1 is less than arg2 */
957                                 result = -1;
958                                 break;
959                         }
960
961                         /* Compare the pair of elements */
962                         InitFunctionCallInfoData(locfcinfo, &typentry->cmp_proc_finfo, 2,
963                                                                          collation, NULL, NULL);
964                         locfcinfo.arg[0] = values1[i1];
965                         locfcinfo.arg[1] = values2[i2];
966                         locfcinfo.argnull[0] = false;
967                         locfcinfo.argnull[1] = false;
968                         locfcinfo.isnull = false;
969                         cmpresult = DatumGetInt32(FunctionCallInvoke(&locfcinfo));
970
971                         if (cmpresult < 0)
972                         {
973                                 /* arg1 is less than arg2 */
974                                 result = -1;
975                                 break;
976                         }
977                         else if (cmpresult > 0)
978                         {
979                                 /* arg1 is greater than arg2 */
980                                 result = 1;
981                                 break;
982                         }
983                 }
984
985                 /* equal, so continue to next column */
986                 i1++, i2++, j++;
987         }
988
989         /*
990          * If we didn't break out of the loop early, check for column count
991          * mismatch.  (We do not report such mismatch if we found unequal column
992          * values; is that a feature or a bug?)
993          */
994         if (result == 0)
995         {
996                 if (i1 != ncolumns1 || i2 != ncolumns2)
997                         ereport(ERROR,
998                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
999                                          errmsg("cannot compare record types with different numbers of columns")));
1000         }
1001
1002         pfree(values1);
1003         pfree(nulls1);
1004         pfree(values2);
1005         pfree(nulls2);
1006         ReleaseTupleDesc(tupdesc1);
1007         ReleaseTupleDesc(tupdesc2);
1008
1009         /* Avoid leaking memory when handed toasted input. */
1010         PG_FREE_IF_COPY(record1, 0);
1011         PG_FREE_IF_COPY(record2, 1);
1012
1013         return result;
1014 }
1015
1016 /*
1017  * record_eq :
1018  *                compares two records for equality
1019  * result :
1020  *                returns true if the records are equal, false otherwise.
1021  *
1022  * Note: we do not use record_cmp here, since equality may be meaningful in
1023  * datatypes that don't have a total ordering (and hence no btree support).
1024  */
1025 Datum
1026 record_eq(PG_FUNCTION_ARGS)
1027 {
1028         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1029         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1030         bool            result = true;
1031         Oid                     tupType1;
1032         Oid                     tupType2;
1033         int32           tupTypmod1;
1034         int32           tupTypmod2;
1035         TupleDesc       tupdesc1;
1036         TupleDesc       tupdesc2;
1037         HeapTupleData tuple1;
1038         HeapTupleData tuple2;
1039         int                     ncolumns1;
1040         int                     ncolumns2;
1041         RecordCompareData *my_extra;
1042         int                     ncols;
1043         Datum      *values1;
1044         Datum      *values2;
1045         bool       *nulls1;
1046         bool       *nulls2;
1047         int                     i1;
1048         int                     i2;
1049         int                     j;
1050
1051         check_stack_depth();            /* recurses for record-type columns */
1052
1053         /* Extract type info from the tuples */
1054         tupType1 = HeapTupleHeaderGetTypeId(record1);
1055         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1056         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1057         ncolumns1 = tupdesc1->natts;
1058         tupType2 = HeapTupleHeaderGetTypeId(record2);
1059         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1060         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1061         ncolumns2 = tupdesc2->natts;
1062
1063         /* Build temporary HeapTuple control structures */
1064         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1065         ItemPointerSetInvalid(&(tuple1.t_self));
1066         tuple1.t_tableOid = InvalidOid;
1067         tuple1.t_data = record1;
1068         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1069         ItemPointerSetInvalid(&(tuple2.t_self));
1070         tuple2.t_tableOid = InvalidOid;
1071         tuple2.t_data = record2;
1072
1073         /*
1074          * We arrange to look up the needed comparison info just once per series
1075          * of calls, assuming the record types don't change underneath us.
1076          */
1077         ncols = Max(ncolumns1, ncolumns2);
1078         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1079         if (my_extra == NULL ||
1080                 my_extra->ncolumns < ncols)
1081         {
1082                 fcinfo->flinfo->fn_extra =
1083                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1084                                                            offsetof(RecordCompareData, columns) +
1085                                                            ncols * sizeof(ColumnCompareData));
1086                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1087                 my_extra->ncolumns = ncols;
1088                 my_extra->record1_type = InvalidOid;
1089                 my_extra->record1_typmod = 0;
1090                 my_extra->record2_type = InvalidOid;
1091                 my_extra->record2_typmod = 0;
1092         }
1093
1094         if (my_extra->record1_type != tupType1 ||
1095                 my_extra->record1_typmod != tupTypmod1 ||
1096                 my_extra->record2_type != tupType2 ||
1097                 my_extra->record2_typmod != tupTypmod2)
1098         {
1099                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1100                 my_extra->record1_type = tupType1;
1101                 my_extra->record1_typmod = tupTypmod1;
1102                 my_extra->record2_type = tupType2;
1103                 my_extra->record2_typmod = tupTypmod2;
1104         }
1105
1106         /* Break down the tuples into fields */
1107         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1108         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1109         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1110         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1111         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1112         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1113
1114         /*
1115          * Scan corresponding columns, allowing for dropped columns in different
1116          * places in the two rows.  i1 and i2 are physical column indexes, j is
1117          * the logical column index.
1118          */
1119         i1 = i2 = j = 0;
1120         while (i1 < ncolumns1 || i2 < ncolumns2)
1121         {
1122                 Form_pg_attribute att1;
1123                 Form_pg_attribute att2;
1124                 TypeCacheEntry *typentry;
1125                 Oid                     collation;
1126                 FunctionCallInfoData locfcinfo;
1127                 bool            oprresult;
1128
1129                 /*
1130                  * Skip dropped columns
1131                  */
1132                 if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1133                 {
1134                         i1++;
1135                         continue;
1136                 }
1137                 if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1138                 {
1139                         i2++;
1140                         continue;
1141                 }
1142                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
1143                         break;                          /* we'll deal with mismatch below loop */
1144
1145                 att1 = TupleDescAttr(tupdesc1, i1);
1146                 att2 = TupleDescAttr(tupdesc2, i2);
1147
1148                 /*
1149                  * Have two matching columns, they must be same type
1150                  */
1151                 if (att1->atttypid != att2->atttypid)
1152                         ereport(ERROR,
1153                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1154                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1155                                                         format_type_be(att1->atttypid),
1156                                                         format_type_be(att2->atttypid),
1157                                                         j + 1)));
1158
1159                 /*
1160                  * If they're not same collation, we don't complain here, but the
1161                  * equality function might.
1162                  */
1163                 collation = att1->attcollation;
1164                 if (collation != att2->attcollation)
1165                         collation = InvalidOid;
1166
1167                 /*
1168                  * Lookup the equality function if not done already
1169                  */
1170                 typentry = my_extra->columns[j].typentry;
1171                 if (typentry == NULL ||
1172                         typentry->type_id != att1->atttypid)
1173                 {
1174                         typentry = lookup_type_cache(att1->atttypid,
1175                                                                                  TYPECACHE_EQ_OPR_FINFO);
1176                         if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
1177                                 ereport(ERROR,
1178                                                 (errcode(ERRCODE_UNDEFINED_FUNCTION),
1179                                                  errmsg("could not identify an equality operator for type %s",
1180                                                                 format_type_be(typentry->type_id))));
1181                         my_extra->columns[j].typentry = typentry;
1182                 }
1183
1184                 /*
1185                  * We consider two NULLs equal; NULL > not-NULL.
1186                  */
1187                 if (!nulls1[i1] || !nulls2[i2])
1188                 {
1189                         if (nulls1[i1] || nulls2[i2])
1190                         {
1191                                 result = false;
1192                                 break;
1193                         }
1194
1195                         /* Compare the pair of elements */
1196                         InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
1197                                                                          collation, NULL, NULL);
1198                         locfcinfo.arg[0] = values1[i1];
1199                         locfcinfo.arg[1] = values2[i2];
1200                         locfcinfo.argnull[0] = false;
1201                         locfcinfo.argnull[1] = false;
1202                         locfcinfo.isnull = false;
1203                         oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo));
1204                         if (!oprresult)
1205                         {
1206                                 result = false;
1207                                 break;
1208                         }
1209                 }
1210
1211                 /* equal, so continue to next column */
1212                 i1++, i2++, j++;
1213         }
1214
1215         /*
1216          * If we didn't break out of the loop early, check for column count
1217          * mismatch.  (We do not report such mismatch if we found unequal column
1218          * values; is that a feature or a bug?)
1219          */
1220         if (result)
1221         {
1222                 if (i1 != ncolumns1 || i2 != ncolumns2)
1223                         ereport(ERROR,
1224                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1225                                          errmsg("cannot compare record types with different numbers of columns")));
1226         }
1227
1228         pfree(values1);
1229         pfree(nulls1);
1230         pfree(values2);
1231         pfree(nulls2);
1232         ReleaseTupleDesc(tupdesc1);
1233         ReleaseTupleDesc(tupdesc2);
1234
1235         /* Avoid leaking memory when handed toasted input. */
1236         PG_FREE_IF_COPY(record1, 0);
1237         PG_FREE_IF_COPY(record2, 1);
1238
1239         PG_RETURN_BOOL(result);
1240 }
1241
1242 Datum
1243 record_ne(PG_FUNCTION_ARGS)
1244 {
1245         PG_RETURN_BOOL(!DatumGetBool(record_eq(fcinfo)));
1246 }
1247
1248 Datum
1249 record_lt(PG_FUNCTION_ARGS)
1250 {
1251         PG_RETURN_BOOL(record_cmp(fcinfo) < 0);
1252 }
1253
1254 Datum
1255 record_gt(PG_FUNCTION_ARGS)
1256 {
1257         PG_RETURN_BOOL(record_cmp(fcinfo) > 0);
1258 }
1259
1260 Datum
1261 record_le(PG_FUNCTION_ARGS)
1262 {
1263         PG_RETURN_BOOL(record_cmp(fcinfo) <= 0);
1264 }
1265
1266 Datum
1267 record_ge(PG_FUNCTION_ARGS)
1268 {
1269         PG_RETURN_BOOL(record_cmp(fcinfo) >= 0);
1270 }
1271
1272 Datum
1273 btrecordcmp(PG_FUNCTION_ARGS)
1274 {
1275         PG_RETURN_INT32(record_cmp(fcinfo));
1276 }
1277
1278
1279 /*
1280  * record_image_cmp :
1281  * Internal byte-oriented comparison function for records.
1282  *
1283  * Returns -1, 0 or 1
1284  *
1285  * Note: The normal concepts of "equality" do not apply here; different
1286  * representation of values considered to be equal are not considered to be
1287  * identical.  As an example, for the citext type 'A' and 'a' are equal, but
1288  * they are not identical.
1289  */
1290 static int
1291 record_image_cmp(FunctionCallInfo fcinfo)
1292 {
1293         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1294         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1295         int                     result = 0;
1296         Oid                     tupType1;
1297         Oid                     tupType2;
1298         int32           tupTypmod1;
1299         int32           tupTypmod2;
1300         TupleDesc       tupdesc1;
1301         TupleDesc       tupdesc2;
1302         HeapTupleData tuple1;
1303         HeapTupleData tuple2;
1304         int                     ncolumns1;
1305         int                     ncolumns2;
1306         RecordCompareData *my_extra;
1307         int                     ncols;
1308         Datum      *values1;
1309         Datum      *values2;
1310         bool       *nulls1;
1311         bool       *nulls2;
1312         int                     i1;
1313         int                     i2;
1314         int                     j;
1315
1316         /* Extract type info from the tuples */
1317         tupType1 = HeapTupleHeaderGetTypeId(record1);
1318         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1319         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1320         ncolumns1 = tupdesc1->natts;
1321         tupType2 = HeapTupleHeaderGetTypeId(record2);
1322         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1323         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1324         ncolumns2 = tupdesc2->natts;
1325
1326         /* Build temporary HeapTuple control structures */
1327         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1328         ItemPointerSetInvalid(&(tuple1.t_self));
1329         tuple1.t_tableOid = InvalidOid;
1330         tuple1.t_data = record1;
1331         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1332         ItemPointerSetInvalid(&(tuple2.t_self));
1333         tuple2.t_tableOid = InvalidOid;
1334         tuple2.t_data = record2;
1335
1336         /*
1337          * We arrange to look up the needed comparison info just once per series
1338          * of calls, assuming the record types don't change underneath us.
1339          */
1340         ncols = Max(ncolumns1, ncolumns2);
1341         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1342         if (my_extra == NULL ||
1343                 my_extra->ncolumns < ncols)
1344         {
1345                 fcinfo->flinfo->fn_extra =
1346                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1347                                                            offsetof(RecordCompareData, columns) +
1348                                                            ncols * sizeof(ColumnCompareData));
1349                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1350                 my_extra->ncolumns = ncols;
1351                 my_extra->record1_type = InvalidOid;
1352                 my_extra->record1_typmod = 0;
1353                 my_extra->record2_type = InvalidOid;
1354                 my_extra->record2_typmod = 0;
1355         }
1356
1357         if (my_extra->record1_type != tupType1 ||
1358                 my_extra->record1_typmod != tupTypmod1 ||
1359                 my_extra->record2_type != tupType2 ||
1360                 my_extra->record2_typmod != tupTypmod2)
1361         {
1362                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1363                 my_extra->record1_type = tupType1;
1364                 my_extra->record1_typmod = tupTypmod1;
1365                 my_extra->record2_type = tupType2;
1366                 my_extra->record2_typmod = tupTypmod2;
1367         }
1368
1369         /* Break down the tuples into fields */
1370         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1371         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1372         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1373         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1374         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1375         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1376
1377         /*
1378          * Scan corresponding columns, allowing for dropped columns in different
1379          * places in the two rows.  i1 and i2 are physical column indexes, j is
1380          * the logical column index.
1381          */
1382         i1 = i2 = j = 0;
1383         while (i1 < ncolumns1 || i2 < ncolumns2)
1384         {
1385                 Form_pg_attribute att1;
1386                 Form_pg_attribute att2;
1387
1388                 /*
1389                  * Skip dropped columns
1390                  */
1391                 if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1392                 {
1393                         i1++;
1394                         continue;
1395                 }
1396                 if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1397                 {
1398                         i2++;
1399                         continue;
1400                 }
1401                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
1402                         break;                          /* we'll deal with mismatch below loop */
1403
1404                 att1 = TupleDescAttr(tupdesc1, i1);
1405                 att2 = TupleDescAttr(tupdesc2, i2);
1406
1407                 /*
1408                  * Have two matching columns, they must be same type
1409                  */
1410                 if (att1->atttypid != att2->atttypid)
1411                         ereport(ERROR,
1412                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1413                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1414                                                         format_type_be(att1->atttypid),
1415                                                         format_type_be(att2->atttypid),
1416                                                         j + 1)));
1417
1418                 /*
1419                  * The same type should have the same length (or both should be
1420                  * variable).
1421                  */
1422                 Assert(att1->attlen == att2->attlen);
1423
1424                 /*
1425                  * We consider two NULLs equal; NULL > not-NULL.
1426                  */
1427                 if (!nulls1[i1] || !nulls2[i2])
1428                 {
1429                         int                     cmpresult = 0;
1430
1431                         if (nulls1[i1])
1432                         {
1433                                 /* arg1 is greater than arg2 */
1434                                 result = 1;
1435                                 break;
1436                         }
1437                         if (nulls2[i2])
1438                         {
1439                                 /* arg1 is less than arg2 */
1440                                 result = -1;
1441                                 break;
1442                         }
1443
1444                         /* Compare the pair of elements */
1445                         if (att1->attlen == -1)
1446                         {
1447                                 Size            len1,
1448                                                         len2;
1449                                 struct varlena *arg1val;
1450                                 struct varlena *arg2val;
1451
1452                                 len1 = toast_raw_datum_size(values1[i1]);
1453                                 len2 = toast_raw_datum_size(values2[i2]);
1454                                 arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]);
1455                                 arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]);
1456
1457                                 cmpresult = memcmp(VARDATA_ANY(arg1val),
1458                                                                    VARDATA_ANY(arg2val),
1459                                                                    Min(len1, len2) - VARHDRSZ);
1460                                 if ((cmpresult == 0) && (len1 != len2))
1461                                         cmpresult = (len1 < len2) ? -1 : 1;
1462
1463                                 if ((Pointer) arg1val != (Pointer) values1[i1])
1464                                         pfree(arg1val);
1465                                 if ((Pointer) arg2val != (Pointer) values2[i2])
1466                                         pfree(arg2val);
1467                         }
1468                         else if (att1->attbyval)
1469                         {
1470                                 switch (att1->attlen)
1471                                 {
1472                                         case 1:
1473                                                 if (GET_1_BYTE(values1[i1]) !=
1474                                                         GET_1_BYTE(values2[i2]))
1475                                                 {
1476                                                         cmpresult = (GET_1_BYTE(values1[i1]) <
1477                                                                                  GET_1_BYTE(values2[i2])) ? -1 : 1;
1478                                                 }
1479                                                 break;
1480                                         case 2:
1481                                                 if (GET_2_BYTES(values1[i1]) !=
1482                                                         GET_2_BYTES(values2[i2]))
1483                                                 {
1484                                                         cmpresult = (GET_2_BYTES(values1[i1]) <
1485                                                                                  GET_2_BYTES(values2[i2])) ? -1 : 1;
1486                                                 }
1487                                                 break;
1488                                         case 4:
1489                                                 if (GET_4_BYTES(values1[i1]) !=
1490                                                         GET_4_BYTES(values2[i2]))
1491                                                 {
1492                                                         cmpresult = (GET_4_BYTES(values1[i1]) <
1493                                                                                  GET_4_BYTES(values2[i2])) ? -1 : 1;
1494                                                 }
1495                                                 break;
1496 #if SIZEOF_DATUM == 8
1497                                         case 8:
1498                                                 if (GET_8_BYTES(values1[i1]) !=
1499                                                         GET_8_BYTES(values2[i2]))
1500                                                 {
1501                                                         cmpresult = (GET_8_BYTES(values1[i1]) <
1502                                                                                  GET_8_BYTES(values2[i2])) ? -1 : 1;
1503                                                 }
1504                                                 break;
1505 #endif
1506                                         default:
1507                                                 Assert(false);  /* cannot happen */
1508                                 }
1509                         }
1510                         else
1511                         {
1512                                 cmpresult = memcmp(DatumGetPointer(values1[i1]),
1513                                                                    DatumGetPointer(values2[i2]),
1514                                                                    att1->attlen);
1515                         }
1516
1517                         if (cmpresult < 0)
1518                         {
1519                                 /* arg1 is less than arg2 */
1520                                 result = -1;
1521                                 break;
1522                         }
1523                         else if (cmpresult > 0)
1524                         {
1525                                 /* arg1 is greater than arg2 */
1526                                 result = 1;
1527                                 break;
1528                         }
1529                 }
1530
1531                 /* equal, so continue to next column */
1532                 i1++, i2++, j++;
1533         }
1534
1535         /*
1536          * If we didn't break out of the loop early, check for column count
1537          * mismatch.  (We do not report such mismatch if we found unequal column
1538          * values; is that a feature or a bug?)
1539          */
1540         if (result == 0)
1541         {
1542                 if (i1 != ncolumns1 || i2 != ncolumns2)
1543                         ereport(ERROR,
1544                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1545                                          errmsg("cannot compare record types with different numbers of columns")));
1546         }
1547
1548         pfree(values1);
1549         pfree(nulls1);
1550         pfree(values2);
1551         pfree(nulls2);
1552         ReleaseTupleDesc(tupdesc1);
1553         ReleaseTupleDesc(tupdesc2);
1554
1555         /* Avoid leaking memory when handed toasted input. */
1556         PG_FREE_IF_COPY(record1, 0);
1557         PG_FREE_IF_COPY(record2, 1);
1558
1559         return result;
1560 }
1561
1562 /*
1563  * record_image_eq :
1564  *                compares two records for identical contents, based on byte images
1565  * result :
1566  *                returns true if the records are identical, false otherwise.
1567  *
1568  * Note: we do not use record_image_cmp here, since we can avoid
1569  * de-toasting for unequal lengths this way.
1570  */
1571 Datum
1572 record_image_eq(PG_FUNCTION_ARGS)
1573 {
1574         HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0);
1575         HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1);
1576         bool            result = true;
1577         Oid                     tupType1;
1578         Oid                     tupType2;
1579         int32           tupTypmod1;
1580         int32           tupTypmod2;
1581         TupleDesc       tupdesc1;
1582         TupleDesc       tupdesc2;
1583         HeapTupleData tuple1;
1584         HeapTupleData tuple2;
1585         int                     ncolumns1;
1586         int                     ncolumns2;
1587         RecordCompareData *my_extra;
1588         int                     ncols;
1589         Datum      *values1;
1590         Datum      *values2;
1591         bool       *nulls1;
1592         bool       *nulls2;
1593         int                     i1;
1594         int                     i2;
1595         int                     j;
1596
1597         /* Extract type info from the tuples */
1598         tupType1 = HeapTupleHeaderGetTypeId(record1);
1599         tupTypmod1 = HeapTupleHeaderGetTypMod(record1);
1600         tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1);
1601         ncolumns1 = tupdesc1->natts;
1602         tupType2 = HeapTupleHeaderGetTypeId(record2);
1603         tupTypmod2 = HeapTupleHeaderGetTypMod(record2);
1604         tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2);
1605         ncolumns2 = tupdesc2->natts;
1606
1607         /* Build temporary HeapTuple control structures */
1608         tuple1.t_len = HeapTupleHeaderGetDatumLength(record1);
1609         ItemPointerSetInvalid(&(tuple1.t_self));
1610         tuple1.t_tableOid = InvalidOid;
1611         tuple1.t_data = record1;
1612         tuple2.t_len = HeapTupleHeaderGetDatumLength(record2);
1613         ItemPointerSetInvalid(&(tuple2.t_self));
1614         tuple2.t_tableOid = InvalidOid;
1615         tuple2.t_data = record2;
1616
1617         /*
1618          * We arrange to look up the needed comparison info just once per series
1619          * of calls, assuming the record types don't change underneath us.
1620          */
1621         ncols = Max(ncolumns1, ncolumns2);
1622         my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1623         if (my_extra == NULL ||
1624                 my_extra->ncolumns < ncols)
1625         {
1626                 fcinfo->flinfo->fn_extra =
1627                         MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
1628                                                            offsetof(RecordCompareData, columns) +
1629                                                            ncols * sizeof(ColumnCompareData));
1630                 my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra;
1631                 my_extra->ncolumns = ncols;
1632                 my_extra->record1_type = InvalidOid;
1633                 my_extra->record1_typmod = 0;
1634                 my_extra->record2_type = InvalidOid;
1635                 my_extra->record2_typmod = 0;
1636         }
1637
1638         if (my_extra->record1_type != tupType1 ||
1639                 my_extra->record1_typmod != tupTypmod1 ||
1640                 my_extra->record2_type != tupType2 ||
1641                 my_extra->record2_typmod != tupTypmod2)
1642         {
1643                 MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData));
1644                 my_extra->record1_type = tupType1;
1645                 my_extra->record1_typmod = tupTypmod1;
1646                 my_extra->record2_type = tupType2;
1647                 my_extra->record2_typmod = tupTypmod2;
1648         }
1649
1650         /* Break down the tuples into fields */
1651         values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum));
1652         nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool));
1653         heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1);
1654         values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum));
1655         nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool));
1656         heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2);
1657
1658         /*
1659          * Scan corresponding columns, allowing for dropped columns in different
1660          * places in the two rows.  i1 and i2 are physical column indexes, j is
1661          * the logical column index.
1662          */
1663         i1 = i2 = j = 0;
1664         while (i1 < ncolumns1 || i2 < ncolumns2)
1665         {
1666                 Form_pg_attribute att1;
1667                 Form_pg_attribute att2;
1668
1669                 /*
1670                  * Skip dropped columns
1671                  */
1672                 if (i1 < ncolumns1 && TupleDescAttr(tupdesc1, i1)->attisdropped)
1673                 {
1674                         i1++;
1675                         continue;
1676                 }
1677                 if (i2 < ncolumns2 && TupleDescAttr(tupdesc2, i2)->attisdropped)
1678                 {
1679                         i2++;
1680                         continue;
1681                 }
1682                 if (i1 >= ncolumns1 || i2 >= ncolumns2)
1683                         break;                          /* we'll deal with mismatch below loop */
1684
1685                 att1 = TupleDescAttr(tupdesc1, i1);
1686                 att2 = TupleDescAttr(tupdesc2, i2);
1687
1688                 /*
1689                  * Have two matching columns, they must be same type
1690                  */
1691                 if (att1->atttypid != att2->atttypid)
1692                         ereport(ERROR,
1693                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1694                                          errmsg("cannot compare dissimilar column types %s and %s at record column %d",
1695                                                         format_type_be(att1->atttypid),
1696                                                         format_type_be(att2->atttypid),
1697                                                         j + 1)));
1698
1699                 /*
1700                  * We consider two NULLs equal; NULL > not-NULL.
1701                  */
1702                 if (!nulls1[i1] || !nulls2[i2])
1703                 {
1704                         if (nulls1[i1] || nulls2[i2])
1705                         {
1706                                 result = false;
1707                                 break;
1708                         }
1709
1710                         /* Compare the pair of elements */
1711                         if (att1->attlen == -1)
1712                         {
1713                                 Size            len1,
1714                                                         len2;
1715
1716                                 len1 = toast_raw_datum_size(values1[i1]);
1717                                 len2 = toast_raw_datum_size(values2[i2]);
1718                                 /* No need to de-toast if lengths don't match. */
1719                                 if (len1 != len2)
1720                                         result = false;
1721                                 else
1722                                 {
1723                                         struct varlena *arg1val;
1724                                         struct varlena *arg2val;
1725
1726                                         arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]);
1727                                         arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]);
1728
1729                                         result = (memcmp(VARDATA_ANY(arg1val),
1730                                                                          VARDATA_ANY(arg2val),
1731                                                                          len1 - VARHDRSZ) == 0);
1732
1733                                         /* Only free memory if it's a copy made here. */
1734                                         if ((Pointer) arg1val != (Pointer) values1[i1])
1735                                                 pfree(arg1val);
1736                                         if ((Pointer) arg2val != (Pointer) values2[i2])
1737                                                 pfree(arg2val);
1738                                 }
1739                         }
1740                         else if (att1->attbyval)
1741                         {
1742                                 switch (att1->attlen)
1743                                 {
1744                                         case 1:
1745                                                 result = (GET_1_BYTE(values1[i1]) ==
1746                                                                   GET_1_BYTE(values2[i2]));
1747                                                 break;
1748                                         case 2:
1749                                                 result = (GET_2_BYTES(values1[i1]) ==
1750                                                                   GET_2_BYTES(values2[i2]));
1751                                                 break;
1752                                         case 4:
1753                                                 result = (GET_4_BYTES(values1[i1]) ==
1754                                                                   GET_4_BYTES(values2[i2]));
1755                                                 break;
1756 #if SIZEOF_DATUM == 8
1757                                         case 8:
1758                                                 result = (GET_8_BYTES(values1[i1]) ==
1759                                                                   GET_8_BYTES(values2[i2]));
1760                                                 break;
1761 #endif
1762                                         default:
1763                                                 Assert(false);  /* cannot happen */
1764                                 }
1765                         }
1766                         else
1767                         {
1768                                 result = (memcmp(DatumGetPointer(values1[i1]),
1769                                                                  DatumGetPointer(values2[i2]),
1770                                                                  att1->attlen) == 0);
1771                         }
1772                         if (!result)
1773                                 break;
1774                 }
1775
1776                 /* equal, so continue to next column */
1777                 i1++, i2++, j++;
1778         }
1779
1780         /*
1781          * If we didn't break out of the loop early, check for column count
1782          * mismatch.  (We do not report such mismatch if we found unequal column
1783          * values; is that a feature or a bug?)
1784          */
1785         if (result)
1786         {
1787                 if (i1 != ncolumns1 || i2 != ncolumns2)
1788                         ereport(ERROR,
1789                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
1790                                          errmsg("cannot compare record types with different numbers of columns")));
1791         }
1792
1793         pfree(values1);
1794         pfree(nulls1);
1795         pfree(values2);
1796         pfree(nulls2);
1797         ReleaseTupleDesc(tupdesc1);
1798         ReleaseTupleDesc(tupdesc2);
1799
1800         /* Avoid leaking memory when handed toasted input. */
1801         PG_FREE_IF_COPY(record1, 0);
1802         PG_FREE_IF_COPY(record2, 1);
1803
1804         PG_RETURN_BOOL(result);
1805 }
1806
1807 Datum
1808 record_image_ne(PG_FUNCTION_ARGS)
1809 {
1810         PG_RETURN_BOOL(!DatumGetBool(record_image_eq(fcinfo)));
1811 }
1812
1813 Datum
1814 record_image_lt(PG_FUNCTION_ARGS)
1815 {
1816         PG_RETURN_BOOL(record_image_cmp(fcinfo) < 0);
1817 }
1818
1819 Datum
1820 record_image_gt(PG_FUNCTION_ARGS)
1821 {
1822         PG_RETURN_BOOL(record_image_cmp(fcinfo) > 0);
1823 }
1824
1825 Datum
1826 record_image_le(PG_FUNCTION_ARGS)
1827 {
1828         PG_RETURN_BOOL(record_image_cmp(fcinfo) <= 0);
1829 }
1830
1831 Datum
1832 record_image_ge(PG_FUNCTION_ARGS)
1833 {
1834         PG_RETURN_BOOL(record_image_cmp(fcinfo) >= 0);
1835 }
1836
1837 Datum
1838 btrecordimagecmp(PG_FUNCTION_ARGS)
1839 {
1840         PG_RETURN_INT32(record_image_cmp(fcinfo));
1841 }