]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Add support for multiple kinds of external toast datums.
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2013, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/heap/tuptoaster.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *              toast_insert_or_update -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              toast_delete -
20  *                      Reclaim toast storage when a tuple is deleted
21  *
22  *              heap_tuple_untoast_attr -
23  *                      Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27
28 #include "postgres.h"
29
30 #include <unistd.h>
31 #include <fcntl.h>
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "utils/fmgroids.h"
39 #include "utils/pg_lzcompress.h"
40 #include "utils/rel.h"
41 #include "utils/typcache.h"
42 #include "utils/tqual.h"
43
44
45 #undef TOAST_DEBUG
46
/*
 * Testing whether an externally-stored value is compressed now requires
 * comparing extsize (the actual length of the external data) to rawsize
 * (the original uncompressed datum's size).  The latter includes VARHDRSZ
 * overhead, the former doesn't.  We never use compression unless it actually
 * saves space, so we expect either equality or less-than.
 *
 * The argument is a toast pointer struct (not a pointer to one) with
 * va_extsize and va_rawsize members.  It is expanded twice, so it must be
 * free of side effects.
 */
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) \
        ((toast_pointer).va_extsize < (toast_pointer).va_rawsize - VARHDRSZ)

/*
 * Macro to fetch the possibly-unaligned contents of an EXTERNAL datum
 * into a local "struct varatt_external" toast pointer.  This should be
 * just a memcpy, but some versions of gcc seem to produce broken code
 * that assumes the datum contents are aligned.  Introducing an explicit
 * intermediate "varattrib_1b_e *" variable seems to fix it.
 *
 * toast_pointer must be an lvalue; sizeof(toast_pointer) bytes are copied
 * into it.  This file also uses the macro with "struct varatt_indirect"
 * targets.  The second Assert checks that the datum's external size equals
 * sizeof(toast_pointer) plus VARHDRSZ_EXTERNAL, i.e. that the destination
 * struct matches the datum being read.
 *
 * The macro declares a local named "attre", so callers must not pass an
 * expression that refers to a variable of that name.
 */
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr) \
do { \
        varattrib_1b_e *attre = (varattrib_1b_e *) (attr); \
        Assert(VARATT_IS_EXTERNAL(attre)); \
        Assert(VARSIZE_EXTERNAL(attre) == sizeof(toast_pointer) + VARHDRSZ_EXTERNAL); \
        memcpy(&(toast_pointer), VARDATA_EXTERNAL(attre), sizeof(toast_pointer)); \
} while (0)
71
72
/*
 * Forward declarations of file-local helper routines.
 *
 * toast_delete_datum, toast_save_datum, toast_fetch_datum and
 * toast_fetch_datum_slice are called by the public entry points below to
 * delete, store and read back externally stored values.
 * NOTE(review): the two *_valueid_exists helpers are not called in the
 * visible part of this file; confirm their callers before changing them.
 */
static void toast_delete_datum(Relation rel, Datum value);
static Datum toast_save_datum(Relation rel, Datum value,
                                 struct varlena * oldexternal, int options);
static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
static struct varlena *toast_fetch_datum(struct varlena * attr);
static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
                                                int32 sliceoffset, int32 length);
