]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Change the relation_open protocol so that we obtain lock on a relation
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2006, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/heap/tuptoaster.c,v 1.63 2006/07/31 20:08:59 tgl Exp $
12  *
13  *
14  * INTERFACE ROUTINES
15  *              toast_insert_or_update -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              toast_delete -
20  *                      Reclaim toast storage when a tuple is deleted
21  *
22  *              heap_tuple_untoast_attr -
23  *                      Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27
28 #include "postgres.h"
29
30 #include <unistd.h>
31 #include <fcntl.h>
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "catalog/catalog.h"
37 #include "utils/fmgroids.h"
38 #include "utils/pg_lzcompress.h"
39 #include "utils/typcache.h"
40
41
42 #undef TOAST_DEBUG
43
44 static void toast_delete_datum(Relation rel, Datum value);
45 static Datum toast_save_datum(Relation rel, Datum value);
46 static varattrib *toast_fetch_datum(varattrib *attr);
47 static varattrib *toast_fetch_datum_slice(varattrib *attr,
48                                                 int32 sliceoffset, int32 length);
49
50
51 /* ----------
52  * heap_tuple_fetch_attr -
53  *
54  *      Public entry point to get back a toasted value
55  *      external storage (possibly still in compressed format).
56  * ----------
57  */
58 varattrib *
59 heap_tuple_fetch_attr(varattrib *attr)
60 {
61         varattrib  *result;
62
63         if (VARATT_IS_EXTERNAL(attr))
64         {
65                 /*
66                  * This is an external stored plain value
67                  */
68                 result = toast_fetch_datum(attr);
69         }
70         else
71         {
72                 /*
73                  * This is a plain value inside of the main tuple - why am I called?
74                  */
75                 result = attr;
76         }
77
78         return result;
79 }
80
81
82 /* ----------
83  * heap_tuple_untoast_attr -
84  *
85  *      Public entry point to get back a toasted value from compression
86  *      or external storage.
87  * ----------
88  */
89 varattrib *
90 heap_tuple_untoast_attr(varattrib *attr)
91 {
92         varattrib  *result;
93
94         if (VARATT_IS_EXTERNAL(attr))
95         {
96                 if (VARATT_IS_COMPRESSED(attr))
97                 {
98                         /* ----------
99                          * This is an external stored compressed value
100                          * Fetch it from the toast heap and decompress.
101                          * ----------
102                          */
103                         varattrib  *tmp;
104
105                         tmp = toast_fetch_datum(attr);
106                         result = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
107                                                                                   + VARHDRSZ);
108                         VARATT_SIZEP(result) = attr->va_content.va_external.va_rawsize
109                                 + VARHDRSZ;
110                         pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(result));
111
112                         pfree(tmp);
113                 }
114                 else
115                 {
116                         /*
117                          * This is an external stored plain value
118                          */
119                         result = toast_fetch_datum(attr);
120                 }
121         }
122         else if (VARATT_IS_COMPRESSED(attr))
123         {
124                 /*
125                  * This is a compressed value inside of the main tuple
126                  */
127                 result = (varattrib *) palloc(attr->va_content.va_compressed.va_rawsize
128                                                                           + VARHDRSZ);
129                 VARATT_SIZEP(result) = attr->va_content.va_compressed.va_rawsize
130                         + VARHDRSZ;
131                 pglz_decompress((PGLZ_Header *) attr, VARATT_DATA(result));
132         }
133         else
134
135                 /*
136                  * This is a plain value inside of the main tuple - why am I called?
137                  */
138                 return attr;
139
140         return result;
141 }
142
143
144 /* ----------
145  * heap_tuple_untoast_attr_slice -
146  *
147  *              Public entry point to get back part of a toasted value
148  *              from compression or external storage.
149  * ----------
150  */
151 varattrib *
152 heap_tuple_untoast_attr_slice(varattrib *attr, int32 sliceoffset, int32 slicelength)
153 {
154         varattrib  *preslice;
155         varattrib  *result;
156         int32           attrsize;
157
158         if (VARATT_IS_COMPRESSED(attr))
159         {
160                 varattrib  *tmp;
161
162                 if (VARATT_IS_EXTERNAL(attr))
163                         tmp = toast_fetch_datum(attr);
164                 else
165                 {
166                         tmp = attr;                     /* compressed in main tuple */
167                 }
168
169                 preslice = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
170                                                                                 + VARHDRSZ);
171                 VARATT_SIZEP(preslice) = attr->va_content.va_external.va_rawsize + VARHDRSZ;
172                 pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(preslice));
173
174                 if (tmp != attr)
175                         pfree(tmp);
176         }
177         else
178         {
179                 /* Plain value */
180                 if (VARATT_IS_EXTERNAL(attr))
181                 {
182                         /* fast path */
183                         return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
184                 }
185                 else
186                         preslice = attr;
187         }
188
189         /* slicing of datum for compressed cases and plain value */
190
191         attrsize = VARSIZE(preslice) - VARHDRSZ;
192         if (sliceoffset >= attrsize)
193         {
194                 sliceoffset = 0;
195                 slicelength = 0;
196         }
197
198         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
199                 slicelength = attrsize - sliceoffset;
200
201         result = (varattrib *) palloc(slicelength + VARHDRSZ);
202         VARATT_SIZEP(result) = slicelength + VARHDRSZ;
203
204         memcpy(VARDATA(result), VARDATA(preslice) + sliceoffset, slicelength);
205
206         if (preslice != attr)
207                 pfree(preslice);
208
209         return result;
210 }
211
212
213 /* ----------
214  * toast_raw_datum_size -
215  *
216  *      Return the raw (detoasted) size of a varlena datum
217  * ----------
218  */
219 Size
220 toast_raw_datum_size(Datum value)
221 {
222         varattrib  *attr = (varattrib *) DatumGetPointer(value);
223         Size            result;
224
225         if (VARATT_IS_COMPRESSED(attr))
226         {
227                 /*
228                  * va_rawsize shows the original data size, whether the datum is
229                  * external or not.
230                  */
231                 result = attr->va_content.va_compressed.va_rawsize + VARHDRSZ;
232         }
233         else if (VARATT_IS_EXTERNAL(attr))
234         {
235                 /*
236                  * an uncompressed external attribute has rawsize including the header
237                  * (not too consistent!)
238                  */
239                 result = attr->va_content.va_external.va_rawsize;
240         }
241         else
242         {
243                 /* plain untoasted datum */
244                 result = VARSIZE(attr);
245         }
246         return result;
247 }
248
249 /* ----------
250  * toast_datum_size
251  *
252  *      Return the physical storage size (possibly compressed) of a varlena datum
253  * ----------
254  */
255 Size
256 toast_datum_size(Datum value)
257 {
258         varattrib  *attr = (varattrib *) DatumGetPointer(value);
259         Size            result;
260
261         if (VARATT_IS_EXTERNAL(attr))
262         {
263                 /*
264                  * Attribute is stored externally - return the extsize whether
265                  * compressed or not.  We do not count the size of the toast pointer
266                  * ... should we?
267                  */
268                 result = attr->va_content.va_external.va_extsize;
269         }
270         else
271         {
272                 /*
273                  * Attribute is stored inline either compressed or not, just calculate
274                  * the size of the datum in either case.
275                  */
276                 result = VARSIZE(attr);
277         }
278         return result;
279 }
280
281
282 /* ----------
283  * toast_delete -
284  *
285  *      Cascaded delete toast-entries on DELETE
286  * ----------
287  */
288 void
289 toast_delete(Relation rel, HeapTuple oldtup)
290 {
291         TupleDesc       tupleDesc;
292         Form_pg_attribute *att;
293         int                     numAttrs;
294         int                     i;
295         Datum           toast_values[MaxHeapAttributeNumber];
296         bool            toast_isnull[MaxHeapAttributeNumber];
297
298         /*
299          * Get the tuple descriptor and break down the tuple into fields.
300          *
301          * NOTE: it's debatable whether to use heap_deformtuple() here or just
302          * heap_getattr() only the varlena columns.  The latter could win if there
303          * are few varlena columns and many non-varlena ones. However,
304          * heap_deformtuple costs only O(N) while the heap_getattr way would cost
305          * O(N^2) if there are many varlena columns, so it seems better to err on
306          * the side of linear cost.  (We won't even be here unless there's at
307          * least one varlena column, by the way.)
308          */
309         tupleDesc = rel->rd_att;
310         att = tupleDesc->attrs;
311         numAttrs = tupleDesc->natts;
312
313         Assert(numAttrs <= MaxHeapAttributeNumber);
314         heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
315
316         /*
317          * Check for external stored attributes and delete them from the secondary
318          * relation.
319          */
320         for (i = 0; i < numAttrs; i++)
321         {
322                 if (att[i]->attlen == -1)
323                 {
324                         Datum           value = toast_values[i];
325
326                         if (!toast_isnull[i] && VARATT_IS_EXTERNAL(value))
327                                 toast_delete_datum(rel, value);
328                 }
329         }
330 }
331
332
333 /* ----------
334  * toast_insert_or_update -
335  *
336  *      Delete no-longer-used toast-entries and create new ones to
337  *      make the new tuple fit on INSERT or UPDATE
338  *
339  * Inputs:
340  *      newtup: the candidate new tuple to be inserted
341  *      oldtup: the old row version for UPDATE, or NULL for INSERT
342  * Result:
343  *      either newtup if no toasting is needed, or a palloc'd modified tuple
344  *      that is what should actually get stored
345  *
346  * NOTE: neither newtup nor oldtup will be modified.  This is a change
347  * from the pre-8.1 API of this routine.
348  * ----------
349  */
350 HeapTuple
351 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup)
352 {
353         HeapTuple       result_tuple;
354         TupleDesc       tupleDesc;
355         Form_pg_attribute *att;
356         int                     numAttrs;
357         int                     i;
358
359         bool            need_change = false;
360         bool            need_free = false;
361         bool            need_delold = false;
362         bool            has_nulls = false;
363
364         Size            maxDataLen;
365
366         char            toast_action[MaxHeapAttributeNumber];
367         bool            toast_isnull[MaxHeapAttributeNumber];
368         bool            toast_oldisnull[MaxHeapAttributeNumber];
369         Datum           toast_values[MaxHeapAttributeNumber];
370         Datum           toast_oldvalues[MaxHeapAttributeNumber];
371         int32           toast_sizes[MaxHeapAttributeNumber];
372         bool            toast_free[MaxHeapAttributeNumber];
373         bool            toast_delold[MaxHeapAttributeNumber];
374
375         /*
376          * Get the tuple descriptor and break down the tuple(s) into fields.
377          */
378         tupleDesc = rel->rd_att;
379         att = tupleDesc->attrs;
380         numAttrs = tupleDesc->natts;
381
382         Assert(numAttrs <= MaxHeapAttributeNumber);
383         heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
384         if (oldtup != NULL)
385                 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
386
387         /* ----------
388          * Then collect information about the values given
389          *
390          * NOTE: toast_action[i] can have these values:
391          *              ' '             default handling
392          *              'p'             already processed --- don't touch it
393          *              'x'             incompressible, but OK to move off
394          *
395          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
396          *              toast_action[i] different from 'p'.
397          * ----------
398          */
399         memset(toast_action, ' ', numAttrs * sizeof(char));
400         memset(toast_free, 0, numAttrs * sizeof(bool));
401         memset(toast_delold, 0, numAttrs * sizeof(bool));
402
403         for (i = 0; i < numAttrs; i++)
404         {
405                 varattrib  *old_value;
406                 varattrib  *new_value;
407
408                 if (oldtup != NULL)
409                 {
410                         /*
411                          * For UPDATE get the old and new values of this attribute
412                          */
413                         old_value = (varattrib *) DatumGetPointer(toast_oldvalues[i]);
414                         new_value = (varattrib *) DatumGetPointer(toast_values[i]);
415
416                         /*
417                          * If the old value is an external stored one, check if it has
418                          * changed so we have to delete it later.
419                          */
420                         if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
421                                 VARATT_IS_EXTERNAL(old_value))
422                         {
423                                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
424                                         old_value->va_content.va_external.va_valueid !=
425                                         new_value->va_content.va_external.va_valueid ||
426                                         old_value->va_content.va_external.va_toastrelid !=
427                                         new_value->va_content.va_external.va_toastrelid)
428                                 {
429                                         /*
430                                          * The old external stored value isn't needed any more
431                                          * after the update
432                                          */
433                                         toast_delold[i] = true;
434                                         need_delold = true;
435                                 }
436                                 else
437                                 {
438                                         /*
439                                          * This attribute isn't changed by this update so we reuse
440                                          * the original reference to the old value in the new
441                                          * tuple.
442                                          */
443                                         toast_action[i] = 'p';
444                                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
445                                         continue;
446                                 }
447                         }
448                 }
449                 else
450                 {
451                         /*
452                          * For INSERT simply get the new value
453                          */
454                         new_value = (varattrib *) DatumGetPointer(toast_values[i]);
455                 }
456
457                 /*
458                  * Handle NULL attributes
459                  */
460                 if (toast_isnull[i])
461                 {
462                         toast_action[i] = 'p';
463                         has_nulls = true;
464                         continue;
465                 }
466
467                 /*
468                  * Now look at varlena attributes
469                  */
470                 if (att[i]->attlen == -1)
471                 {
472                         /*
473                          * If the table's attribute says PLAIN always, force it so.
474                          */
475                         if (att[i]->attstorage == 'p')
476                                 toast_action[i] = 'p';
477
478                         /*
479                          * We took care of UPDATE above, so any external value we find
480                          * still in the tuple must be someone else's we cannot reuse.
481                          * Expand it to plain (and, probably, toast it again below).
482                          */
483                         if (VARATT_IS_EXTERNAL(new_value))
484                         {
485                                 new_value = heap_tuple_untoast_attr(new_value);
486                                 toast_values[i] = PointerGetDatum(new_value);
487                                 toast_free[i] = true;
488                                 need_change = true;
489                                 need_free = true;
490                         }
491
492                         /*
493                          * Remember the size of this attribute
494                          */
495                         toast_sizes[i] = VARATT_SIZE(new_value);
496                 }
497                 else
498                 {
499                         /*
500                          * Not a varlena attribute, plain storage always
501                          */
502                         toast_action[i] = 'p';
503                 }
504         }
505
506         /* ----------
507          * Compress and/or save external until data fits into target length
508          *
509          *      1: Inline compress attributes with attstorage 'x'
510          *      2: Store attributes with attstorage 'x' or 'e' external
511          *      3: Inline compress attributes with attstorage 'm'
512          *      4: Store attributes with attstorage 'm' external
513          * ----------
514          */
515         maxDataLen = offsetof(HeapTupleHeaderData, t_bits);
516         if (has_nulls)
517                 maxDataLen += BITMAPLEN(numAttrs);
518         maxDataLen = TOAST_TUPLE_TARGET - MAXALIGN(maxDataLen);
519
520         /*
521          * Look for attributes with attstorage 'x' to compress
522          */
523         while (MAXALIGN(heap_compute_data_size(tupleDesc,
524                                                                                    toast_values, toast_isnull)) >
525                    maxDataLen)
526         {
527                 int                     biggest_attno = -1;
528                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
529                 Datum           old_value;
530                 Datum           new_value;
531
532                 /*
533                  * Search for the biggest yet uncompressed internal attribute
534                  */
535                 for (i = 0; i < numAttrs; i++)
536                 {
537                         if (toast_action[i] != ' ')
538                                 continue;
539                         if (VARATT_IS_EXTENDED(toast_values[i]))
540                                 continue;
541                         if (att[i]->attstorage != 'x')
542                                 continue;
543                         if (toast_sizes[i] > biggest_size)
544                         {
545                                 biggest_attno = i;
546                                 biggest_size = toast_sizes[i];
547                         }
548                 }
549
550                 if (biggest_attno < 0)
551                         break;
552
553                 /*
554                  * Attempt to compress it inline
555                  */
556                 i = biggest_attno;
557                 old_value = toast_values[i];
558                 new_value = toast_compress_datum(old_value);
559
560                 if (DatumGetPointer(new_value) != NULL)
561                 {
562                         /* successful compression */
563                         if (toast_free[i])
564                                 pfree(DatumGetPointer(old_value));
565                         toast_values[i] = new_value;
566                         toast_free[i] = true;
567                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
568                         need_change = true;
569                         need_free = true;
570                 }
571                 else
572                 {
573                         /*
574                          * incompressible data, ignore on subsequent compression passes
575                          */
576                         toast_action[i] = 'x';
577                 }
578         }
579
580         /*
581          * Second we look for attributes of attstorage 'x' or 'e' that are still
582          * inline.
583          */
584         while (MAXALIGN(heap_compute_data_size(tupleDesc,
585                                                                                    toast_values, toast_isnull)) >
586                    maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
587         {
588                 int                     biggest_attno = -1;
589                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
590                 Datum           old_value;
591
592                 /*------
593                  * Search for the biggest yet inlined attribute with
594                  * attstorage equals 'x' or 'e'
595                  *------
596                  */
597                 for (i = 0; i < numAttrs; i++)
598                 {
599                         if (toast_action[i] == 'p')
600                                 continue;
601                         if (VARATT_IS_EXTERNAL(toast_values[i]))
602                                 continue;
603                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
604                                 continue;
605                         if (toast_sizes[i] > biggest_size)
606                         {
607                                 biggest_attno = i;
608                                 biggest_size = toast_sizes[i];
609                         }
610                 }
611
612                 if (biggest_attno < 0)
613                         break;
614
615                 /*
616                  * Store this external
617                  */
618                 i = biggest_attno;
619                 old_value = toast_values[i];
620                 toast_action[i] = 'p';
621                 toast_values[i] = toast_save_datum(rel, toast_values[i]);
622                 if (toast_free[i])
623                         pfree(DatumGetPointer(old_value));
624
625                 toast_free[i] = true;
626                 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
627
628                 need_change = true;
629                 need_free = true;
630         }
631
632         /*
633          * Round 3 - this time we take attributes with storage 'm' into
634          * compression
635          */
636         while (MAXALIGN(heap_compute_data_size(tupleDesc,
637                                                                                    toast_values, toast_isnull)) >
638                    maxDataLen)
639         {
640                 int                     biggest_attno = -1;
641                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
642                 Datum           old_value;
643                 Datum           new_value;
644
645                 /*
646                  * Search for the biggest yet uncompressed internal attribute
647                  */
648                 for (i = 0; i < numAttrs; i++)
649                 {
650                         if (toast_action[i] != ' ')
651                                 continue;
652                         if (VARATT_IS_EXTENDED(toast_values[i]))
653                                 continue;
654                         if (att[i]->attstorage != 'm')
655                                 continue;
656                         if (toast_sizes[i] > biggest_size)
657                         {
658                                 biggest_attno = i;
659                                 biggest_size = toast_sizes[i];
660                         }
661                 }
662
663                 if (biggest_attno < 0)
664                         break;
665
666                 /*
667                  * Attempt to compress it inline
668                  */
669                 i = biggest_attno;
670                 old_value = toast_values[i];
671                 new_value = toast_compress_datum(old_value);
672
673                 if (DatumGetPointer(new_value) != NULL)
674                 {
675                         /* successful compression */
676                         if (toast_free[i])
677                                 pfree(DatumGetPointer(old_value));
678                         toast_values[i] = new_value;
679                         toast_free[i] = true;
680                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
681                         need_change = true;
682                         need_free = true;
683                 }
684                 else
685                 {
686                         /*
687                          * incompressible data, ignore on subsequent compression passes
688                          */
689                         toast_action[i] = 'x';
690                 }
691         }
692
693         /*
694          * Finally we store attributes of type 'm' external
695          */
696         while (MAXALIGN(heap_compute_data_size(tupleDesc,
697                                                                                    toast_values, toast_isnull)) >
698                    maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
699         {
700                 int                     biggest_attno = -1;
701                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
702                 Datum           old_value;
703
704                 /*--------
705                  * Search for the biggest yet inlined attribute with
706                  * attstorage = 'm'
707                  *--------
708                  */
709                 for (i = 0; i < numAttrs; i++)
710                 {
711                         if (toast_action[i] == 'p')
712                                 continue;
713                         if (VARATT_IS_EXTERNAL(toast_values[i]))
714                                 continue;
715                         if (att[i]->attstorage != 'm')
716                                 continue;
717                         if (toast_sizes[i] > biggest_size)
718                         {
719                                 biggest_attno = i;
720                                 biggest_size = toast_sizes[i];
721                         }
722                 }
723
724                 if (biggest_attno < 0)
725                         break;
726
727                 /*
728                  * Store this external
729                  */
730                 i = biggest_attno;
731                 old_value = toast_values[i];
732                 toast_action[i] = 'p';
733                 toast_values[i] = toast_save_datum(rel, toast_values[i]);
734                 if (toast_free[i])
735                         pfree(DatumGetPointer(old_value));
736
737                 toast_free[i] = true;
738                 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
739
740                 need_change = true;
741                 need_free = true;
742         }
743
744         /*
745          * In the case we toasted any values, we need to build a new heap tuple
746          * with the changed values.
747          */
748         if (need_change)
749         {
750                 HeapTupleHeader olddata = newtup->t_data;
751                 HeapTupleHeader new_data;
752                 int32           new_len;
753
754                 /*
755                  * Calculate the new size of the tuple.  Header size should not
756                  * change, but data size might.
757                  */
758                 new_len = offsetof(HeapTupleHeaderData, t_bits);
759                 if (has_nulls)
760                         new_len += BITMAPLEN(numAttrs);
761                 if (olddata->t_infomask & HEAP_HASOID)
762                         new_len += sizeof(Oid);
763                 new_len = MAXALIGN(new_len);
764                 Assert(new_len == olddata->t_hoff);
765                 new_len += heap_compute_data_size(tupleDesc,
766                                                                                   toast_values, toast_isnull);
767
768                 /*
769                  * Allocate and zero the space needed, and fill HeapTupleData fields.
770                  */
771                 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_len);
772                 result_tuple->t_len = new_len;
773                 result_tuple->t_self = newtup->t_self;
774                 result_tuple->t_tableOid = newtup->t_tableOid;
775                 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
776                 result_tuple->t_data = new_data;
777
778                 /*
779                  * Put the existing tuple header and the changed values into place
780                  */
781                 memcpy(new_data, olddata, olddata->t_hoff);
782
783                 heap_fill_tuple(tupleDesc,
784                                                 toast_values,
785                                                 toast_isnull,
786                                                 (char *) new_data + olddata->t_hoff,
787                                                 &(new_data->t_infomask),
788                                                 has_nulls ? new_data->t_bits : NULL);
789         }
790         else
791                 result_tuple = newtup;
792
793         /*
794          * Free allocated temp values
795          */
796         if (need_free)
797                 for (i = 0; i < numAttrs; i++)
798                         if (toast_free[i])
799                                 pfree(DatumGetPointer(toast_values[i]));
800
801         /*
802          * Delete external values from the old tuple
803          */
804         if (need_delold)
805                 for (i = 0; i < numAttrs; i++)
806                         if (toast_delold[i])
807                                 toast_delete_datum(rel, toast_oldvalues[i]);
808
809         return result_tuple;
810 }
811
812
813 /* ----------
814  * toast_flatten_tuple_attribute -
815  *
816  *      If a Datum is of composite type, "flatten" it to contain no toasted fields.
817  *      This must be invoked on any potentially-composite field that is to be
818  *      inserted into a tuple.  Doing this preserves the invariant that toasting
819  *      goes only one level deep in a tuple.
820  * ----------
821  */
822 Datum
823 toast_flatten_tuple_attribute(Datum value,
824                                                           Oid typeId, int32 typeMod)
825 {
826         TupleDesc       tupleDesc;
827         HeapTupleHeader olddata;
828         HeapTupleHeader new_data;
829         int32           new_len;
830         HeapTupleData tmptup;
831         Form_pg_attribute *att;
832         int                     numAttrs;
833         int                     i;
834         bool            need_change = false;
835         bool            has_nulls = false;
836         Datum           toast_values[MaxTupleAttributeNumber];
837         bool            toast_isnull[MaxTupleAttributeNumber];
838         bool            toast_free[MaxTupleAttributeNumber];
839
840         /*
841          * See if it's a composite type, and get the tupdesc if so.
842          */
843         tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
844         if (tupleDesc == NULL)
845                 return value;                   /* not a composite type */
846
847         att = tupleDesc->attrs;
848         numAttrs = tupleDesc->natts;
849
850         /*
851          * Break down the tuple into fields.
852          */
853         olddata = DatumGetHeapTupleHeader(value);
854         Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
855         Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
856         /* Build a temporary HeapTuple control structure */
857         tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
858         ItemPointerSetInvalid(&(tmptup.t_self));
859         tmptup.t_tableOid = InvalidOid;
860         tmptup.t_data = olddata;
861
862         Assert(numAttrs <= MaxTupleAttributeNumber);
863         heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
864
865         memset(toast_free, 0, numAttrs * sizeof(bool));
866
867         for (i = 0; i < numAttrs; i++)
868         {
869                 /*
870                  * Look at non-null varlena attributes
871                  */
872                 if (toast_isnull[i])
873                         has_nulls = true;
874                 else if (att[i]->attlen == -1)
875                 {
876                         varattrib  *new_value;
877
878                         new_value = (varattrib *) DatumGetPointer(toast_values[i]);
879                         if (VARATT_IS_EXTENDED(new_value))
880                         {
881                                 new_value = heap_tuple_untoast_attr(new_value);
882                                 toast_values[i] = PointerGetDatum(new_value);
883                                 toast_free[i] = true;
884                                 need_change = true;
885                         }
886                 }
887         }
888
889         /*
890          * If nothing to untoast, just return the original tuple.
891          */
892         if (!need_change)
893         {
894                 ReleaseTupleDesc(tupleDesc);
895                 return value;
896         }
897
898         /*
899          * Calculate the new size of the tuple.  Header size should not change,
900          * but data size might.
901          */
902         new_len = offsetof(HeapTupleHeaderData, t_bits);
903         if (has_nulls)
904                 new_len += BITMAPLEN(numAttrs);
905         if (olddata->t_infomask & HEAP_HASOID)
906                 new_len += sizeof(Oid);
907         new_len = MAXALIGN(new_len);
908         Assert(new_len == olddata->t_hoff);
909         new_len += heap_compute_data_size(tupleDesc, toast_values, toast_isnull);
910
911         new_data = (HeapTupleHeader) palloc0(new_len);
912
913         /*
914          * Put the tuple header and the changed values into place
915          */
916         memcpy(new_data, olddata, olddata->t_hoff);
917
918         HeapTupleHeaderSetDatumLength(new_data, new_len);
919
920         heap_fill_tuple(tupleDesc,
921                                         toast_values,
922                                         toast_isnull,
923                                         (char *) new_data + olddata->t_hoff,
924                                         &(new_data->t_infomask),
925                                         has_nulls ? new_data->t_bits : NULL);
926
927         /*
928          * Free allocated temp values
929          */
930         for (i = 0; i < numAttrs; i++)
931                 if (toast_free[i])
932                         pfree(DatumGetPointer(toast_values[i]));
933         ReleaseTupleDesc(tupleDesc);
934
935         return PointerGetDatum(new_data);
936 }
937
938
939 /* ----------
940  * toast_compress_datum -
941  *
942  *      Create a compressed version of a varlena datum
943  *
944  *      If we fail (ie, compressed result is actually bigger than original)
945  *      then return NULL.  We must not use compressed data if it'd expand
946  *      the tuple!
947  * ----------
948  */
949 Datum
950 toast_compress_datum(Datum value)
951 {
952         varattrib  *tmp;
953
954         tmp = (varattrib *) palloc(sizeof(PGLZ_Header) + VARATT_SIZE(value));
955         pglz_compress(VARATT_DATA(value), VARATT_SIZE(value) - VARHDRSZ,
956                                   (PGLZ_Header *) tmp,
957                                   PGLZ_strategy_default);
958         if (VARATT_SIZE(tmp) < VARATT_SIZE(value))
959         {
960                 /* successful compression */
961                 VARATT_SIZEP(tmp) |= VARATT_FLAG_COMPRESSED;
962                 return PointerGetDatum(tmp);
963         }
964         else
965         {
966                 /* incompressible data */
967                 pfree(tmp);
968                 return PointerGetDatum(NULL);
969         }
970 }
971
972
973 /* ----------
974  * toast_save_datum -
975  *
976  *      Save one single datum into the secondary relation and return
977  *      a varattrib reference for it.
978  * ----------
979  */
980 static Datum
981 toast_save_datum(Relation rel, Datum value)
982 {
983         Relation        toastrel;
984         Relation        toastidx;
985         HeapTuple       toasttup;
986         TupleDesc       toasttupDesc;
987         Datum           t_values[3];
988         bool            t_isnull[3];
989         varattrib  *result;
990         struct
991         {
992                 struct varlena hdr;
993                 char            data[TOAST_MAX_CHUNK_SIZE];
994         }                       chunk_data;
995         int32           chunk_size;
996         int32           chunk_seq = 0;
997         char       *data_p;
998         int32           data_todo;
999
1000         /*
1001          * Open the toast relation and its index.  We can use the index to check
1002          * uniqueness of the OID we assign to the toasted item, even though it has
1003          * additional columns besides OID.
1004          */
1005         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1006         toasttupDesc = toastrel->rd_att;
1007         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1008
1009         /*
1010          * Create the varattrib reference
1011          */
1012         result = (varattrib *) palloc(sizeof(varattrib));
1013
1014         result->va_header = sizeof(varattrib) | VARATT_FLAG_EXTERNAL;
1015         if (VARATT_IS_COMPRESSED(value))
1016         {
1017                 result->va_header |= VARATT_FLAG_COMPRESSED;
1018                 result->va_content.va_external.va_rawsize =
1019                         ((varattrib *) value)->va_content.va_compressed.va_rawsize;
1020         }
1021         else
1022                 result->va_content.va_external.va_rawsize = VARATT_SIZE(value);
1023
1024         result->va_content.va_external.va_extsize =
1025                 VARATT_SIZE(value) - VARHDRSZ;
1026         result->va_content.va_external.va_valueid =
1027                 GetNewOidWithIndex(toastrel, toastidx);
1028         result->va_content.va_external.va_toastrelid =
1029                 rel->rd_rel->reltoastrelid;
1030
1031         /*
1032          * Initialize constant parts of the tuple data
1033          */
1034         t_values[0] = ObjectIdGetDatum(result->va_content.va_external.va_valueid);
1035         t_values[2] = PointerGetDatum(&chunk_data);
1036         t_isnull[0] = false;
1037         t_isnull[1] = false;
1038         t_isnull[2] = false;
1039
1040         /*
1041          * Get the data to process
1042          */
1043         data_p = VARATT_DATA(value);
1044         data_todo = VARATT_SIZE(value) - VARHDRSZ;
1045
1046         /*
1047          * Split up the item into chunks
1048          */
1049         while (data_todo > 0)
1050         {
1051                 /*
1052                  * Calculate the size of this chunk
1053                  */
1054                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1055
1056                 /*
1057                  * Build a tuple and store it
1058                  */
1059                 t_values[1] = Int32GetDatum(chunk_seq++);
1060                 VARATT_SIZEP(&chunk_data) = chunk_size + VARHDRSZ;
1061                 memcpy(VARATT_DATA(&chunk_data), data_p, chunk_size);
1062                 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1063                 if (!HeapTupleIsValid(toasttup))
1064                         elog(ERROR, "failed to build TOAST tuple");
1065
1066                 simple_heap_insert(toastrel, toasttup);
1067
1068                 /*
1069                  * Create the index entry.      We cheat a little here by not using
1070                  * FormIndexDatum: this relies on the knowledge that the index columns
1071                  * are the same as the initial columns of the table.
1072                  *
1073                  * Note also that there had better not be any user-created index on
1074                  * the TOAST table, since we don't bother to update anything else.
1075                  */
1076                 index_insert(toastidx, t_values, t_isnull,
1077                                          &(toasttup->t_self),
1078                                          toastrel, toastidx->rd_index->indisunique);
1079
1080                 /*
1081                  * Free memory
1082                  */
1083                 heap_freetuple(toasttup);
1084
1085                 /*
1086                  * Move on to next chunk
1087                  */
1088                 data_todo -= chunk_size;
1089                 data_p += chunk_size;
1090         }
1091
1092         /*
1093          * Done - close toast relation and return the reference
1094          */
1095         index_close(toastidx, RowExclusiveLock);
1096         heap_close(toastrel, RowExclusiveLock);
1097
1098         return PointerGetDatum(result);
1099 }
1100
1101
1102 /* ----------
1103  * toast_delete_datum -
1104  *
1105  *      Delete a single external stored value.
1106  * ----------
1107  */
1108 static void
1109 toast_delete_datum(Relation rel, Datum value)
1110 {
1111         varattrib  *attr = (varattrib *) DatumGetPointer(value);
1112         Relation        toastrel;
1113         Relation        toastidx;
1114         ScanKeyData toastkey;
1115         IndexScanDesc toastscan;
1116         HeapTuple       toasttup;
1117
1118         if (!VARATT_IS_EXTERNAL(attr))
1119                 return;
1120
1121         /*
1122          * Open the toast relation and it's index
1123          */
1124         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1125                                                  RowExclusiveLock);
1126         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1127
1128         /*
1129          * Setup a scan key to fetch from the index by va_valueid (we don't
1130          * particularly care whether we see them in sequence or not)
1131          */
1132         ScanKeyInit(&toastkey,
1133                                 (AttrNumber) 1,
1134                                 BTEqualStrategyNumber, F_OIDEQ,
1135                                 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1136
1137         /*
1138          * Find the chunks by index
1139          */
1140         toastscan = index_beginscan(toastrel, toastidx,
1141                                                                 SnapshotToast, 1, &toastkey);
1142         while ((toasttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1143         {
1144                 /*
1145                  * Have a chunk, delete it
1146                  */
1147                 simple_heap_delete(toastrel, &toasttup->t_self);
1148         }
1149
1150         /*
1151          * End scan and close relations
1152          */
1153         index_endscan(toastscan);
1154         index_close(toastidx, RowExclusiveLock);
1155         heap_close(toastrel, RowExclusiveLock);
1156 }
1157
1158
1159 /* ----------
1160  * toast_fetch_datum -
1161  *
1162  *      Reconstruct an in memory varattrib from the chunks saved
1163  *      in the toast relation
1164  * ----------
1165  */
1166 static varattrib *
1167 toast_fetch_datum(varattrib *attr)
1168 {
1169         Relation        toastrel;
1170         Relation        toastidx;
1171         ScanKeyData toastkey;
1172         IndexScanDesc toastscan;
1173         HeapTuple       ttup;
1174         TupleDesc       toasttupDesc;
1175         varattrib  *result;
1176         int32           ressize;
1177         int32           residx,
1178                                 nextidx;
1179         int32           numchunks;
1180         Pointer         chunk;
1181         bool            isnull;
1182         int32           chunksize;
1183
1184         ressize = attr->va_content.va_external.va_extsize;
1185         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1186
1187         result = (varattrib *) palloc(ressize + VARHDRSZ);
1188         VARATT_SIZEP(result) = ressize + VARHDRSZ;
1189         if (VARATT_IS_COMPRESSED(attr))
1190                 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1191
1192         /*
1193          * Open the toast relation and its index
1194          */
1195         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1196                                                  AccessShareLock);
1197         toasttupDesc = toastrel->rd_att;
1198         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1199
1200         /*
1201          * Setup a scan key to fetch from the index by va_valueid
1202          */
1203         ScanKeyInit(&toastkey,
1204                                 (AttrNumber) 1,
1205                                 BTEqualStrategyNumber, F_OIDEQ,
1206                                 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1207
1208         /*
1209          * Read the chunks by index
1210          *
1211          * Note that because the index is actually on (valueid, chunkidx) we will
1212          * see the chunks in chunkidx order, even though we didn't explicitly ask
1213          * for it.
1214          */
1215         nextidx = 0;
1216
1217         toastscan = index_beginscan(toastrel, toastidx,
1218                                                                 SnapshotToast, 1, &toastkey);
1219         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1220         {
1221                 /*
1222                  * Have a chunk, extract the sequence number and the data
1223                  */
1224                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1225                 Assert(!isnull);
1226                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1227                 Assert(!isnull);
1228                 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1229
1230                 /*
1231                  * Some checks on the data we've found
1232                  */
1233                 if (residx != nextidx)
1234                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1235                                  residx, nextidx,
1236                                  attr->va_content.va_external.va_valueid);
1237                 if (residx < numchunks - 1)
1238                 {
1239                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1240                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1241                                          chunksize, residx,
1242                                          attr->va_content.va_external.va_valueid);
1243                 }
1244                 else if (residx < numchunks)
1245                 {
1246                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1247                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1248                                          chunksize, residx,
1249                                          attr->va_content.va_external.va_valueid);
1250                 }
1251                 else
1252                         elog(ERROR, "unexpected chunk number %d for toast value %u",
1253                                  residx,
1254                                  attr->va_content.va_external.va_valueid);
1255
1256                 /*
1257                  * Copy the data into proper place in our result
1258                  */
1259                 memcpy(((char *) VARATT_DATA(result)) + residx * TOAST_MAX_CHUNK_SIZE,
1260                            VARATT_DATA(chunk),
1261                            chunksize);
1262
1263                 nextidx++;
1264         }
1265
1266         /*
1267          * Final checks that we successfully fetched the datum
1268          */
1269         if (nextidx != numchunks)
1270                 elog(ERROR, "missing chunk number %d for toast value %u",
1271                          nextidx,
1272                          attr->va_content.va_external.va_valueid);
1273
1274         /*
1275          * End scan and close relations
1276          */
1277         index_endscan(toastscan);
1278         index_close(toastidx, AccessShareLock);
1279         heap_close(toastrel, AccessShareLock);
1280
1281         return result;
1282 }
1283
1284 /* ----------
1285  * toast_fetch_datum_slice -
1286  *
1287  *      Reconstruct a segment of a varattrib from the chunks saved
1288  *      in the toast relation
1289  * ----------
1290  */
1291 static varattrib *
1292 toast_fetch_datum_slice(varattrib *attr, int32 sliceoffset, int32 length)
1293 {
1294         Relation        toastrel;
1295         Relation        toastidx;
1296         ScanKeyData toastkey[3];
1297         int                     nscankeys;
1298         IndexScanDesc toastscan;
1299         HeapTuple       ttup;
1300         TupleDesc       toasttupDesc;
1301         varattrib  *result;
1302         int32           attrsize;
1303         int32           residx;
1304         int32           nextidx;
1305         int                     numchunks;
1306         int                     startchunk;
1307         int                     endchunk;
1308         int32           startoffset;
1309         int32           endoffset;
1310         int                     totalchunks;
1311         Pointer         chunk;
1312         bool            isnull;
1313         int32           chunksize;
1314         int32           chcpystrt;
1315         int32           chcpyend;
1316
1317         attrsize = attr->va_content.va_external.va_extsize;
1318         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1319
1320         if (sliceoffset >= attrsize)
1321         {
1322                 sliceoffset = 0;
1323                 length = 0;
1324         }
1325
1326         if (((sliceoffset + length) > attrsize) || length < 0)
1327                 length = attrsize - sliceoffset;
1328
1329         result = (varattrib *) palloc(length + VARHDRSZ);
1330         VARATT_SIZEP(result) = length + VARHDRSZ;
1331
1332         if (VARATT_IS_COMPRESSED(attr))
1333                 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1334
1335         if (length == 0)
1336                 return result;          /* Can save a lot of work at this point! */
1337
1338         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1339         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1340         numchunks = (endchunk - startchunk) + 1;
1341
1342         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1343         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1344
1345         /*
1346          * Open the toast relation and it's index
1347          */
1348         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1349                                                  AccessShareLock);
1350         toasttupDesc = toastrel->rd_att;
1351         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1352
1353         /*
1354          * Setup a scan key to fetch from the index. This is either two keys or
1355          * three depending on the number of chunks.
1356          */
1357         ScanKeyInit(&toastkey[0],
1358                                 (AttrNumber) 1,
1359                                 BTEqualStrategyNumber, F_OIDEQ,
1360                                 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1361
1362         /*
1363          * Use equality condition for one chunk, a range condition otherwise:
1364          */
1365         if (numchunks == 1)
1366         {
1367                 ScanKeyInit(&toastkey[1],
1368                                         (AttrNumber) 2,
1369                                         BTEqualStrategyNumber, F_INT4EQ,
1370                                         Int32GetDatum(startchunk));
1371                 nscankeys = 2;
1372         }
1373         else
1374         {
1375                 ScanKeyInit(&toastkey[1],
1376                                         (AttrNumber) 2,
1377                                         BTGreaterEqualStrategyNumber, F_INT4GE,
1378                                         Int32GetDatum(startchunk));
1379                 ScanKeyInit(&toastkey[2],
1380                                         (AttrNumber) 2,
1381                                         BTLessEqualStrategyNumber, F_INT4LE,
1382                                         Int32GetDatum(endchunk));
1383                 nscankeys = 3;
1384         }
1385
1386         /*
1387          * Read the chunks by index
1388          *
1389          * The index is on (valueid, chunkidx) so they will come in order
1390          */
1391         nextidx = startchunk;
1392         toastscan = index_beginscan(toastrel, toastidx,
1393                                                                 SnapshotToast, nscankeys, toastkey);
1394         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1395         {
1396                 /*
1397                  * Have a chunk, extract the sequence number and the data
1398                  */
1399                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1400                 Assert(!isnull);
1401                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1402                 Assert(!isnull);
1403                 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1404
1405                 /*
1406                  * Some checks on the data we've found
1407                  */
1408                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1409                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1410                                  residx, nextidx,
1411                                  attr->va_content.va_external.va_valueid);
1412                 if (residx < totalchunks - 1)
1413                 {
1414                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1415                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1416                                          chunksize, residx,
1417                                          attr->va_content.va_external.va_valueid);
1418                 }
1419                 else
1420                 {
1421                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1422                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1423                                          chunksize, residx,
1424                                          attr->va_content.va_external.va_valueid);
1425                 }
1426
1427                 /*
1428                  * Copy the data into proper place in our result
1429                  */
1430                 chcpystrt = 0;
1431                 chcpyend = chunksize - 1;
1432                 if (residx == startchunk)
1433                         chcpystrt = startoffset;
1434                 if (residx == endchunk)
1435                         chcpyend = endoffset;
1436
1437                 memcpy(((char *) VARATT_DATA(result)) +
1438                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1439                            VARATT_DATA(chunk) + chcpystrt,
1440                            (chcpyend - chcpystrt) + 1);
1441
1442                 nextidx++;
1443         }
1444
1445         /*
1446          * Final checks that we successfully fetched the datum
1447          */
1448         if (nextidx != (endchunk + 1))
1449                 elog(ERROR, "missing chunk number %d for toast value %u",
1450                          nextidx,
1451                          attr->va_content.va_external.va_valueid);
1452
1453         /*
1454          * End scan and close relations
1455          */
1456         index_endscan(toastscan);
1457         index_close(toastidx, AccessShareLock);
1458         heap_close(toastrel, AccessShareLock);
1459
1460         return result;
1461 }