]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Fix problems with cached tuple descriptors disappearing while still in use
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2006, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/heap/tuptoaster.c,v 1.60 2006/06/16 18:42:21 tgl Exp $
12  *
13  *
14  * INTERFACE ROUTINES
15  *              toast_insert_or_update -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              toast_delete -
20  *                      Reclaim toast storage when a tuple is deleted
21  *
22  *              heap_tuple_untoast_attr -
23  *                      Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27
28 #include "postgres.h"
29
30 #include <unistd.h>
31 #include <fcntl.h>
32
33 #include "access/heapam.h"
34 #include "access/genam.h"
35 #include "access/tuptoaster.h"
36 #include "catalog/catalog.h"
37 #include "utils/rel.h"
38 #include "utils/builtins.h"
39 #include "utils/fmgroids.h"
40 #include "utils/pg_lzcompress.h"
41 #include "utils/typcache.h"
42
43
44 #undef TOAST_DEBUG
45
46 static void toast_delete_datum(Relation rel, Datum value);
47 static Datum toast_save_datum(Relation rel, Datum value);
48 static varattrib *toast_fetch_datum(varattrib *attr);
49 static varattrib *toast_fetch_datum_slice(varattrib *attr,
50                                                 int32 sliceoffset, int32 length);
51
52
53 /* ----------
54  * heap_tuple_fetch_attr -
55  *
56  *      Public entry point to get back a toasted value
57  *      external storage (possibly still in compressed format).
58  * ----------
59  */
60 varattrib *
61 heap_tuple_fetch_attr(varattrib *attr)
62 {
63         varattrib  *result;
64
65         if (VARATT_IS_EXTERNAL(attr))
66         {
67                 /*
68                  * This is an external stored plain value
69                  */
70                 result = toast_fetch_datum(attr);
71         }
72         else
73         {
74                 /*
75                  * This is a plain value inside of the main tuple - why am I called?
76                  */
77                 result = attr;
78         }
79
80         return result;
81 }
82
83
84 /* ----------
85  * heap_tuple_untoast_attr -
86  *
87  *      Public entry point to get back a toasted value from compression
88  *      or external storage.
89  * ----------
90  */
91 varattrib *
92 heap_tuple_untoast_attr(varattrib *attr)
93 {
94         varattrib  *result;
95
96         if (VARATT_IS_EXTERNAL(attr))
97         {
98                 if (VARATT_IS_COMPRESSED(attr))
99                 {
100                         /* ----------
101                          * This is an external stored compressed value
102                          * Fetch it from the toast heap and decompress.
103                          * ----------
104                          */
105                         varattrib  *tmp;
106
107                         tmp = toast_fetch_datum(attr);
108                         result = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
109                                                                                   + VARHDRSZ);
110                         VARATT_SIZEP(result) = attr->va_content.va_external.va_rawsize
111                                 + VARHDRSZ;
112                         pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(result));
113
114                         pfree(tmp);
115                 }
116                 else
117                 {
118                         /*
119                          * This is an external stored plain value
120                          */
121                         result = toast_fetch_datum(attr);
122                 }
123         }
124         else if (VARATT_IS_COMPRESSED(attr))
125         {
126                 /*
127                  * This is a compressed value inside of the main tuple
128                  */
129                 result = (varattrib *) palloc(attr->va_content.va_compressed.va_rawsize
130                                                                           + VARHDRSZ);
131                 VARATT_SIZEP(result) = attr->va_content.va_compressed.va_rawsize
132                         + VARHDRSZ;
133                 pglz_decompress((PGLZ_Header *) attr, VARATT_DATA(result));
134         }
135         else
136
137                 /*
138                  * This is a plain value inside of the main tuple - why am I called?
139                  */
140                 return attr;
141
142         return result;
143 }
144
145
146 /* ----------
147  * heap_tuple_untoast_attr_slice -
148  *
149  *              Public entry point to get back part of a toasted value
150  *              from compression or external storage.
151  * ----------
152  */
153 varattrib *
154 heap_tuple_untoast_attr_slice(varattrib *attr, int32 sliceoffset, int32 slicelength)
155 {
156         varattrib  *preslice;
157         varattrib  *result;
158         int32           attrsize;
159
160         if (VARATT_IS_COMPRESSED(attr))
161         {
162                 varattrib  *tmp;
163
164                 if (VARATT_IS_EXTERNAL(attr))
165                         tmp = toast_fetch_datum(attr);
166                 else
167                 {
168                         tmp = attr;                     /* compressed in main tuple */
169                 }
170
171                 preslice = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
172                                                                                 + VARHDRSZ);
173                 VARATT_SIZEP(preslice) = attr->va_content.va_external.va_rawsize + VARHDRSZ;
174                 pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(preslice));
175
176                 if (tmp != attr)
177                         pfree(tmp);
178         }
179         else
180         {
181                 /* Plain value */
182                 if (VARATT_IS_EXTERNAL(attr))
183                 {
184                         /* fast path */
185                         return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
186                 }
187                 else
188                         preslice = attr;
189         }
190
191         /* slicing of datum for compressed cases and plain value */
192
193         attrsize = VARSIZE(preslice) - VARHDRSZ;
194         if (sliceoffset >= attrsize)
195         {
196                 sliceoffset = 0;
197                 slicelength = 0;
198         }
199
200         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
201                 slicelength = attrsize - sliceoffset;
202
203         result = (varattrib *) palloc(slicelength + VARHDRSZ);
204         VARATT_SIZEP(result) = slicelength + VARHDRSZ;
205
206         memcpy(VARDATA(result), VARDATA(preslice) + sliceoffset, slicelength);
207
208         if (preslice != attr)
209                 pfree(preslice);
210
211         return result;
212 }
213
214
215 /* ----------
216  * toast_raw_datum_size -
217  *
218  *      Return the raw (detoasted) size of a varlena datum
219  * ----------
220  */
221 Size
222 toast_raw_datum_size(Datum value)
223 {
224         varattrib  *attr = (varattrib *) DatumGetPointer(value);
225         Size            result;
226
227         if (VARATT_IS_COMPRESSED(attr))
228         {
229                 /*
230                  * va_rawsize shows the original data size, whether the datum is
231                  * external or not.
232                  */
233                 result = attr->va_content.va_compressed.va_rawsize + VARHDRSZ;
234         }
235         else if (VARATT_IS_EXTERNAL(attr))
236         {
237                 /*
238                  * an uncompressed external attribute has rawsize including the header
239                  * (not too consistent!)
240                  */
241                 result = attr->va_content.va_external.va_rawsize;
242         }
243         else
244         {
245                 /* plain untoasted datum */
246                 result = VARSIZE(attr);
247         }
248         return result;
249 }
250
251 /* ----------
252  * toast_datum_size
253  *
254  *      Return the physical storage size (possibly compressed) of a varlena datum
255  * ----------
256  */
257 Size
258 toast_datum_size(Datum value)
259 {
260         varattrib  *attr = (varattrib *) DatumGetPointer(value);
261         Size            result;
262
263         if (VARATT_IS_EXTERNAL(attr))
264         {
265                 /*
266                  * Attribute is stored externally - return the extsize whether
267                  * compressed or not.  We do not count the size of the toast pointer
268                  * ... should we?
269                  */
270                 result = attr->va_content.va_external.va_extsize;
271         }
272         else
273         {
274                 /*
275                  * Attribute is stored inline either compressed or not, just calculate
276                  * the size of the datum in either case.
277                  */
278                 result = VARSIZE(attr);
279         }
280         return result;
281 }
282
283
284 /* ----------
285  * toast_delete -
286  *
287  *      Cascaded delete toast-entries on DELETE
288  * ----------
289  */
290 void
291 toast_delete(Relation rel, HeapTuple oldtup)
292 {
293         TupleDesc       tupleDesc;
294         Form_pg_attribute *att;
295         int                     numAttrs;
296         int                     i;
297         Datum           toast_values[MaxHeapAttributeNumber];
298         bool            toast_isnull[MaxHeapAttributeNumber];
299
300         /*
301          * Get the tuple descriptor and break down the tuple into fields.
302          *
303          * NOTE: it's debatable whether to use heap_deformtuple() here or just
304          * heap_getattr() only the varlena columns.  The latter could win if there
305          * are few varlena columns and many non-varlena ones. However,
306          * heap_deformtuple costs only O(N) while the heap_getattr way would cost
307          * O(N^2) if there are many varlena columns, so it seems better to err on
308          * the side of linear cost.  (We won't even be here unless there's at
309          * least one varlena column, by the way.)
310          */
311         tupleDesc = rel->rd_att;
312         att = tupleDesc->attrs;
313         numAttrs = tupleDesc->natts;
314
315         Assert(numAttrs <= MaxHeapAttributeNumber);
316         heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
317
318         /*
319          * Check for external stored attributes and delete them from the secondary
320          * relation.
321          */
322         for (i = 0; i < numAttrs; i++)
323         {
324                 if (att[i]->attlen == -1)
325                 {
326                         Datum           value = toast_values[i];
327
328                         if (!toast_isnull[i] && VARATT_IS_EXTERNAL(value))
329                                 toast_delete_datum(rel, value);
330                 }
331         }
332 }
333
334
335 /* ----------
336  * toast_insert_or_update -
337  *
338  *      Delete no-longer-used toast-entries and create new ones to
339  *      make the new tuple fit on INSERT or UPDATE
340  *
341  * Inputs:
342  *      newtup: the candidate new tuple to be inserted
343  *      oldtup: the old row version for UPDATE, or NULL for INSERT
344  * Result:
345  *      either newtup if no toasting is needed, or a palloc'd modified tuple
346  *      that is what should actually get stored
347  *
348  * NOTE: neither newtup nor oldtup will be modified.  This is a change
349  * from the pre-8.1 API of this routine.
350  * ----------
351  */
352 HeapTuple
353 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup)
354 {
355         HeapTuple       result_tuple;
356         TupleDesc       tupleDesc;
357         Form_pg_attribute *att;
358         int                     numAttrs;
359         int                     i;
360
361         bool            need_change = false;
362         bool            need_free = false;
363         bool            need_delold = false;
364         bool            has_nulls = false;
365
366         Size            maxDataLen;
367
368         char            toast_action[MaxHeapAttributeNumber];
369         bool            toast_isnull[MaxHeapAttributeNumber];
370         bool            toast_oldisnull[MaxHeapAttributeNumber];
371         Datum           toast_values[MaxHeapAttributeNumber];
372         Datum           toast_oldvalues[MaxHeapAttributeNumber];
373         int32           toast_sizes[MaxHeapAttributeNumber];
374         bool            toast_free[MaxHeapAttributeNumber];
375         bool            toast_delold[MaxHeapAttributeNumber];
376
377         /*
378          * Get the tuple descriptor and break down the tuple(s) into fields.
379          */
380         tupleDesc = rel->rd_att;
381         att = tupleDesc->attrs;
382         numAttrs = tupleDesc->natts;
383
384         Assert(numAttrs <= MaxHeapAttributeNumber);
385         heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
386         if (oldtup != NULL)
387                 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
388
389         /* ----------
390          * Then collect information about the values given
391          *
392          * NOTE: toast_action[i] can have these values:
393          *              ' '             default handling
394          *              'p'             already processed --- don't touch it
395          *              'x'             incompressible, but OK to move off
396          *
397          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
398          *              toast_action[i] different from 'p'.
399          * ----------
400          */
401         memset(toast_action, ' ', numAttrs * sizeof(char));
402         memset(toast_free, 0, numAttrs * sizeof(bool));
403         memset(toast_delold, 0, numAttrs * sizeof(bool));
404
405         for (i = 0; i < numAttrs; i++)
406         {
407                 varattrib  *old_value;
408                 varattrib  *new_value;
409
410                 if (oldtup != NULL)
411                 {
412                         /*
413                          * For UPDATE get the old and new values of this attribute
414                          */
415                         old_value = (varattrib *) DatumGetPointer(toast_oldvalues[i]);
416                         new_value = (varattrib *) DatumGetPointer(toast_values[i]);
417
418                         /*
419                          * If the old value is an external stored one, check if it has
420                          * changed so we have to delete it later.
421                          */
422                         if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
423                                 VARATT_IS_EXTERNAL(old_value))
424                         {
425                                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
426                                         old_value->va_content.va_external.va_valueid !=
427                                         new_value->va_content.va_external.va_valueid ||
428                                         old_value->va_content.va_external.va_toastrelid !=
429                                         new_value->va_content.va_external.va_toastrelid)
430                                 {
431                                         /*
432                                          * The old external stored value isn't needed any more
433                                          * after the update
434                                          */
435                                         toast_delold[i] = true;
436                                         need_delold = true;
437                                 }
438                                 else
439                                 {
440                                         /*
441                                          * This attribute isn't changed by this update so we reuse
442                                          * the original reference to the old value in the new
443                                          * tuple.
444                                          */
445                                         toast_action[i] = 'p';
446                                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
447                                         continue;
448                                 }
449                         }
450                 }
451                 else
452                 {
453                         /*
454                          * For INSERT simply get the new value
455                          */
456                         new_value = (varattrib *) DatumGetPointer(toast_values[i]);
457                 }
458
459                 /*
460                  * Handle NULL attributes
461                  */
462                 if (toast_isnull[i])
463                 {
464                         toast_action[i] = 'p';
465                         has_nulls = true;
466                         continue;
467                 }
468
469                 /*
470                  * Now look at varlena attributes
471                  */
472                 if (att[i]->attlen == -1)
473                 {
474                         /*
475                          * If the table's attribute says PLAIN always, force it so.
476                          */
477                         if (att[i]->attstorage == 'p')
478                                 toast_action[i] = 'p';
479
480                         /*
481                          * We took care of UPDATE above, so any external value we find
482                          * still in the tuple must be someone else's we cannot reuse.
483                          * Expand it to plain (and, probably, toast it again below).
484                          */
485                         if (VARATT_IS_EXTERNAL(new_value))
486                         {
487                                 new_value = heap_tuple_untoast_attr(new_value);
488                                 toast_values[i] = PointerGetDatum(new_value);
489                                 toast_free[i] = true;
490                                 need_change = true;
491                                 need_free = true;
492                         }
493
494                         /*
495                          * Remember the size of this attribute
496                          */
497                         toast_sizes[i] = VARATT_SIZE(new_value);
498                 }
499                 else
500                 {
501                         /*
502                          * Not a varlena attribute, plain storage always
503                          */
504                         toast_action[i] = 'p';
505                 }
506         }
507
508         /* ----------
509          * Compress and/or save external until data fits into target length
510          *
511          *      1: Inline compress attributes with attstorage 'x'
512          *      2: Store attributes with attstorage 'x' or 'e' external
513          *      3: Inline compress attributes with attstorage 'm'
514          *      4: Store attributes with attstorage 'm' external
515          * ----------
516          */
517         maxDataLen = offsetof(HeapTupleHeaderData, t_bits);
518         if (has_nulls)
519                 maxDataLen += BITMAPLEN(numAttrs);
520         maxDataLen = TOAST_TUPLE_TARGET - MAXALIGN(maxDataLen);
521
522         /*
523          * Look for attributes with attstorage 'x' to compress
524          */
525         while (MAXALIGN(heap_compute_data_size(tupleDesc,
526                                                                                    toast_values, toast_isnull)) >
527                    maxDataLen)
528         {
529                 int                     biggest_attno = -1;
530                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
531                 Datum           old_value;
532                 Datum           new_value;
533
534                 /*
535                  * Search for the biggest yet uncompressed internal attribute
536                  */
537                 for (i = 0; i < numAttrs; i++)
538                 {
539                         if (toast_action[i] != ' ')
540                                 continue;
541                         if (VARATT_IS_EXTENDED(toast_values[i]))
542                                 continue;
543                         if (att[i]->attstorage != 'x')
544                                 continue;
545                         if (toast_sizes[i] > biggest_size)
546                         {
547                                 biggest_attno = i;
548                                 biggest_size = toast_sizes[i];
549                         }
550                 }
551
552                 if (biggest_attno < 0)
553                         break;
554
555                 /*
556                  * Attempt to compress it inline
557                  */
558                 i = biggest_attno;
559                 old_value = toast_values[i];
560                 new_value = toast_compress_datum(old_value);
561
562                 if (DatumGetPointer(new_value) != NULL)
563                 {
564                         /* successful compression */
565                         if (toast_free[i])
566                                 pfree(DatumGetPointer(old_value));
567                         toast_values[i] = new_value;
568                         toast_free[i] = true;
569                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
570                         need_change = true;
571                         need_free = true;
572                 }
573                 else
574                 {
575                         /*
576                          * incompressible data, ignore on subsequent compression passes
577                          */
578                         toast_action[i] = 'x';
579                 }
580         }
581
582         /*
583          * Second we look for attributes of attstorage 'x' or 'e' that are still
584          * inline.
585          */
586         while (MAXALIGN(heap_compute_data_size(tupleDesc,
587                                                                                    toast_values, toast_isnull)) >
588                    maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
589         {
590                 int                     biggest_attno = -1;
591                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
592                 Datum           old_value;
593
594                 /*------
595                  * Search for the biggest yet inlined attribute with
596                  * attstorage equals 'x' or 'e'
597                  *------
598                  */
599                 for (i = 0; i < numAttrs; i++)
600                 {
601                         if (toast_action[i] == 'p')
602                                 continue;
603                         if (VARATT_IS_EXTERNAL(toast_values[i]))
604                                 continue;
605                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
606                                 continue;
607                         if (toast_sizes[i] > biggest_size)
608                         {
609                                 biggest_attno = i;
610                                 biggest_size = toast_sizes[i];
611                         }
612                 }
613
614                 if (biggest_attno < 0)
615                         break;
616
617                 /*
618                  * Store this external
619                  */
620                 i = biggest_attno;
621                 old_value = toast_values[i];
622                 toast_action[i] = 'p';
623                 toast_values[i] = toast_save_datum(rel, toast_values[i]);
624                 if (toast_free[i])
625                         pfree(DatumGetPointer(old_value));
626
627                 toast_free[i] = true;
628                 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
629
630                 need_change = true;
631                 need_free = true;
632         }
633
634         /*
635          * Round 3 - this time we take attributes with storage 'm' into
636          * compression
637          */
638         while (MAXALIGN(heap_compute_data_size(tupleDesc,
639                                                                                    toast_values, toast_isnull)) >
640                    maxDataLen)
641         {
642                 int                     biggest_attno = -1;
643                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
644                 Datum           old_value;
645                 Datum           new_value;
646
647                 /*
648                  * Search for the biggest yet uncompressed internal attribute
649                  */
650                 for (i = 0; i < numAttrs; i++)
651                 {
652                         if (toast_action[i] != ' ')
653                                 continue;
654                         if (VARATT_IS_EXTENDED(toast_values[i]))
655                                 continue;
656                         if (att[i]->attstorage != 'm')
657                                 continue;
658                         if (toast_sizes[i] > biggest_size)
659                         {
660                                 biggest_attno = i;
661                                 biggest_size = toast_sizes[i];
662                         }
663                 }
664
665                 if (biggest_attno < 0)
666                         break;
667
668                 /*
669                  * Attempt to compress it inline
670                  */
671                 i = biggest_attno;
672                 old_value = toast_values[i];
673                 new_value = toast_compress_datum(old_value);
674
675                 if (DatumGetPointer(new_value) != NULL)
676                 {
677                         /* successful compression */
678                         if (toast_free[i])
679                                 pfree(DatumGetPointer(old_value));
680                         toast_values[i] = new_value;
681                         toast_free[i] = true;
682                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
683                         need_change = true;
684                         need_free = true;
685                 }
686                 else
687                 {
688                         /*
689                          * incompressible data, ignore on subsequent compression passes
690                          */
691                         toast_action[i] = 'x';
692                 }
693         }
694
695         /*
696          * Finally we store attributes of type 'm' external
697          */
698         while (MAXALIGN(heap_compute_data_size(tupleDesc,
699                                                                                    toast_values, toast_isnull)) >
700                    maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
701         {
702                 int                     biggest_attno = -1;
703                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
704                 Datum           old_value;
705
706                 /*--------
707                  * Search for the biggest yet inlined attribute with
708                  * attstorage = 'm'
709                  *--------
710                  */
711                 for (i = 0; i < numAttrs; i++)
712                 {
713                         if (toast_action[i] == 'p')
714                                 continue;
715                         if (VARATT_IS_EXTERNAL(toast_values[i]))
716                                 continue;
717                         if (att[i]->attstorage != 'm')
718                                 continue;
719                         if (toast_sizes[i] > biggest_size)
720                         {
721                                 biggest_attno = i;
722                                 biggest_size = toast_sizes[i];
723                         }
724                 }
725
726                 if (biggest_attno < 0)
727                         break;
728
729                 /*
730                  * Store this external
731                  */
732                 i = biggest_attno;
733                 old_value = toast_values[i];
734                 toast_action[i] = 'p';
735                 toast_values[i] = toast_save_datum(rel, toast_values[i]);
736                 if (toast_free[i])
737                         pfree(DatumGetPointer(old_value));
738
739                 toast_free[i] = true;
740                 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
741
742                 need_change = true;
743                 need_free = true;
744         }
745
746         /*
747          * In the case we toasted any values, we need to build a new heap tuple
748          * with the changed values.
749          */
750         if (need_change)
751         {
752                 HeapTupleHeader olddata = newtup->t_data;
753                 HeapTupleHeader new_data;
754                 int32           new_len;
755
756                 /*
757                  * Calculate the new size of the tuple.  Header size should not
758                  * change, but data size might.
759                  */
760                 new_len = offsetof(HeapTupleHeaderData, t_bits);
761                 if (has_nulls)
762                         new_len += BITMAPLEN(numAttrs);
763                 if (olddata->t_infomask & HEAP_HASOID)
764                         new_len += sizeof(Oid);
765                 new_len = MAXALIGN(new_len);
766                 Assert(new_len == olddata->t_hoff);
767                 new_len += heap_compute_data_size(tupleDesc,
768                                                                                   toast_values, toast_isnull);
769
770                 /*
771                  * Allocate and zero the space needed, and fill HeapTupleData fields.
772                  */
773                 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_len);
774                 result_tuple->t_len = new_len;
775                 result_tuple->t_self = newtup->t_self;
776                 result_tuple->t_tableOid = newtup->t_tableOid;
777                 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
778                 result_tuple->t_data = new_data;
779
780                 /*
781                  * Put the existing tuple header and the changed values into place
782                  */
783                 memcpy(new_data, olddata, olddata->t_hoff);
784
785                 heap_fill_tuple(tupleDesc,
786                                                 toast_values,
787                                                 toast_isnull,
788                                                 (char *) new_data + olddata->t_hoff,
789                                                 &(new_data->t_infomask),
790                                                 has_nulls ? new_data->t_bits : NULL);
791         }
792         else
793                 result_tuple = newtup;
794
795         /*
796          * Free allocated temp values
797          */
798         if (need_free)
799                 for (i = 0; i < numAttrs; i++)
800                         if (toast_free[i])
801                                 pfree(DatumGetPointer(toast_values[i]));
802
803         /*
804          * Delete external values from the old tuple
805          */
806         if (need_delold)
807                 for (i = 0; i < numAttrs; i++)
808                         if (toast_delold[i])
809                                 toast_delete_datum(rel, toast_oldvalues[i]);
810
811         return result_tuple;
812 }
813
814
815 /* ----------
816  * toast_flatten_tuple_attribute -
817  *
818  *      If a Datum is of composite type, "flatten" it to contain no toasted fields.
819  *      This must be invoked on any potentially-composite field that is to be
820  *      inserted into a tuple.  Doing this preserves the invariant that toasting
821  *      goes only one level deep in a tuple.
822  * ----------
823  */
824 Datum
825 toast_flatten_tuple_attribute(Datum value,
826                                                           Oid typeId, int32 typeMod)
827 {
828         TupleDesc       tupleDesc;
829         HeapTupleHeader olddata;
830         HeapTupleHeader new_data;
831         int32           new_len;
832         HeapTupleData tmptup;
833         Form_pg_attribute *att;
834         int                     numAttrs;
835         int                     i;
836         bool            need_change = false;
837         bool            has_nulls = false;
838         Datum           toast_values[MaxTupleAttributeNumber];
839         bool            toast_isnull[MaxTupleAttributeNumber];
840         bool            toast_free[MaxTupleAttributeNumber];
841
842         /*
843          * See if it's a composite type, and get the tupdesc if so.
844          */
845         tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
846         if (tupleDesc == NULL)
847                 return value;                   /* not a composite type */
848
849         att = tupleDesc->attrs;
850         numAttrs = tupleDesc->natts;
851
852         /*
853          * Break down the tuple into fields.
854          */
855         olddata = DatumGetHeapTupleHeader(value);
856         Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
857         Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
858         /* Build a temporary HeapTuple control structure */
859         tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
860         ItemPointerSetInvalid(&(tmptup.t_self));
861         tmptup.t_tableOid = InvalidOid;
862         tmptup.t_data = olddata;
863
864         Assert(numAttrs <= MaxTupleAttributeNumber);
865         heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
866
867         memset(toast_free, 0, numAttrs * sizeof(bool));
868
869         for (i = 0; i < numAttrs; i++)
870         {
871                 /*
872                  * Look at non-null varlena attributes
873                  */
874                 if (toast_isnull[i])
875                         has_nulls = true;
876                 else if (att[i]->attlen == -1)
877                 {
878                         varattrib  *new_value;
879
880                         new_value = (varattrib *) DatumGetPointer(toast_values[i]);
881                         if (VARATT_IS_EXTENDED(new_value))
882                         {
883                                 new_value = heap_tuple_untoast_attr(new_value);
884                                 toast_values[i] = PointerGetDatum(new_value);
885                                 toast_free[i] = true;
886                                 need_change = true;
887                         }
888                 }
889         }
890
891         /*
892          * If nothing to untoast, just return the original tuple.
893          */
894         if (!need_change)
895         {
896                 ReleaseTupleDesc(tupleDesc);
897                 return value;
898         }
899
900         /*
901          * Calculate the new size of the tuple.  Header size should not change,
902          * but data size might.
903          */
904         new_len = offsetof(HeapTupleHeaderData, t_bits);
905         if (has_nulls)
906                 new_len += BITMAPLEN(numAttrs);
907         if (olddata->t_infomask & HEAP_HASOID)
908                 new_len += sizeof(Oid);
909         new_len = MAXALIGN(new_len);
910         Assert(new_len == olddata->t_hoff);
911         new_len += heap_compute_data_size(tupleDesc, toast_values, toast_isnull);
912
913         new_data = (HeapTupleHeader) palloc0(new_len);
914
915         /*
916          * Put the tuple header and the changed values into place
917          */
918         memcpy(new_data, olddata, olddata->t_hoff);
919
920         HeapTupleHeaderSetDatumLength(new_data, new_len);
921
922         heap_fill_tuple(tupleDesc,
923                                         toast_values,
924                                         toast_isnull,
925                                         (char *) new_data + olddata->t_hoff,
926                                         &(new_data->t_infomask),
927                                         has_nulls ? new_data->t_bits : NULL);
928
929         /*
930          * Free allocated temp values
931          */
932         for (i = 0; i < numAttrs; i++)
933                 if (toast_free[i])
934                         pfree(DatumGetPointer(toast_values[i]));
935         ReleaseTupleDesc(tupleDesc);
936
937         return PointerGetDatum(new_data);
938 }
939
940
941 /* ----------
942  * toast_compress_datum -
943  *
944  *      Create a compressed version of a varlena datum
945  *
946  *      If we fail (ie, compressed result is actually bigger than original)
947  *      then return NULL.  We must not use compressed data if it'd expand
948  *      the tuple!
949  * ----------
950  */
951 Datum
952 toast_compress_datum(Datum value)
953 {
954         varattrib  *tmp;
955
956         tmp = (varattrib *) palloc(sizeof(PGLZ_Header) + VARATT_SIZE(value));
957         pglz_compress(VARATT_DATA(value), VARATT_SIZE(value) - VARHDRSZ,
958                                   (PGLZ_Header *) tmp,
959                                   PGLZ_strategy_default);
960         if (VARATT_SIZE(tmp) < VARATT_SIZE(value))
961         {
962                 /* successful compression */
963                 VARATT_SIZEP(tmp) |= VARATT_FLAG_COMPRESSED;
964                 return PointerGetDatum(tmp);
965         }
966         else
967         {
968                 /* incompressible data */
969                 pfree(tmp);
970                 return PointerGetDatum(NULL);
971         }
972 }
973
974
975 /* ----------
976  * toast_save_datum -
977  *
978  *      Save one single datum into the secondary relation and return
979  *      a varattrib reference for it.
980  * ----------
981  */
982 static Datum
983 toast_save_datum(Relation rel, Datum value)
984 {
985         Relation        toastrel;
986         Relation        toastidx;
987         HeapTuple       toasttup;
988         TupleDesc       toasttupDesc;
989         Datum           t_values[3];
990         bool            t_isnull[3];
991         varattrib  *result;
992         struct
993         {
994                 struct varlena hdr;
995                 char            data[TOAST_MAX_CHUNK_SIZE];
996         }                       chunk_data;
997         int32           chunk_size;
998         int32           chunk_seq = 0;
999         char       *data_p;
1000         int32           data_todo;
1001
1002         /*
1003          * Open the toast relation and its index.  We can use the index to check
1004          * uniqueness of the OID we assign to the toasted item, even though it has
1005          * additional columns besides OID.
1006          */
1007         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1008         toasttupDesc = toastrel->rd_att;
1009         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1010
1011         /*
1012          * Create the varattrib reference
1013          */
1014         result = (varattrib *) palloc(sizeof(varattrib));
1015
1016         result->va_header = sizeof(varattrib) | VARATT_FLAG_EXTERNAL;
1017         if (VARATT_IS_COMPRESSED(value))
1018         {
1019                 result->va_header |= VARATT_FLAG_COMPRESSED;
1020                 result->va_content.va_external.va_rawsize =
1021                         ((varattrib *) value)->va_content.va_compressed.va_rawsize;
1022         }
1023         else
1024                 result->va_content.va_external.va_rawsize = VARATT_SIZE(value);
1025
1026         result->va_content.va_external.va_extsize =
1027                 VARATT_SIZE(value) - VARHDRSZ;
1028         result->va_content.va_external.va_valueid =
1029                 GetNewOidWithIndex(toastrel, toastidx);
1030         result->va_content.va_external.va_toastrelid =
1031                 rel->rd_rel->reltoastrelid;
1032
1033         /*
1034          * Initialize constant parts of the tuple data
1035          */
1036         t_values[0] = ObjectIdGetDatum(result->va_content.va_external.va_valueid);
1037         t_values[2] = PointerGetDatum(&chunk_data);
1038         t_isnull[0] = false;
1039         t_isnull[1] = false;
1040         t_isnull[2] = false;
1041
1042         /*
1043          * Get the data to process
1044          */
1045         data_p = VARATT_DATA(value);
1046         data_todo = VARATT_SIZE(value) - VARHDRSZ;
1047
1048         /*
1049          * We must explicitly lock the toast index because we aren't using an
1050          * index scan here.
1051          */
1052         LockRelation(toastidx, RowExclusiveLock);
1053
1054         /*
1055          * Split up the item into chunks
1056          */
1057         while (data_todo > 0)
1058         {
1059                 /*
1060                  * Calculate the size of this chunk
1061                  */
1062                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1063
1064                 /*
1065                  * Build a tuple and store it
1066                  */
1067                 t_values[1] = Int32GetDatum(chunk_seq++);
1068                 VARATT_SIZEP(&chunk_data) = chunk_size + VARHDRSZ;
1069                 memcpy(VARATT_DATA(&chunk_data), data_p, chunk_size);
1070                 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1071                 if (!HeapTupleIsValid(toasttup))
1072                         elog(ERROR, "failed to build TOAST tuple");
1073
1074                 simple_heap_insert(toastrel, toasttup);
1075
1076                 /*
1077                  * Create the index entry.      We cheat a little here by not using
1078                  * FormIndexDatum: this relies on the knowledge that the index columns
1079                  * are the same as the initial columns of the table.
1080                  *
1081                  * Note also that there had better not be any user-created index on
1082                  * the TOAST table, since we don't bother to update anything else.
1083                  */
1084                 index_insert(toastidx, t_values, t_isnull,
1085                                          &(toasttup->t_self),
1086                                          toastrel, toastidx->rd_index->indisunique);
1087
1088                 /*
1089                  * Free memory
1090                  */
1091                 heap_freetuple(toasttup);
1092
1093                 /*
1094                  * Move on to next chunk
1095                  */
1096                 data_todo -= chunk_size;
1097                 data_p += chunk_size;
1098         }
1099
1100         /*
1101          * Done - close toast relation and return the reference
1102          */
1103         UnlockRelation(toastidx, RowExclusiveLock);
1104         index_close(toastidx);
1105         heap_close(toastrel, RowExclusiveLock);
1106
1107         return PointerGetDatum(result);
1108 }
1109
1110
1111 /* ----------
1112  * toast_delete_datum -
1113  *
1114  *      Delete a single external stored value.
1115  * ----------
1116  */
1117 static void
1118 toast_delete_datum(Relation rel, Datum value)
1119 {
1120         varattrib  *attr = (varattrib *) DatumGetPointer(value);
1121         Relation        toastrel;
1122         Relation        toastidx;
1123         ScanKeyData toastkey;
1124         IndexScanDesc toastscan;
1125         HeapTuple       toasttup;
1126
1127         if (!VARATT_IS_EXTERNAL(attr))
1128                 return;
1129
1130         /*
1131          * Open the toast relation and it's index
1132          */
1133         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1134                                                  RowExclusiveLock);
1135         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1136
1137         /*
1138          * Setup a scan key to fetch from the index by va_valueid (we don't
1139          * particularly care whether we see them in sequence or not)
1140          */
1141         ScanKeyInit(&toastkey,
1142                                 (AttrNumber) 1,
1143                                 BTEqualStrategyNumber, F_OIDEQ,
1144                                 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1145
1146         /*
1147          * Find the chunks by index
1148          */
1149         toastscan = index_beginscan(toastrel, toastidx, true,
1150                                                                 SnapshotToast, 1, &toastkey);
1151         while ((toasttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1152         {
1153                 /*
1154                  * Have a chunk, delete it
1155                  */
1156                 simple_heap_delete(toastrel, &toasttup->t_self);
1157         }
1158
1159         /*
1160          * End scan and close relations
1161          */
1162         index_endscan(toastscan);
1163         index_close(toastidx);
1164         heap_close(toastrel, RowExclusiveLock);
1165 }
1166
1167
1168 /* ----------
1169  * toast_fetch_datum -
1170  *
1171  *      Reconstruct an in memory varattrib from the chunks saved
1172  *      in the toast relation
1173  * ----------
1174  */
1175 static varattrib *
1176 toast_fetch_datum(varattrib *attr)
1177 {
1178         Relation        toastrel;
1179         Relation        toastidx;
1180         ScanKeyData toastkey;
1181         IndexScanDesc toastscan;
1182         HeapTuple       ttup;
1183         TupleDesc       toasttupDesc;
1184         varattrib  *result;
1185         int32           ressize;
1186         int32           residx,
1187                                 nextidx;
1188         int32           numchunks;
1189         Pointer         chunk;
1190         bool            isnull;
1191         int32           chunksize;
1192
1193         ressize = attr->va_content.va_external.va_extsize;
1194         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1195
1196         result = (varattrib *) palloc(ressize + VARHDRSZ);
1197         VARATT_SIZEP(result) = ressize + VARHDRSZ;
1198         if (VARATT_IS_COMPRESSED(attr))
1199                 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1200
1201         /*
1202          * Open the toast relation and its index
1203          */
1204         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1205                                                  AccessShareLock);
1206         toasttupDesc = toastrel->rd_att;
1207         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1208
1209         /*
1210          * Setup a scan key to fetch from the index by va_valueid
1211          */
1212         ScanKeyInit(&toastkey,
1213                                 (AttrNumber) 1,
1214                                 BTEqualStrategyNumber, F_OIDEQ,
1215                                 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1216
1217         /*
1218          * Read the chunks by index
1219          *
1220          * Note that because the index is actually on (valueid, chunkidx) we will
1221          * see the chunks in chunkidx order, even though we didn't explicitly ask
1222          * for it.
1223          */
1224         nextidx = 0;
1225
1226         toastscan = index_beginscan(toastrel, toastidx, true,
1227                                                                 SnapshotToast, 1, &toastkey);
1228         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1229         {
1230                 /*
1231                  * Have a chunk, extract the sequence number and the data
1232                  */
1233                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1234                 Assert(!isnull);
1235                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1236                 Assert(!isnull);
1237                 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1238
1239                 /*
1240                  * Some checks on the data we've found
1241                  */
1242                 if (residx != nextidx)
1243                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1244                                  residx, nextidx,
1245                                  attr->va_content.va_external.va_valueid);
1246                 if (residx < numchunks - 1)
1247                 {
1248                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1249                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1250                                          chunksize, residx,
1251                                          attr->va_content.va_external.va_valueid);
1252                 }
1253                 else if (residx < numchunks)
1254                 {
1255                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1256                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1257                                          chunksize, residx,
1258                                          attr->va_content.va_external.va_valueid);
1259                 }
1260                 else
1261                         elog(ERROR, "unexpected chunk number %d for toast value %u",
1262                                  residx,
1263                                  attr->va_content.va_external.va_valueid);
1264
1265                 /*
1266                  * Copy the data into proper place in our result
1267                  */
1268                 memcpy(((char *) VARATT_DATA(result)) + residx * TOAST_MAX_CHUNK_SIZE,
1269                            VARATT_DATA(chunk),
1270                            chunksize);
1271
1272                 nextidx++;
1273         }
1274
1275         /*
1276          * Final checks that we successfully fetched the datum
1277          */
1278         if (nextidx != numchunks)
1279                 elog(ERROR, "missing chunk number %d for toast value %u",
1280                          nextidx,
1281                          attr->va_content.va_external.va_valueid);
1282
1283         /*
1284          * End scan and close relations
1285          */
1286         index_endscan(toastscan);
1287         index_close(toastidx);
1288         heap_close(toastrel, AccessShareLock);
1289
1290         return result;
1291 }
1292
1293 /* ----------
1294  * toast_fetch_datum_slice -
1295  *
1296  *      Reconstruct a segment of a varattrib from the chunks saved
1297  *      in the toast relation
1298  * ----------
1299  */
1300 static varattrib *
1301 toast_fetch_datum_slice(varattrib *attr, int32 sliceoffset, int32 length)
1302 {
1303         Relation        toastrel;
1304         Relation        toastidx;
1305         ScanKeyData toastkey[3];
1306         int                     nscankeys;
1307         IndexScanDesc toastscan;
1308         HeapTuple       ttup;
1309         TupleDesc       toasttupDesc;
1310         varattrib  *result;
1311         int32           attrsize;
1312         int32           residx;
1313         int32           nextidx;
1314         int                     numchunks;
1315         int                     startchunk;
1316         int                     endchunk;
1317         int32           startoffset;
1318         int32           endoffset;
1319         int                     totalchunks;
1320         Pointer         chunk;
1321         bool            isnull;
1322         int32           chunksize;
1323         int32           chcpystrt;
1324         int32           chcpyend;
1325
1326         attrsize = attr->va_content.va_external.va_extsize;
1327         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1328
1329         if (sliceoffset >= attrsize)
1330         {
1331                 sliceoffset = 0;
1332                 length = 0;
1333         }
1334
1335         if (((sliceoffset + length) > attrsize) || length < 0)
1336                 length = attrsize - sliceoffset;
1337
1338         result = (varattrib *) palloc(length + VARHDRSZ);
1339         VARATT_SIZEP(result) = length + VARHDRSZ;
1340
1341         if (VARATT_IS_COMPRESSED(attr))
1342                 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1343
1344         if (length == 0)
1345                 return result;          /* Can save a lot of work at this point! */
1346
1347         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1348         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1349         numchunks = (endchunk - startchunk) + 1;
1350
1351         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1352         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1353
1354         /*
1355          * Open the toast relation and it's index
1356          */
1357         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1358                                                  AccessShareLock);
1359         toasttupDesc = toastrel->rd_att;
1360         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1361
1362         /*
1363          * Setup a scan key to fetch from the index. This is either two keys or
1364          * three depending on the number of chunks.
1365          */
1366         ScanKeyInit(&toastkey[0],
1367                                 (AttrNumber) 1,
1368                                 BTEqualStrategyNumber, F_OIDEQ,
1369                                 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1370
1371         /*
1372          * Use equality condition for one chunk, a range condition otherwise:
1373          */
1374         if (numchunks == 1)
1375         {
1376                 ScanKeyInit(&toastkey[1],
1377                                         (AttrNumber) 2,
1378                                         BTEqualStrategyNumber, F_INT4EQ,
1379                                         Int32GetDatum(startchunk));
1380                 nscankeys = 2;
1381         }
1382         else
1383         {
1384                 ScanKeyInit(&toastkey[1],
1385                                         (AttrNumber) 2,
1386                                         BTGreaterEqualStrategyNumber, F_INT4GE,
1387                                         Int32GetDatum(startchunk));
1388                 ScanKeyInit(&toastkey[2],
1389                                         (AttrNumber) 2,
1390                                         BTLessEqualStrategyNumber, F_INT4LE,
1391                                         Int32GetDatum(endchunk));
1392                 nscankeys = 3;
1393         }
1394
1395         /*
1396          * Read the chunks by index
1397          *
1398          * The index is on (valueid, chunkidx) so they will come in order
1399          */
1400         nextidx = startchunk;
1401         toastscan = index_beginscan(toastrel, toastidx, true,
1402                                                                 SnapshotToast, nscankeys, toastkey);
1403         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1404         {
1405                 /*
1406                  * Have a chunk, extract the sequence number and the data
1407                  */
1408                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1409                 Assert(!isnull);
1410                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1411                 Assert(!isnull);
1412                 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1413
1414                 /*
1415                  * Some checks on the data we've found
1416                  */
1417                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1418                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1419                                  residx, nextidx,
1420                                  attr->va_content.va_external.va_valueid);
1421                 if (residx < totalchunks - 1)
1422                 {
1423                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1424                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1425                                          chunksize, residx,
1426                                          attr->va_content.va_external.va_valueid);
1427                 }
1428                 else
1429                 {
1430                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1431                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1432                                          chunksize, residx,
1433                                          attr->va_content.va_external.va_valueid);
1434                 }
1435
1436                 /*
1437                  * Copy the data into proper place in our result
1438                  */
1439                 chcpystrt = 0;
1440                 chcpyend = chunksize - 1;
1441                 if (residx == startchunk)
1442                         chcpystrt = startoffset;
1443                 if (residx == endchunk)
1444                         chcpyend = endoffset;
1445
1446                 memcpy(((char *) VARATT_DATA(result)) +
1447                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1448                            VARATT_DATA(chunk) + chcpystrt,
1449                            (chcpyend - chcpystrt) + 1);
1450
1451                 nextidx++;
1452         }
1453
1454         /*
1455          * Final checks that we successfully fetched the datum
1456          */
1457         if (nextidx != (endchunk + 1))
1458                 elog(ERROR, "missing chunk number %d for toast value %u",
1459                          nextidx,
1460                          attr->va_content.va_external.va_valueid);
1461
1462         /*
1463          * End scan and close relations
1464          */
1465         index_endscan(toastscan);
1466         index_close(toastidx);
1467         heap_close(toastrel, AccessShareLock);
1468
1469         return result;
1470 }