81
82
83 /* ----------
84  * heap_tuple_fetch_attr -
85  *
86  *      Public entry point to get back a toasted value from
87  *      external source (possibly still in compressed format).
88  *
89  * This will return a datum that contains all the data internally, ie, not
90  * relying on external storage or memory, but it can still be compressed or
91  * have a short header.
92  ----------
93  */
94 struct varlena *
95 heap_tuple_fetch_attr(struct varlena * attr)
96 {
97         struct varlena *result;
98
99         if (VARATT_IS_EXTERNAL_ONDISK(attr))
100         {
101                 /*
102                  * This is an external stored plain value
103                  */
104                 result = toast_fetch_datum(attr);
105         }
106         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
107         {
108                 /*
109                  * copy into the caller's memory context. That's not required in all
110                  * cases but sufficient for now since this is mainly used when we need
111                  * to persist a Datum for unusually long time, like in a HOLD cursor.
112                  */
113                 struct varatt_indirect redirect;
114                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
115                 attr = (struct varlena *)redirect.pointer;
116
117                 /* nested indirect Datums aren't allowed */
118                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
119
120                 /* doesn't make much sense, but better handle it */
121                 if (VARATT_IS_EXTERNAL_ONDISK(attr))
122                         return heap_tuple_fetch_attr(attr);
123
124                 /* copy datum verbatim */
125                 result = (struct varlena *) palloc(VARSIZE_ANY(attr));
126                 memcpy(result, attr, VARSIZE_ANY(attr));
127         }
128         else
129         {
130                 /*
131                  * This is a plain value inside of the main tuple - why am I called?
132                  */
133                 result = attr;
134         }
135
136         return result;
137 }
138
139
140 /* ----------
141  * heap_tuple_untoast_attr -
142  *
143  *      Public entry point to get back a toasted value from compression
144  *      or external storage.
145  * ----------
146  */
147 struct varlena *
148 heap_tuple_untoast_attr(struct varlena * attr)
149 {
150         if (VARATT_IS_EXTERNAL_ONDISK(attr))
151         {
152                 /*
153                  * This is an externally stored datum --- fetch it back from there
154                  */
155                 attr = toast_fetch_datum(attr);
156                 /* If it's compressed, decompress it */
157                 if (VARATT_IS_COMPRESSED(attr))
158                 {
159                         PGLZ_Header *tmp = (PGLZ_Header *) attr;
160
161                         attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
162                         SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
163                         pglz_decompress(tmp, VARDATA(attr));
164                         pfree(tmp);
165                 }
166         }
167         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
168         {
169                 struct varatt_indirect redirect;
170                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
171                 attr = (struct varlena *)redirect.pointer;
172
173                 /* nested indirect Datums aren't allowed */
174                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
175
176                 attr = heap_tuple_untoast_attr(attr);
177         }
178         else if (VARATT_IS_COMPRESSED(attr))
179         {
180                 /*
181                  * This is a compressed value inside of the main tuple
182                  */
183                 PGLZ_Header *tmp = (PGLZ_Header *) attr;
184
185                 attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
186                 SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
187                 pglz_decompress(tmp, VARDATA(attr));
188         }
189         else if (VARATT_IS_SHORT(attr))
190         {
191                 /*
192                  * This is a short-header varlena --- convert to 4-byte header format
193                  */
194                 Size            data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
195                 Size            new_size = data_size + VARHDRSZ;
196                 struct varlena *new_attr;
197
198                 new_attr = (struct varlena *) palloc(new_size);
199                 SET_VARSIZE(new_attr, new_size);
200                 memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
201                 attr = new_attr;
202         }
203
204         return attr;
205 }
206
207
208 /* ----------
209  * heap_tuple_untoast_attr_slice -
210  *
211  *              Public entry point to get back part of a toasted value
212  *              from compression or external storage.
213  * ----------
214  */
215 struct varlena *
216 heap_tuple_untoast_attr_slice(struct varlena * attr,
217                                                           int32 sliceoffset, int32 slicelength)
218 {
219         struct varlena *preslice;
220         struct varlena *result;
221         char       *attrdata;
222         int32           attrsize;
223
224         if (VARATT_IS_EXTERNAL_ONDISK(attr))
225         {
226                 struct varatt_external toast_pointer;
227
228                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
229
230                 /* fast path for non-compressed external datums */
231                 if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
232                         return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
233
234                 /* fetch it back (compressed marker will get set automatically) */
235                 preslice = toast_fetch_datum(attr);
236         }
237         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
238         {
239                 struct varatt_indirect redirect;
240                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
241
242                 /* nested indirect Datums aren't allowed */
243                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(redirect.pointer));
244
245                 return heap_tuple_untoast_attr_slice(redirect.pointer,
246                                                                                          sliceoffset, slicelength);
247         }
248         else
249                 preslice = attr;
250
251         if (VARATT_IS_COMPRESSED(preslice))
252         {
253                 PGLZ_Header *tmp = (PGLZ_Header *) preslice;
254                 Size            size = PGLZ_RAW_SIZE(tmp) + VARHDRSZ;
255
256                 preslice = (struct varlena *) palloc(size);
257                 SET_VARSIZE(preslice, size);
258                 pglz_decompress(tmp, VARDATA(preslice));
259
260                 if (tmp != (PGLZ_Header *) attr)
261                         pfree(tmp);
262         }
263
264         if (VARATT_IS_SHORT(preslice))
265         {
266                 attrdata = VARDATA_SHORT(preslice);
267                 attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
268         }
269         else
270         {
271                 attrdata = VARDATA(preslice);
272                 attrsize = VARSIZE(preslice) - VARHDRSZ;
273         }
274
275         /* slicing of datum for compressed cases and plain value */
276
277         if (sliceoffset >= attrsize)
278         {
279                 sliceoffset = 0;
280                 slicelength = 0;
281         }
282
283         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
284                 slicelength = attrsize - sliceoffset;
285
286         result = (struct varlena *) palloc(slicelength + VARHDRSZ);
287         SET_VARSIZE(result, slicelength + VARHDRSZ);
288
289         memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
290
291         if (preslice != attr)
292                 pfree(preslice);
293
294         return result;
295 }
296
297
298 /* ----------
299  * toast_raw_datum_size -
300  *
301  *      Return the raw (detoasted) size of a varlena datum
302  *      (including the VARHDRSZ header)
303  * ----------
304  */
305 Size
306 toast_raw_datum_size(Datum value)
307 {
308         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
309         Size            result;
310
311         if (VARATT_IS_EXTERNAL_ONDISK(attr))
312         {
313                 /* va_rawsize is the size of the original datum -- including header */
314                 struct varatt_external toast_pointer;
315
316                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
317                 result = toast_pointer.va_rawsize;
318         }
319         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
320         {
321                 struct varatt_indirect toast_pointer;
322                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
323
324                 /* nested indirect Datums aren't allowed */
325                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer));
326
327                 return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer));
328         }
329         else if (VARATT_IS_COMPRESSED(attr))
330         {
331                 /* here, va_rawsize is just the payload size */
332                 result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
333         }
334         else if (VARATT_IS_SHORT(attr))
335         {
336                 /*
337                  * we have to normalize the header length to VARHDRSZ or else the
338                  * callers of this function will be confused.
339                  */
340                 result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
341         }
342         else
343         {
344                 /* plain untoasted datum */
345                 result = VARSIZE(attr);
346         }
347         return result;
348 }
349
350 /* ----------
351  * toast_datum_size
352  *
353  *      Return the physical storage size (possibly compressed) of a varlena datum
354  * ----------
355  */
356 Size
357 toast_datum_size(Datum value)
358 {
359         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
360         Size            result;
361
362         if (VARATT_IS_EXTERNAL_ONDISK(attr))
363         {
364                 /*
365                  * Attribute is stored externally - return the extsize whether
366                  * compressed or not.  We do not count the size of the toast pointer
367                  * ... should we?
368                  */
369                 struct varatt_external toast_pointer;
370
371                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
372                 result = toast_pointer.va_extsize;
373         }
374         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
375         {
376                 struct varatt_indirect toast_pointer;
377                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
378
379                 /* nested indirect Datums aren't allowed */
380                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
381
382                 return toast_datum_size(PointerGetDatum(toast_pointer.pointer));
383         }
384         else if (VARATT_IS_SHORT(attr))
385         {
386                 result = VARSIZE_SHORT(attr);
387         }
388         else
389         {
390                 /*
391                  * Attribute is stored inline either compressed or not, just calculate
392                  * the size of the datum in either case.
393                  */
394                 result = VARSIZE(attr);
395         }
396         return result;
397 }
398
399
400 /* ----------
401  * toast_delete -
402  *
403  *      Cascaded delete toast-entries on DELETE
404  * ----------
405  */
406 void
407 toast_delete(Relation rel, HeapTuple oldtup)
408 {
409         TupleDesc       tupleDesc;
410         Form_pg_attribute *att;
411         int                     numAttrs;
412         int                     i;
413         Datum           toast_values[MaxHeapAttributeNumber];
414         bool            toast_isnull[MaxHeapAttributeNumber];
415
416         /*
417          * We should only ever be called for tuples of plain relations or
418          * materialized views --- recursing on a toast rel is bad news.
419          */
420         Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
421                    rel->rd_rel->relkind == RELKIND_MATVIEW);
422
423         /*
424          * Get the tuple descriptor and break down the tuple into fields.
425          *
426          * NOTE: it's debatable whether to use heap_deform_tuple() here or just
427          * heap_getattr() only the varlena columns.  The latter could win if there
428          * are few varlena columns and many non-varlena ones. However,
429          * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
430          * O(N^2) if there are many varlena columns, so it seems better to err on
431          * the side of linear cost.  (We won't even be here unless there's at
432          * least one varlena column, by the way.)
433          */
434         tupleDesc = rel->rd_att;
435         att = tupleDesc->attrs;
436         numAttrs = tupleDesc->natts;
437
438         Assert(numAttrs <= MaxHeapAttributeNumber);
439         heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
440
441         /*
442          * Check for external stored attributes and delete them from the secondary
443          * relation.
444          */
445         for (i = 0; i < numAttrs; i++)
446         {
447                 if (att[i]->attlen == -1)
448                 {
449                         Datum           value = toast_values[i];
450
451                         if (toast_isnull[i])
452                                 continue;
453                         else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
454                                 toast_delete_datum(rel, value);
455                         else if (VARATT_IS_EXTERNAL_INDIRECT(PointerGetDatum(value)))
456                                 elog(ERROR, "attempt to delete tuple containing indirect datums");
457                 }
458         }
459 }
460
461
462 /* ----------
463  * toast_insert_or_update -
464  *
465  *      Delete no-longer-used toast-entries and create new ones to
466  *      make the new tuple fit on INSERT or UPDATE
467  *
468  * Inputs:
469  *      newtup: the candidate new tuple to be inserted
470  *      oldtup: the old row version for UPDATE, or NULL for INSERT
471  *      options: options to be passed to heap_insert() for toast rows
472  * Result:
473  *      either newtup if no toasting is needed, or a palloc'd modified tuple
474  *      that is what should actually get stored
475  *
476  * NOTE: neither newtup nor oldtup will be modified.  This is a change
477  * from the pre-8.1 API of this routine.
478  * ----------
479  */
480 HeapTuple
481 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
482                                            int options)
483 {
484         HeapTuple       result_tuple;
485         TupleDesc       tupleDesc;
486         Form_pg_attribute *att;
487         int                     numAttrs;
488         int                     i;
489
490         bool            need_change = false;
491         bool            need_free = false;
492         bool            need_delold = false;
493         bool            has_nulls = false;
494
495         Size            maxDataLen;
496         Size            hoff;
497
498         char            toast_action[MaxHeapAttributeNumber];
499         bool            toast_isnull[MaxHeapAttributeNumber];
500         bool            toast_oldisnull[MaxHeapAttributeNumber];
501         Datum           toast_values[MaxHeapAttributeNumber];
502         Datum           toast_oldvalues[MaxHeapAttributeNumber];
503         struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
504         int32           toast_sizes[MaxHeapAttributeNumber];
505         bool            toast_free[MaxHeapAttributeNumber];
506         bool            toast_delold[MaxHeapAttributeNumber];
507
508         /*
509          * We should only ever be called for tuples of plain relations ---
510          * recursing on a toast rel is bad news.
511          */
512         Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
513                    rel->rd_rel->relkind == RELKIND_MATVIEW);
514
515         /*
516          * Get the tuple descriptor and break down the tuple(s) into fields.
517          */
518         tupleDesc = rel->rd_att;
519         att = tupleDesc->attrs;
520         numAttrs = tupleDesc->natts;
521
522         Assert(numAttrs <= MaxHeapAttributeNumber);
523         heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
524         if (oldtup != NULL)
525                 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
526
527         /* ----------
528          * Then collect information about the values given
529          *
530          * NOTE: toast_action[i] can have these values:
531          *              ' '             default handling
532          *              'p'             already processed --- don't touch it
533          *              'x'             incompressible, but OK to move off
534          *
535          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
536          *              toast_action[i] different from 'p'.
537          * ----------
538          */
539         memset(toast_action, ' ', numAttrs * sizeof(char));
540         memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
541         memset(toast_free, 0, numAttrs * sizeof(bool));
542         memset(toast_delold, 0, numAttrs * sizeof(bool));
543
544         for (i = 0; i < numAttrs; i++)
545         {
546                 struct varlena *old_value;
547                 struct varlena *new_value;
548
549                 if (oldtup != NULL)
550                 {
551                         /*
552                          * For UPDATE get the old and new values of this attribute
553                          */
554                         old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
555                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
556
557                         /*
558                          * If the old value is stored on disk, check if it has changed so
559                          * we have to delete it later.
560                          */
561                         if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
562                                 VARATT_IS_EXTERNAL_ONDISK(old_value))
563                         {
564                                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
565                                         memcmp((char *) old_value, (char *) new_value,
566                                                    VARSIZE_EXTERNAL(old_value)) != 0)
567                                 {
568                                         /*
569                                          * The old external stored value isn't needed any more
570                                          * after the update
571                                          */
572                                         toast_delold[i] = true;
573                                         need_delold = true;
574                                 }
575                                 else
576                                 {
577                                         /*
578                                          * This attribute isn't changed by this update so we reuse
579                                          * the original reference to the old value in the new
580                                          * tuple.
581                                          */
582                                         toast_action[i] = 'p';
583                                         continue;
584                                 }
585                         }
586                 }
587                 else
588                 {
589                         /*
590                          * For INSERT simply get the new value
591                          */
592                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
593                 }
594
595                 /*
596                  * Handle NULL attributes
597                  */
598                 if (toast_isnull[i])
599                 {
600                         toast_action[i] = 'p';
601                         has_nulls = true;
602                         continue;
603                 }
604
605                 /*
606                  * Now look at varlena attributes
607                  */
608                 if (att[i]->attlen == -1)
609                 {
610                         /*
611                          * If the table's attribute says PLAIN always, force it so.
612                          */
613                         if (att[i]->attstorage == 'p')
614                                 toast_action[i] = 'p';
615
616                         /*
617                          * We took care of UPDATE above, so any external value we find
618                          * still in the tuple must be someone else's we cannot reuse.
619                          * Fetch it back (without decompression, unless we are forcing
620                          * PLAIN storage).      If necessary, we'll push it out as a new
621                          * external value below.
622                          */
623                         if (VARATT_IS_EXTERNAL(new_value))
624                         {
625                                 toast_oldexternal[i] = new_value;
626                                 if (att[i]->attstorage == 'p')
627                                         new_value = heap_tuple_untoast_attr(new_value);
628                                 else
629                                         new_value = heap_tuple_fetch_attr(new_value);
630                                 toast_values[i] = PointerGetDatum(new_value);
631                                 toast_free[i] = true;
632                                 need_change = true;
633                                 need_free = true;
634                         }
635
636                         /*
637                          * Remember the size of this attribute
638                          */
639                         toast_sizes[i] = VARSIZE_ANY(new_value);
640                 }
641                 else
642                 {
643                         /*
644                          * Not a varlena attribute, plain storage always
645                          */
646                         toast_action[i] = 'p';
647                 }
648         }
649
650         /* ----------
651          * Compress and/or save external until data fits into target length
652          *
653          *      1: Inline compress attributes with attstorage 'x', and store very
654          *         large attributes with attstorage 'x' or 'e' external immediately
655          *      2: Store attributes with attstorage 'x' or 'e' external
656          *      3: Inline compress attributes with attstorage 'm'
657          *      4: Store attributes with attstorage 'm' external
658          * ----------
659          */
660
661         /* compute header overhead --- this should match heap_form_tuple() */
662         hoff = offsetof(HeapTupleHeaderData, t_bits);
663         if (has_nulls)
664                 hoff += BITMAPLEN(numAttrs);
665         if (newtup->t_data->t_infomask & HEAP_HASOID)
666                 hoff += sizeof(Oid);
667         hoff = MAXALIGN(hoff);
668         /* now convert to a limit on the tuple data size */
669         maxDataLen = TOAST_TUPLE_TARGET - hoff;
670
671         /*
672          * Look for attributes with attstorage 'x' to compress.  Also find large
673          * attributes with attstorage 'x' or 'e', and store them external.
674          */
675         while (heap_compute_data_size(tupleDesc,
676                                                                   toast_values, toast_isnull) > maxDataLen)
677         {
678                 int                     biggest_attno = -1;
679                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
680                 Datum           old_value;
681                 Datum           new_value;
682
683                 /*
684                  * Search for the biggest yet unprocessed internal attribute
685                  */
686                 for (i = 0; i < numAttrs; i++)
687                 {
688                         if (toast_action[i] != ' ')
689                                 continue;
690                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
691                                 continue;               /* can't happen, toast_action would be 'p' */
692                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
693                                 continue;
694                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
695                                 continue;
696                         if (toast_sizes[i] > biggest_size)
697                         {
698                                 biggest_attno = i;
699                                 biggest_size = toast_sizes[i];
700                         }
701                 }
702
703                 if (biggest_attno < 0)
704                         break;
705
706                 /*
707                  * Attempt to compress it inline, if it has attstorage 'x'
708                  */
709                 i = biggest_attno;
710                 if (att[i]->attstorage == 'x')
711                 {
712                         old_value = toast_values[i];
713                         new_value = toast_compress_datum(old_value);
714
715                         if (DatumGetPointer(new_value) != NULL)
716                         {
717                                 /* successful compression */
718                                 if (toast_free[i])
719                                         pfree(DatumGetPointer(old_value));
720                                 toast_values[i] = new_value;
721                                 toast_free[i] = true;
722                                 toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
723                                 need_change = true;
724                                 need_free = true;
725                         }
726                         else
727                         {
728                                 /* incompressible, ignore on subsequent compression passes */
729                                 toast_action[i] = 'x';
730                         }
731                 }
732                 else
733                 {
734                         /* has attstorage 'e', ignore on subsequent compression passes */
735                         toast_action[i] = 'x';
736                 }
737
738                 /*
739                  * If this value is by itself more than maxDataLen (after compression
740                  * if any), push it out to the toast table immediately, if possible.
741                  * This avoids uselessly compressing other fields in the common case
742                  * where we have one long field and several short ones.
743                  *
744                  * XXX maybe the threshold should be less than maxDataLen?
745                  */
746                 if (toast_sizes[i] > maxDataLen &&
747                         rel->rd_rel->reltoastrelid != InvalidOid)
748                 {
749                         old_value = toast_values[i];
750                         toast_action[i] = 'p';
751                         toast_values[i] = toast_save_datum(rel, toast_values[i],
752                                                                                            toast_oldexternal[i], options);
753                         if (toast_free[i])
754                                 pfree(DatumGetPointer(old_value));
755                         toast_free[i] = true;
756                         need_change = true;
757                         need_free = true;
758                 }
759         }
760
761         /*
762          * Second we look for attributes of attstorage 'x' or 'e' that are still
763          * inline.      But skip this if there's no toast table to push them to.
764          */
765         while (heap_compute_data_size(tupleDesc,
766                                                                   toast_values, toast_isnull) > maxDataLen &&
767                    rel->rd_rel->reltoastrelid != InvalidOid)
768         {
769                 int                     biggest_attno = -1;
770                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
771                 Datum           old_value;
772
773                 /*------
774                  * Search for the biggest yet inlined attribute with
775                  * attstorage equals 'x' or 'e'
776                  *------
777                  */
778                 for (i = 0; i < numAttrs; i++)
779                 {
780                         if (toast_action[i] == 'p')
781                                 continue;
782                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
783                                 continue;               /* can't happen, toast_action would be 'p' */
784                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
785                                 continue;
786                         if (toast_sizes[i] > biggest_size)
787                         {
788                                 biggest_attno = i;
789                                 biggest_size = toast_sizes[i];
790                         }
791                 }
792
793                 if (biggest_attno < 0)
794                         break;
795
796                 /*
797                  * Store this external
798                  */
799                 i = biggest_attno;
800                 old_value = toast_values[i];
801                 toast_action[i] = 'p';
802                 toast_values[i] = toast_save_datum(rel, toast_values[i],
803                                                                                    toast_oldexternal[i], options);
804                 if (toast_free[i])
805                         pfree(DatumGetPointer(old_value));
806                 toast_free[i] = true;
807
808                 need_change = true;
809                 need_free = true;
810         }
811
812         /*
813          * Round 3 - this time we take attributes with storage 'm' into
814          * compression
815          */
816         while (heap_compute_data_size(tupleDesc,
817                                                                   toast_values, toast_isnull) > maxDataLen)
818         {
819                 int                     biggest_attno = -1;
820                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
821                 Datum           old_value;
822                 Datum           new_value;
823
824                 /*
825                  * Search for the biggest yet uncompressed internal attribute
826                  */
827                 for (i = 0; i < numAttrs; i++)
828                 {
829                         if (toast_action[i] != ' ')
830                                 continue;
831                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
832                                 continue;               /* can't happen, toast_action would be 'p' */
833                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
834                                 continue;
835                         if (att[i]->attstorage != 'm')
836                                 continue;
837                         if (toast_sizes[i] > biggest_size)
838                         {
839                                 biggest_attno = i;
840                                 biggest_size = toast_sizes[i];
841                         }
842                 }
843
844                 if (biggest_attno < 0)
845                         break;
846
847                 /*
848                  * Attempt to compress it inline
849                  */
850                 i = biggest_attno;
851                 old_value = toast_values[i];
852                 new_value = toast_compress_datum(old_value);
853
854                 if (DatumGetPointer(new_value) != NULL)
855                 {
856                         /* successful compression */
857                         if (toast_free[i])
858                                 pfree(DatumGetPointer(old_value));
859                         toast_values[i] = new_value;
860                         toast_free[i] = true;
861                         toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
862                         need_change = true;
863                         need_free = true;
864                 }
865                 else
866                 {
867                         /* incompressible, ignore on subsequent compression passes */
868                         toast_action[i] = 'x';
869                 }
870         }
871
872         /*
873          * Finally we store attributes of type 'm' externally.  At this point we
874          * increase the target tuple size, so that 'm' attributes aren't stored
875          * externally unless really necessary.
876          */
877         maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
878
879         while (heap_compute_data_size(tupleDesc,
880                                                                   toast_values, toast_isnull) > maxDataLen &&
881                    rel->rd_rel->reltoastrelid != InvalidOid)
882         {
883                 int                     biggest_attno = -1;
884                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
885                 Datum           old_value;
886
887                 /*--------
888                  * Search for the biggest yet inlined attribute with
889                  * attstorage = 'm'
890                  *--------
891                  */
892                 for (i = 0; i < numAttrs; i++)
893                 {
894                         if (toast_action[i] == 'p')
895                                 continue;
896                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
897                                 continue;               /* can't happen, toast_action would be 'p' */
898                         if (att[i]->attstorage != 'm')
899                                 continue;
900                         if (toast_sizes[i] > biggest_size)
901                         {
902                                 biggest_attno = i;
903                                 biggest_size = toast_sizes[i];
904                         }
905                 }
906
907                 if (biggest_attno < 0)
908                         break;
909
910                 /*
911                  * Store this external
912                  */
913                 i = biggest_attno;
914                 old_value = toast_values[i];
915                 toast_action[i] = 'p';
916                 toast_values[i] = toast_save_datum(rel, toast_values[i],
917                                                                                    toast_oldexternal[i], options);
918                 if (toast_free[i])
919                         pfree(DatumGetPointer(old_value));
920                 toast_free[i] = true;
921
922                 need_change = true;
923                 need_free = true;
924         }
925
926         /*
927          * In the case we toasted any values, we need to build a new heap tuple
928          * with the changed values.
929          */
930         if (need_change)
931         {
932                 HeapTupleHeader olddata = newtup->t_data;
933                 HeapTupleHeader new_data;
934                 int32           new_header_len;
935                 int32           new_data_len;
936                 int32           new_tuple_len;
937
938                 /*
939                  * Calculate the new size of the tuple.
940                  *
941                  * Note: we used to assume here that the old tuple's t_hoff must equal
942                  * the new_header_len value, but that was incorrect.  The old tuple
943                  * might have a smaller-than-current natts, if there's been an ALTER
944                  * TABLE ADD COLUMN since it was stored; and that would lead to a
945                  * different conclusion about the size of the null bitmap, or even
946                  * whether there needs to be one at all.
947                  */
948                 new_header_len = offsetof(HeapTupleHeaderData, t_bits);
949                 if (has_nulls)
950                         new_header_len += BITMAPLEN(numAttrs);
951                 if (olddata->t_infomask & HEAP_HASOID)
952                         new_header_len += sizeof(Oid);
953                 new_header_len = MAXALIGN(new_header_len);
954                 new_data_len = heap_compute_data_size(tupleDesc,
955                                                                                           toast_values, toast_isnull);
956                 new_tuple_len = new_header_len + new_data_len;
957
958                 /*
959                  * Allocate and zero the space needed, and fill HeapTupleData fields.
960                  */
961                 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_tuple_len);
962                 result_tuple->t_len = new_tuple_len;
963                 result_tuple->t_self = newtup->t_self;
964                 result_tuple->t_tableOid = newtup->t_tableOid;
965                 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
966                 result_tuple->t_data = new_data;
967
968                 /*
969                  * Copy the existing tuple header, but adjust natts and t_hoff.
970                  */
971                 memcpy(new_data, olddata, offsetof(HeapTupleHeaderData, t_bits));
972                 HeapTupleHeaderSetNatts(new_data, numAttrs);
973                 new_data->t_hoff = new_header_len;
974                 if (olddata->t_infomask & HEAP_HASOID)
975                         HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(olddata));
976
977                 /* Copy over the data, and fill the null bitmap if needed */
978                 heap_fill_tuple(tupleDesc,
979                                                 toast_values,
980                                                 toast_isnull,
981                                                 (char *) new_data + new_header_len,
982                                                 new_data_len,
983                                                 &(new_data->t_infomask),
984                                                 has_nulls ? new_data->t_bits : NULL);
985         }
986         else
987                 result_tuple = newtup;
988
989         /*
990          * Free allocated temp values
991          */
992         if (need_free)
993                 for (i = 0; i < numAttrs; i++)
994                         if (toast_free[i])
995                                 pfree(DatumGetPointer(toast_values[i]));
996
997         /*
998          * Delete external values from the old tuple
999          */
1000         if (need_delold)
1001                 for (i = 0; i < numAttrs; i++)
1002                         if (toast_delold[i])
1003                                 toast_delete_datum(rel, toast_oldvalues[i]);
1004
1005         return result_tuple;
1006 }
1007
1008
1009 /* ----------
1010  * toast_flatten_tuple -
1011  *
1012  *      "Flatten" a tuple to contain no out-of-line toasted fields.
1013  *      (This does not eliminate compressed or short-header datums.)
1014  * ----------
1015  */
1016 HeapTuple
1017 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
1018 {
1019         HeapTuple       new_tuple;
1020         Form_pg_attribute *att = tupleDesc->attrs;
1021         int                     numAttrs = tupleDesc->natts;
1022         int                     i;
1023         Datum           toast_values[MaxTupleAttributeNumber];
1024         bool            toast_isnull[MaxTupleAttributeNumber];
1025         bool            toast_free[MaxTupleAttributeNumber];
1026
1027         /*
1028          * Break down the tuple into fields.
1029          */
1030         Assert(numAttrs <= MaxTupleAttributeNumber);
1031         heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
1032
1033         memset(toast_free, 0, numAttrs * sizeof(bool));
1034
1035         for (i = 0; i < numAttrs; i++)
1036         {
1037                 /*
1038                  * Look at non-null varlena attributes
1039                  */
1040                 if (!toast_isnull[i] && att[i]->attlen == -1)
1041                 {
1042                         struct varlena *new_value;
1043
1044                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1045                         if (VARATT_IS_EXTERNAL(new_value))
1046                         {
1047                                 new_value = toast_fetch_datum(new_value);
1048                                 toast_values[i] = PointerGetDatum(new_value);
1049                                 toast_free[i] = true;
1050                         }
1051                 }
1052         }
1053
1054         /*
1055          * Form the reconfigured tuple.
1056          */
1057         new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
1058
1059         /*
1060          * Be sure to copy the tuple's OID and identity fields.  We also make a
1061          * point of copying visibility info, just in case anybody looks at those
1062          * fields in a syscache entry.
1063          */
1064         if (tupleDesc->tdhasoid)
1065                 HeapTupleSetOid(new_tuple, HeapTupleGetOid(tup));
1066
1067         new_tuple->t_self = tup->t_self;
1068         new_tuple->t_tableOid = tup->t_tableOid;
1069
1070         new_tuple->t_data->t_choice = tup->t_data->t_choice;
1071         new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
1072         new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
1073         new_tuple->t_data->t_infomask |=
1074                 tup->t_data->t_infomask & HEAP_XACT_MASK;
1075         new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
1076         new_tuple->t_data->t_infomask2 |=
1077                 tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
1078
1079         /*
1080          * Free allocated temp values
1081          */
1082         for (i = 0; i < numAttrs; i++)
1083                 if (toast_free[i])
1084                         pfree(DatumGetPointer(toast_values[i]));
1085
1086         return new_tuple;
1087 }
1088
1089
1090 /* ----------
1091  * toast_flatten_tuple_attribute -
1092  *
1093  *      If a Datum is of composite type, "flatten" it to contain no toasted fields.
1094  *      This must be invoked on any potentially-composite field that is to be
1095  *      inserted into a tuple.  Doing this preserves the invariant that toasting
1096  *      goes only one level deep in a tuple.
1097  *
1098  *      Note that flattening does not mean expansion of short-header varlenas,
1099  *      so in one sense toasting is allowed within composite datums.
1100  * ----------
1101  */
1102 Datum
1103 toast_flatten_tuple_attribute(Datum value,
1104                                                           Oid typeId, int32 typeMod)
1105 {
1106         TupleDesc       tupleDesc;
1107         HeapTupleHeader olddata;
1108         HeapTupleHeader new_data;
1109         int32           new_header_len;
1110         int32           new_data_len;
1111         int32           new_tuple_len;
1112         HeapTupleData tmptup;
1113         Form_pg_attribute *att;
1114         int                     numAttrs;
1115         int                     i;
1116         bool            need_change = false;
1117         bool            has_nulls = false;
1118         Datum           toast_values[MaxTupleAttributeNumber];
1119         bool            toast_isnull[MaxTupleAttributeNumber];
1120         bool            toast_free[MaxTupleAttributeNumber];
1121
1122         /*
1123          * See if it's a composite type, and get the tupdesc if so.
1124          */
1125         tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
1126         if (tupleDesc == NULL)
1127                 return value;                   /* not a composite type */
1128
1129         att = tupleDesc->attrs;
1130         numAttrs = tupleDesc->natts;
1131
1132         /*
1133          * Break down the tuple into fields.
1134          */
1135         olddata = DatumGetHeapTupleHeader(value);
1136         Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
1137         Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
1138         /* Build a temporary HeapTuple control structure */
1139         tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
1140         ItemPointerSetInvalid(&(tmptup.t_self));
1141         tmptup.t_tableOid = InvalidOid;
1142         tmptup.t_data = olddata;
1143
1144         Assert(numAttrs <= MaxTupleAttributeNumber);
1145         heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
1146
1147         memset(toast_free, 0, numAttrs * sizeof(bool));
1148
1149         for (i = 0; i < numAttrs; i++)
1150         {
1151                 /*
1152                  * Look at non-null varlena attributes
1153                  */
1154                 if (toast_isnull[i])
1155                         has_nulls = true;
1156                 else if (att[i]->attlen == -1)
1157                 {
1158                         struct varlena *new_value;
1159
1160                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1161                         if (VARATT_IS_EXTERNAL(new_value) ||
1162                                 VARATT_IS_COMPRESSED(new_value))
1163                         {
1164                                 new_value = heap_tuple_untoast_attr(new_value);
1165                                 toast_values[i] = PointerGetDatum(new_value);
1166                                 toast_free[i] = true;
1167                                 need_change = true;
1168                         }
1169                 }
1170         }
1171
1172         /*
1173          * If nothing to untoast, just return the original tuple.
1174          */
1175         if (!need_change)
1176         {
1177                 ReleaseTupleDesc(tupleDesc);
1178                 return value;
1179         }
1180
1181         /*
1182          * Calculate the new size of the tuple.
1183          *
1184          * This should match the reconstruction code in toast_insert_or_update.
1185          */
1186         new_header_len = offsetof(HeapTupleHeaderData, t_bits);
1187         if (has_nulls)
1188                 new_header_len += BITMAPLEN(numAttrs);
1189         if (olddata->t_infomask & HEAP_HASOID)
1190                 new_header_len += sizeof(Oid);
1191         new_header_len = MAXALIGN(new_header_len);
1192         new_data_len = heap_compute_data_size(tupleDesc,
1193                                                                                   toast_values, toast_isnull);
1194         new_tuple_len = new_header_len + new_data_len;
1195
1196         new_data = (HeapTupleHeader) palloc0(new_tuple_len);
1197
1198         /*
1199          * Copy the existing tuple header, but adjust natts and t_hoff.
1200          */
1201         memcpy(new_data, olddata, offsetof(HeapTupleHeaderData, t_bits));
1202         HeapTupleHeaderSetNatts(new_data, numAttrs);
1203         new_data->t_hoff = new_header_len;
1204         if (olddata->t_infomask & HEAP_HASOID)
1205                 HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(olddata));
1206
1207         /* Reset the datum length field, too */
1208         HeapTupleHeaderSetDatumLength(new_data, new_tuple_len);
1209
1210         /* Copy over the data, and fill the null bitmap if needed */
1211         heap_fill_tuple(tupleDesc,
1212                                         toast_values,
1213                                         toast_isnull,
1214                                         (char *) new_data + new_header_len,
1215                                         new_data_len,
1216                                         &(new_data->t_infomask),
1217                                         has_nulls ? new_data->t_bits : NULL);
1218
1219         /*
1220          * Free allocated temp values
1221          */
1222         for (i = 0; i < numAttrs; i++)
1223                 if (toast_free[i])
1224                         pfree(DatumGetPointer(toast_values[i]));
1225         ReleaseTupleDesc(tupleDesc);
1226
1227         return PointerGetDatum(new_data);
1228 }
1229
1230
1231 /* ----------
1232  * toast_compress_datum -
1233  *
1234  *      Create a compressed version of a varlena datum
1235  *
1236  *      If we fail (ie, compressed result is actually bigger than original)
1237  *      then return NULL.  We must not use compressed data if it'd expand
1238  *      the tuple!
1239  *
1240  *      We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1241  *      copying them.  But we can't handle external or compressed datums.
1242  * ----------
1243  */
1244 Datum
1245 toast_compress_datum(Datum value)
1246 {
1247         struct varlena *tmp;
1248         int32           valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
1249
1250         Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
1251         Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
1252
1253         /*
1254          * No point in wasting a palloc cycle if value size is out of the allowed
1255          * range for compression
1256          */
1257         if (valsize < PGLZ_strategy_default->min_input_size ||
1258                 valsize > PGLZ_strategy_default->max_input_size)
1259                 return PointerGetDatum(NULL);
1260
1261         tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
1262
1263         /*
1264          * We recheck the actual size even if pglz_compress() reports success,
1265          * because it might be satisfied with having saved as little as one byte
1266          * in the compressed data --- which could turn into a net loss once you
1267          * consider header and alignment padding.  Worst case, the compressed
1268          * format might require three padding bytes (plus header, which is
1269          * included in VARSIZE(tmp)), whereas the uncompressed format would take
1270          * only one header byte and no padding if the value is short enough.  So
1271          * we insist on a savings of more than 2 bytes to ensure we have a gain.
1272          */
1273         if (pglz_compress(VARDATA_ANY(DatumGetPointer(value)), valsize,
1274                                           (PGLZ_Header *) tmp, PGLZ_strategy_default) &&
1275                 VARSIZE(tmp) < valsize - 2)
1276         {
1277                 /* successful compression */
1278                 return PointerGetDatum(tmp);
1279         }
1280         else
1281         {
1282                 /* incompressible data */
1283                 pfree(tmp);
1284                 return PointerGetDatum(NULL);
1285         }
1286 }
1287
1288
1289 /* ----------
1290  * toast_save_datum -
1291  *
1292  *      Save one single datum into the secondary relation and return
1293  *      a Datum reference for it.
1294  *
1295  * rel: the main relation we're working with (not the toast rel!)
1296  * value: datum to be pushed to toast storage
1297  * oldexternal: if not NULL, toast pointer previously representing the datum
1298  * options: options to be passed to heap_insert() for toast rows
1299  * ----------
1300  */
1301 static Datum
1302 toast_save_datum(Relation rel, Datum value,
1303                                  struct varlena * oldexternal, int options)
1304 {
1305         Relation        toastrel;
1306         Relation        toastidx;
1307         HeapTuple       toasttup;
1308         TupleDesc       toasttupDesc;
1309         Datum           t_values[3];
1310         bool            t_isnull[3];
1311         CommandId       mycid = GetCurrentCommandId(true);
1312         struct varlena *result;
1313         struct varatt_external toast_pointer;
1314         struct
1315         {
1316                 struct varlena hdr;
1317                 char            data[TOAST_MAX_CHUNK_SIZE]; /* make struct big enough */
1318                 int32           align_it;       /* ensure struct is aligned well enough */
1319         }                       chunk_data;
1320         int32           chunk_size;
1321         int32           chunk_seq = 0;
1322         char       *data_p;
1323         int32           data_todo;
1324         Pointer         dval = DatumGetPointer(value);
1325
1326         Assert(!VARATT_IS_EXTERNAL(value));
1327
1328         /*
1329          * Open the toast relation and its index.  We can use the index to check
1330          * uniqueness of the OID we assign to the toasted item, even though it has
1331          * additional columns besides OID.
1332          */
1333         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1334         toasttupDesc = toastrel->rd_att;
1335         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1336
1337         /*
1338          * Get the data pointer and length, and compute va_rawsize and va_extsize.
1339          *
1340          * va_rawsize is the size of the equivalent fully uncompressed datum, so
1341          * we have to adjust for short headers.
1342          *
1343          * va_extsize is the actual size of the data payload in the toast records.
1344          */
1345         if (VARATT_IS_SHORT(dval))
1346         {
1347                 data_p = VARDATA_SHORT(dval);
1348                 data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
1349                 toast_pointer.va_rawsize = data_todo + VARHDRSZ;                /* as if not short */
1350                 toast_pointer.va_extsize = data_todo;
1351         }
1352         else if (VARATT_IS_COMPRESSED(dval))
1353         {
1354                 data_p = VARDATA(dval);
1355                 data_todo = VARSIZE(dval) - VARHDRSZ;
1356                 /* rawsize in a compressed datum is just the size of the payload */
1357                 toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
1358                 toast_pointer.va_extsize = data_todo;
1359                 /* Assert that the numbers look like it's compressed */
1360                 Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1361         }
1362         else
1363         {
1364                 data_p = VARDATA(dval);
1365                 data_todo = VARSIZE(dval) - VARHDRSZ;
1366                 toast_pointer.va_rawsize = VARSIZE(dval);
1367                 toast_pointer.va_extsize = data_todo;
1368         }
1369
1370         /*
1371          * Insert the correct table OID into the result TOAST pointer.
1372          *
1373          * Normally this is the actual OID of the target toast table, but during
1374          * table-rewriting operations such as CLUSTER, we have to insert the OID
1375          * of the table's real permanent toast table instead.  rd_toastoid is set
1376          * if we have to substitute such an OID.
1377          */
1378         if (OidIsValid(rel->rd_toastoid))
1379                 toast_pointer.va_toastrelid = rel->rd_toastoid;
1380         else
1381                 toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
1382
1383         /*
1384          * Choose an OID to use as the value ID for this toast value.
1385          *
1386          * Normally we just choose an unused OID within the toast table.  But
1387          * during table-rewriting operations where we are preserving an existing
1388          * toast table OID, we want to preserve toast value OIDs too.  So, if
1389          * rd_toastoid is set and we had a prior external value from that same
1390          * toast table, re-use its value ID.  If we didn't have a prior external
1391          * value (which is a corner case, but possible if the table's attstorage
1392          * options have been changed), we have to pick a value ID that doesn't
1393          * conflict with either new or existing toast value OIDs.
1394          */
1395         if (!OidIsValid(rel->rd_toastoid))
1396         {
1397                 /* normal case: just choose an unused OID */
1398                 toast_pointer.va_valueid =
1399                         GetNewOidWithIndex(toastrel,
1400                                                            RelationGetRelid(toastidx),
1401                                                            (AttrNumber) 1);
1402         }
1403         else
1404         {
1405                 /* rewrite case: check to see if value was in old toast table */
1406                 toast_pointer.va_valueid = InvalidOid;
1407                 if (oldexternal != NULL)
1408                 {
1409                         struct varatt_external old_toast_pointer;
1410
1411                         Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
1412                         /* Must copy to access aligned fields */
1413                         VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
1414                         if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
1415                         {
1416                                 /* This value came from the old toast table; reuse its OID */
1417                                 toast_pointer.va_valueid = old_toast_pointer.va_valueid;
1418
1419                                 /*
1420                                  * There is a corner case here: the table rewrite might have
1421                                  * to copy both live and recently-dead versions of a row, and
1422                                  * those versions could easily reference the same toast value.
1423                                  * When we copy the second or later version of such a row,
1424                                  * reusing the OID will mean we select an OID that's already
1425                                  * in the new toast table.      Check for that, and if so, just
1426                                  * fall through without writing the data again.
1427                                  *
1428                                  * While annoying and ugly-looking, this is a good thing
1429                                  * because it ensures that we wind up with only one copy of
1430                                  * the toast value when there is only one copy in the old
1431                                  * toast table.  Before we detected this case, we'd have made
1432                                  * multiple copies, wasting space; and what's worse, the
1433                                  * copies belonging to already-deleted heap tuples would not
1434                                  * be reclaimed by VACUUM.
1435                                  */
1436                                 if (toastrel_valueid_exists(toastrel,
1437                                                                                         toast_pointer.va_valueid))
1438                                 {
1439                                         /* Match, so short-circuit the data storage loop below */
1440                                         data_todo = 0;
1441                                 }
1442                         }
1443                 }
1444                 if (toast_pointer.va_valueid == InvalidOid)
1445                 {
1446                         /*
1447                          * new value; must choose an OID that doesn't conflict in either
1448                          * old or new toast table
1449                          */
1450                         do
1451                         {
1452                                 toast_pointer.va_valueid =
1453                                         GetNewOidWithIndex(toastrel,
1454                                                                            RelationGetRelid(toastidx),
1455                                                                            (AttrNumber) 1);
1456                         } while (toastid_valueid_exists(rel->rd_toastoid,
1457                                                                                         toast_pointer.va_valueid));
1458                 }
1459         }
1460
1461         /*
1462          * Initialize constant parts of the tuple data
1463          */
1464         t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1465         t_values[2] = PointerGetDatum(&chunk_data);
1466         t_isnull[0] = false;
1467         t_isnull[1] = false;
1468         t_isnull[2] = false;
1469
1470         /*
1471          * Split up the item into chunks
1472          */
1473         while (data_todo > 0)
1474         {
1475                 /*
1476                  * Calculate the size of this chunk
1477                  */
1478                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1479
1480                 /*
1481                  * Build a tuple and store it
1482                  */
1483                 t_values[1] = Int32GetDatum(chunk_seq++);
1484                 SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1485                 memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1486                 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1487
1488                 heap_insert(toastrel, toasttup, mycid, options, NULL);
1489
1490                 /*
1491                  * Create the index entry.      We cheat a little here by not using
1492                  * FormIndexDatum: this relies on the knowledge that the index columns
1493                  * are the same as the initial columns of the table.
1494                  *
1495                  * Note also that there had better not be any user-created index on
1496                  * the TOAST table, since we don't bother to update anything else.
1497                  */
1498                 index_insert(toastidx, t_values, t_isnull,
1499                                          &(toasttup->t_self),
1500                                          toastrel,
1501                                          toastidx->rd_index->indisunique ?
1502                                          UNIQUE_CHECK_YES : UNIQUE_CHECK_NO);
1503
1504                 /*
1505                  * Free memory
1506                  */
1507                 heap_freetuple(toasttup);
1508
1509                 /*
1510                  * Move on to next chunk
1511                  */
1512                 data_todo -= chunk_size;
1513                 data_p += chunk_size;
1514         }
1515
1516         /*
1517          * Done - close toast relation
1518          */
1519         index_close(toastidx, RowExclusiveLock);
1520         heap_close(toastrel, RowExclusiveLock);
1521
1522         /*
1523          * Create the TOAST pointer value that we'll return
1524          */
1525         result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1526         SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK);
1527         memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1528
1529         return PointerGetDatum(result);
1530 }
1531
1532
1533 /* ----------
1534  * toast_delete_datum -
1535  *
1536  *      Delete a single external stored value.
1537  * ----------
1538  */
1539 static void
1540 toast_delete_datum(Relation rel, Datum value)
1541 {
1542         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1543         struct varatt_external toast_pointer;
1544         Relation        toastrel;
1545         Relation        toastidx;
1546         ScanKeyData toastkey;
1547         SysScanDesc toastscan;
1548         HeapTuple       toasttup;
1549
1550         if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1551                 return;
1552
1553         /* Must copy to access aligned fields */
1554         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1555
1556         /*
1557          * Open the toast relation and its index
1558          */
1559         toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1560         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1561
1562         /*
1563          * Setup a scan key to find chunks with matching va_valueid
1564          */
1565         ScanKeyInit(&toastkey,
1566                                 (AttrNumber) 1,
1567                                 BTEqualStrategyNumber, F_OIDEQ,
1568                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1569
1570         /*
1571          * Find all the chunks.  (We don't actually care whether we see them in
1572          * sequence or not, but since we've already locked the index we might as
1573          * well use systable_beginscan_ordered.)
1574          */
1575         toastscan = systable_beginscan_ordered(toastrel, toastidx,
1576                                                                                    SnapshotToast, 1, &toastkey);
1577         while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1578         {
1579                 /*
1580                  * Have a chunk, delete it
1581                  */
1582                 simple_heap_delete(toastrel, &toasttup->t_self);
1583         }
1584
1585         /*
1586          * End scan and close relations
1587          */
1588         systable_endscan_ordered(toastscan);
1589         index_close(toastidx, RowExclusiveLock);
1590         heap_close(toastrel, RowExclusiveLock);
1591 }
1592
1593
1594 /* ----------
1595  * toastrel_valueid_exists -
1596  *
1597  *      Test whether a toast value with the given ID exists in the toast relation
1598  * ----------
1599  */
1600 static bool
1601 toastrel_valueid_exists(Relation toastrel, Oid valueid)
1602 {
1603         bool            result = false;
1604         ScanKeyData toastkey;
1605         SysScanDesc toastscan;
1606
1607         /*
1608          * Setup a scan key to find chunks with matching va_valueid
1609          */
1610         ScanKeyInit(&toastkey,
1611                                 (AttrNumber) 1,
1612                                 BTEqualStrategyNumber, F_OIDEQ,
1613                                 ObjectIdGetDatum(valueid));
1614
1615         /*
1616          * Is there any such chunk?
1617          */
1618         toastscan = systable_beginscan(toastrel, toastrel->rd_rel->reltoastidxid,
1619                                                                    true, SnapshotToast, 1, &toastkey);
1620
1621         if (systable_getnext(toastscan) != NULL)
1622                 result = true;
1623
1624         systable_endscan(toastscan);
1625
1626         return result;
1627 }
1628
1629 /* ----------
1630  * toastid_valueid_exists -
1631  *
1632  *      As above, but work from toast rel's OID not an open relation
1633  * ----------
1634  */
1635 static bool
1636 toastid_valueid_exists(Oid toastrelid, Oid valueid)
1637 {
1638         bool            result;
1639         Relation        toastrel;
1640
1641         toastrel = heap_open(toastrelid, AccessShareLock);
1642
1643         result = toastrel_valueid_exists(toastrel, valueid);
1644
1645         heap_close(toastrel, AccessShareLock);
1646
1647         return result;
1648 }
1649
1650
1651 /* ----------
1652  * toast_fetch_datum -
1653  *
1654  *      Reconstruct an in memory Datum from the chunks saved
1655  *      in the toast relation
1656  * ----------
1657  */
1658 static struct varlena *
1659 toast_fetch_datum(struct varlena * attr)
1660 {
1661         Relation        toastrel;
1662         Relation        toastidx;
1663         ScanKeyData toastkey;
1664         SysScanDesc toastscan;
1665         HeapTuple       ttup;
1666         TupleDesc       toasttupDesc;
1667         struct varlena *result;
1668         struct varatt_external toast_pointer;
1669         int32           ressize;
1670         int32           residx,
1671                                 nextidx;
1672         int32           numchunks;
1673         Pointer         chunk;
1674         bool            isnull;
1675         char       *chunkdata;
1676         int32           chunksize;
1677
1678         if (VARATT_IS_EXTERNAL_INDIRECT(attr))
1679                 elog(ERROR, "shouldn't be called for indirect tuples");
1680
1681         /* Must copy to access aligned fields */
1682         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1683
1684         ressize = toast_pointer.va_extsize;
1685         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1686
1687         result = (struct varlena *) palloc(ressize + VARHDRSZ);
1688
1689         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1690                 SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1691         else
1692                 SET_VARSIZE(result, ressize + VARHDRSZ);
1693
1694         /*
1695          * Open the toast relation and its index
1696          */
1697         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1698         toasttupDesc = toastrel->rd_att;
1699         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1700
1701         /*
1702          * Setup a scan key to fetch from the index by va_valueid
1703          */
1704         ScanKeyInit(&toastkey,
1705                                 (AttrNumber) 1,
1706                                 BTEqualStrategyNumber, F_OIDEQ,
1707                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1708
1709         /*
1710          * Read the chunks by index
1711          *
1712          * Note that because the index is actually on (valueid, chunkidx) we will
1713          * see the chunks in chunkidx order, even though we didn't explicitly ask
1714          * for it.
1715          */
1716         nextidx = 0;
1717
1718         toastscan = systable_beginscan_ordered(toastrel, toastidx,
1719                                                                                    SnapshotToast, 1, &toastkey);
1720         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1721         {
1722                 /*
1723                  * Have a chunk, extract the sequence number and the data
1724                  */
1725                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1726                 Assert(!isnull);
1727                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1728                 Assert(!isnull);
1729                 if (!VARATT_IS_EXTENDED(chunk))
1730                 {
1731                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1732                         chunkdata = VARDATA(chunk);
1733                 }
1734                 else if (VARATT_IS_SHORT(chunk))
1735                 {
1736                         /* could happen due to heap_form_tuple doing its thing */
1737                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1738                         chunkdata = VARDATA_SHORT(chunk);
1739                 }
1740                 else
1741                 {
1742                         /* should never happen */
1743                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1744                                  toast_pointer.va_valueid,
1745                                  RelationGetRelationName(toastrel));
1746                         chunksize = 0;          /* keep compiler quiet */
1747                         chunkdata = NULL;
1748                 }
1749
1750                 /*
1751                  * Some checks on the data we've found
1752                  */
1753                 if (residx != nextidx)
1754                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1755                                  residx, nextidx,
1756                                  toast_pointer.va_valueid,
1757                                  RelationGetRelationName(toastrel));
1758                 if (residx < numchunks - 1)
1759                 {
1760                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1761                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
1762                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1763                                          residx, numchunks,
1764                                          toast_pointer.va_valueid,
1765                                          RelationGetRelationName(toastrel));
1766                 }
1767                 else if (residx == numchunks - 1)
1768                 {
1769                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1770                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
1771                                          chunksize,
1772                                          (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
1773                                          residx,
1774                                          toast_pointer.va_valueid,
1775                                          RelationGetRelationName(toastrel));
1776                 }
1777                 else
1778                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
1779                                  residx,
1780                                  0, numchunks - 1,
1781                                  toast_pointer.va_valueid,
1782                                  RelationGetRelationName(toastrel));
1783
1784                 /*
1785                  * Copy the data into proper place in our result
1786                  */
1787                 memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
1788                            chunkdata,
1789                            chunksize);
1790
1791                 nextidx++;
1792         }
1793
1794         /*
1795          * Final checks that we successfully fetched the datum
1796          */
1797         if (nextidx != numchunks)
1798                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
1799                          nextidx,
1800                          toast_pointer.va_valueid,
1801                          RelationGetRelationName(toastrel));
1802
1803         /*
1804          * End scan and close relations
1805          */
1806         systable_endscan_ordered(toastscan);
1807         index_close(toastidx, AccessShareLock);
1808         heap_close(toastrel, AccessShareLock);
1809
1810         return result;
1811 }
1812
1813 /* ----------
1814  * toast_fetch_datum_slice -
1815  *
1816  *      Reconstruct a segment of a Datum from the chunks saved
1817  *      in the toast relation
1818  * ----------
1819  */
1820 static struct varlena *
1821 toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
1822 {
1823         Relation        toastrel;
1824         Relation        toastidx;
1825         ScanKeyData toastkey[3];
1826         int                     nscankeys;
1827         SysScanDesc toastscan;
1828         HeapTuple       ttup;
1829         TupleDesc       toasttupDesc;
1830         struct varlena *result;
1831         struct varatt_external toast_pointer;
1832         int32           attrsize;
1833         int32           residx;
1834         int32           nextidx;
1835         int                     numchunks;
1836         int                     startchunk;
1837         int                     endchunk;
1838         int32           startoffset;
1839         int32           endoffset;
1840         int                     totalchunks;
1841         Pointer         chunk;
1842         bool            isnull;
1843         char       *chunkdata;
1844         int32           chunksize;
1845         int32           chcpystrt;
1846         int32           chcpyend;
1847
1848         Assert(VARATT_IS_EXTERNAL_ONDISK(attr));
1849
1850         /* Must copy to access aligned fields */
1851         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1852
1853         /*
1854          * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
1855          * we can't return a compressed datum which is meaningful to toast later
1856          */
1857         Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1858
1859         attrsize = toast_pointer.va_extsize;
1860         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1861
1862         if (sliceoffset >= attrsize)
1863         {
1864                 sliceoffset = 0;
1865                 length = 0;
1866         }
1867
1868         if (((sliceoffset + length) > attrsize) || length < 0)
1869                 length = attrsize - sliceoffset;
1870
1871         result = (struct varlena *) palloc(length + VARHDRSZ);
1872
1873         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1874                 SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
1875         else
1876                 SET_VARSIZE(result, length + VARHDRSZ);
1877
1878         if (length == 0)
1879                 return result;                  /* Can save a lot of work at this point! */
1880
1881         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1882         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1883         numchunks = (endchunk - startchunk) + 1;
1884
1885         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1886         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1887
1888         /*
1889          * Open the toast relation and its index
1890          */
1891         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1892         toasttupDesc = toastrel->rd_att;
1893         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1894
1895         /*
1896          * Setup a scan key to fetch from the index. This is either two keys or
1897          * three depending on the number of chunks.
1898          */
1899         ScanKeyInit(&toastkey[0],
1900                                 (AttrNumber) 1,
1901                                 BTEqualStrategyNumber, F_OIDEQ,
1902                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1903
1904         /*
1905          * Use equality condition for one chunk, a range condition otherwise:
1906          */
1907         if (numchunks == 1)
1908         {
1909                 ScanKeyInit(&toastkey[1],
1910                                         (AttrNumber) 2,
1911                                         BTEqualStrategyNumber, F_INT4EQ,
1912                                         Int32GetDatum(startchunk));
1913                 nscankeys = 2;
1914         }
1915         else
1916         {
1917                 ScanKeyInit(&toastkey[1],
1918                                         (AttrNumber) 2,
1919                                         BTGreaterEqualStrategyNumber, F_INT4GE,
1920                                         Int32GetDatum(startchunk));
1921                 ScanKeyInit(&toastkey[2],
1922                                         (AttrNumber) 2,
1923                                         BTLessEqualStrategyNumber, F_INT4LE,
1924                                         Int32GetDatum(endchunk));
1925                 nscankeys = 3;
1926         }
1927
1928         /*
1929          * Read the chunks by index
1930          *
1931          * The index is on (valueid, chunkidx) so they will come in order
1932          */
1933         nextidx = startchunk;
1934         toastscan = systable_beginscan_ordered(toastrel, toastidx,
1935                                                                                  SnapshotToast, nscankeys, toastkey);
1936         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1937         {
1938                 /*
1939                  * Have a chunk, extract the sequence number and the data
1940                  */
1941                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1942                 Assert(!isnull);
1943                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1944                 Assert(!isnull);
1945                 if (!VARATT_IS_EXTENDED(chunk))
1946                 {
1947                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1948                         chunkdata = VARDATA(chunk);
1949                 }
1950                 else if (VARATT_IS_SHORT(chunk))
1951                 {
1952                         /* could happen due to heap_form_tuple doing its thing */
1953                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1954                         chunkdata = VARDATA_SHORT(chunk);
1955                 }
1956                 else
1957                 {
1958                         /* should never happen */
1959                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1960                                  toast_pointer.va_valueid,
1961                                  RelationGetRelationName(toastrel));
1962                         chunksize = 0;          /* keep compiler quiet */
1963                         chunkdata = NULL;
1964                 }
1965
1966                 /*
1967                  * Some checks on the data we've found
1968                  */
1969                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1970                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1971                                  residx, nextidx,
1972                                  toast_pointer.va_valueid,
1973                                  RelationGetRelationName(toastrel));
1974                 if (residx < totalchunks - 1)
1975                 {
1976                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1977                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
1978                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1979                                          residx, totalchunks,
1980                                          toast_pointer.va_valueid,
1981                                          RelationGetRelationName(toastrel));
1982                 }
1983                 else if (residx == totalchunks - 1)
1984                 {
1985                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1986                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
1987                                          chunksize,
1988                                          (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
1989                                          residx,
1990                                          toast_pointer.va_valueid,
1991                                          RelationGetRelationName(toastrel));
1992                 }
1993                 else
1994                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
1995                                  residx,
1996                                  0, totalchunks - 1,
1997                                  toast_pointer.va_valueid,
1998                                  RelationGetRelationName(toastrel));
1999
2000                 /*
2001                  * Copy the data into proper place in our result
2002                  */
2003                 chcpystrt = 0;
2004                 chcpyend = chunksize - 1;
2005                 if (residx == startchunk)
2006                         chcpystrt = startoffset;
2007                 if (residx == endchunk)
2008                         chcpyend = endoffset;
2009
2010                 memcpy(VARDATA(result) +
2011                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
2012                            chunkdata + chcpystrt,
2013                            (chcpyend - chcpystrt) + 1);
2014
2015                 nextidx++;
2016         }
2017
2018         /*
2019          * Final checks that we successfully fetched the datum
2020          */
2021         if (nextidx != (endchunk + 1))
2022                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
2023                          nextidx,
2024                          toast_pointer.va_valueid,
2025                          RelationGetRelationName(toastrel));
2026
2027         /*
2028          * End scan and close relations
2029          */
2030         systable_endscan_ordered(toastscan);
2031         index_close(toastidx, AccessShareLock);
2032         heap_close(toastrel, AccessShareLock);
2033
2034         return result;
2035 }