]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Avoid incrementing the CommandCounter when CommandCounterIncrement is called
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2007, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/heap/tuptoaster.c,v 1.80 2007/11/30 21:22:53 tgl Exp $
12  *
13  *
14  * INTERFACE ROUTINES
15  *              toast_insert_or_update -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              toast_delete -
20  *                      Reclaim toast storage when a tuple is deleted
21  *
22  *              heap_tuple_untoast_attr -
23  *                      Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27
28 #include "postgres.h"
29
30 #include <unistd.h>
31 #include <fcntl.h>
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "utils/fmgroids.h"
39 #include "utils/pg_lzcompress.h"
40 #include "utils/typcache.h"
41
42
43 #undef TOAST_DEBUG
44
/*
 * Size of an EXTERNAL datum that holds a standard TOAST pointer: the
 * external-datum header plus the varatt_external struct itself.
 */
#define TOAST_POINTER_SIZE (sizeof(struct varatt_external) + VARHDRSZ_EXTERNAL)
47
/*
 * Decide whether an externally-stored value is compressed.
 *
 * This is done by comparing va_extsize (the actual length of the external
 * data) against va_rawsize (the size of the original uncompressed datum).
 * va_rawsize includes the VARHDRSZ overhead while va_extsize does not, hence
 * the subtraction.  Compression is never used unless it actually saves
 * space, so the only expected outcomes are "equal" (not compressed) or
 * "extsize is smaller" (compressed).
 *
 * Note that the argument is evaluated twice.
 */
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) \
	((toast_pointer).va_rawsize - VARHDRSZ > (toast_pointer).va_extsize)
57
/*
 * Copy the (possibly unaligned) payload of an EXTERNAL datum "attr" into the
 * caller's local "struct varatt_external" variable "toast_pointer".
 *
 * Logically this is nothing more than a memcpy.  However, some versions of
 * gcc appear to generate broken code that assumes the datum contents are
 * aligned; going through an explicit intermediate "varattrib_1b_e *"
 * variable seems to avoid that, so do not "simplify" it away.
 *
 * The Assert cross-checks that the datum's payload length matches the size
 * of the destination variable.
 */
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr) \
do { \
	varattrib_1b_e *ext_attr = (varattrib_1b_e *) (attr); \
	Assert(sizeof(toast_pointer) == VARSIZE_ANY_EXHDR(ext_attr)); \
	memcpy(&(toast_pointer), VARDATA_EXTERNAL(ext_attr), sizeof(toast_pointer)); \
} while (0)
71
72
73 static void toast_delete_datum(Relation rel, Datum value);
74 static Datum toast_save_datum(Relation rel, Datum value,
75                                  bool use_wal, bool use_fsm);
76 static struct varlena *toast_fetch_datum(struct varlena * attr);
77 static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
78                                                 int32 sliceoffset, int32 length);
79
80
/* ----------
 * heap_tuple_fetch_attr -
 *
 *	Public entry point to pull a toasted value back in from external
 *	storage, without attempting any decompression.
 *
 * The datum returned holds all of its data internally (it no longer relies
 * on external storage), but it may still be compressed or carry a short
 * header.  A datum that is not external to begin with is returned as-is,
 * i.e. the result is the very same pointer that was passed in.
 * ----------
 */
struct varlena *
heap_tuple_fetch_attr(struct varlena * attr)
{
	/* A plain in-tuple value: nothing to fetch, hand it straight back */
	if (!VARATT_IS_EXTERNAL(attr))
		return attr;

	/* An externally stored value: retrieve it via toast_fetch_datum */
	return toast_fetch_datum(attr);
}
114
115
116 /* ----------
117  * heap_tuple_untoast_attr -
118  *
119  *      Public entry point to get back a toasted value from compression
120  *      or external storage.
121  * ----------
122  */
123 struct varlena *
124 heap_tuple_untoast_attr(struct varlena * attr)
125 {
126         if (VARATT_IS_EXTERNAL(attr))
127         {
128                 /*
129                  * This is an externally stored datum --- fetch it back from there
130                  */
131                 attr = toast_fetch_datum(attr);
132                 /* If it's compressed, decompress it */
133                 if (VARATT_IS_COMPRESSED(attr))
134                 {
135                         PGLZ_Header *tmp = (PGLZ_Header *) attr;
136
137                         attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
138                         SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
139                         pglz_decompress(tmp, VARDATA(attr));
140                         pfree(tmp);
141                 }
142         }
143         else if (VARATT_IS_COMPRESSED(attr))
144         {
145                 /*
146                  * This is a compressed value inside of the main tuple
147                  */
148                 PGLZ_Header *tmp = (PGLZ_Header *) attr;
149
150                 attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
151                 SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
152                 pglz_decompress(tmp, VARDATA(attr));
153         }
154         else if (VARATT_IS_SHORT(attr))
155         {
156                 /*
157                  * This is a short-header varlena --- convert to 4-byte header format
158                  */
159                 Size            data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
160                 Size            new_size = data_size + VARHDRSZ;
161                 struct varlena *new_attr;
162
163                 new_attr = (struct varlena *) palloc(new_size);
164                 SET_VARSIZE(new_attr, new_size);
165                 memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
166                 attr = new_attr;
167         }
168
169         return attr;
170 }
171
172
173 /* ----------
174  * heap_tuple_untoast_attr_slice -
175  *
176  *              Public entry point to get back part of a toasted value
177  *              from compression or external storage.
178  * ----------
179  */
180 struct varlena *
181 heap_tuple_untoast_attr_slice(struct varlena * attr,
182                                                           int32 sliceoffset, int32 slicelength)
183 {
184         struct varlena *preslice;
185         struct varlena *result;
186         char       *attrdata;
187         int32           attrsize;
188
189         if (VARATT_IS_EXTERNAL(attr))
190         {
191                 struct varatt_external toast_pointer;
192
193                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
194
195                 /* fast path for non-compressed external datums */
196                 if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
197                         return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
198
199                 /* fetch it back (compressed marker will get set automatically) */
200                 preslice = toast_fetch_datum(attr);
201         }
202         else
203                 preslice = attr;
204
205         if (VARATT_IS_COMPRESSED(preslice))
206         {
207                 PGLZ_Header *tmp = (PGLZ_Header *) preslice;
208                 Size            size = PGLZ_RAW_SIZE(tmp) + VARHDRSZ;
209
210                 preslice = (struct varlena *) palloc(size);
211                 SET_VARSIZE(preslice, size);
212                 pglz_decompress(tmp, VARDATA(preslice));
213
214                 if (tmp != (PGLZ_Header *) attr)
215                         pfree(tmp);
216         }
217
218         if (VARATT_IS_SHORT(preslice))
219         {
220                 attrdata = VARDATA_SHORT(preslice);
221                 attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
222         }
223         else
224         {
225                 attrdata = VARDATA(preslice);
226                 attrsize = VARSIZE(preslice) - VARHDRSZ;
227         }
228
229         /* slicing of datum for compressed cases and plain value */
230
231         if (sliceoffset >= attrsize)
232         {
233                 sliceoffset = 0;
234                 slicelength = 0;
235         }
236
237         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
238                 slicelength = attrsize - sliceoffset;
239
240         result = (struct varlena *) palloc(slicelength + VARHDRSZ);
241         SET_VARSIZE(result, slicelength + VARHDRSZ);
242
243         memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
244
245         if (preslice != attr)
246                 pfree(preslice);
247
248         return result;
249 }
250
251
252 /* ----------
253  * toast_raw_datum_size -
254  *
255  *      Return the raw (detoasted) size of a varlena datum
256  *      (including the VARHDRSZ header)
257  * ----------
258  */
259 Size
260 toast_raw_datum_size(Datum value)
261 {
262         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
263         Size            result;
264
265         if (VARATT_IS_EXTERNAL(attr))
266         {
267                 /* va_rawsize is the size of the original datum -- including header */
268                 struct varatt_external toast_pointer;
269
270                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
271                 result = toast_pointer.va_rawsize;
272         }
273         else if (VARATT_IS_COMPRESSED(attr))
274         {
275                 /* here, va_rawsize is just the payload size */
276                 result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
277         }
278         else if (VARATT_IS_SHORT(attr))
279         {
280                 /*
281                  * we have to normalize the header length to VARHDRSZ or else the
282                  * callers of this function will be confused.
283                  */
284                 result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
285         }
286         else
287         {
288                 /* plain untoasted datum */
289                 result = VARSIZE(attr);
290         }
291         return result;
292 }
293
294 /* ----------
295  * toast_datum_size
296  *
297  *      Return the physical storage size (possibly compressed) of a varlena datum
298  * ----------
299  */
300 Size
301 toast_datum_size(Datum value)
302 {
303         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
304         Size            result;
305
306         if (VARATT_IS_EXTERNAL(attr))
307         {
308                 /*
309                  * Attribute is stored externally - return the extsize whether
310                  * compressed or not.  We do not count the size of the toast pointer
311                  * ... should we?
312                  */
313                 struct varatt_external toast_pointer;
314
315                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
316                 result = toast_pointer.va_extsize;
317         }
318         else if (VARATT_IS_SHORT(attr))
319         {
320                 result = VARSIZE_SHORT(attr);
321         }
322         else
323         {
324                 /*
325                  * Attribute is stored inline either compressed or not, just calculate
326                  * the size of the datum in either case.
327                  */
328                 result = VARSIZE(attr);
329         }
330         return result;
331 }
332
333
334 /* ----------
335  * toast_delete -
336  *
337  *      Cascaded delete toast-entries on DELETE
338  * ----------
339  */
340 void
341 toast_delete(Relation rel, HeapTuple oldtup)
342 {
343         TupleDesc       tupleDesc;
344         Form_pg_attribute *att;
345         int                     numAttrs;
346         int                     i;
347         Datum           toast_values[MaxHeapAttributeNumber];
348         bool            toast_isnull[MaxHeapAttributeNumber];
349
350         /*
351          * We should only ever be called for tuples of plain relations ---
352          * recursing on a toast rel is bad news.
353          */
354         Assert(rel->rd_rel->relkind == RELKIND_RELATION);
355
356         /*
357          * Get the tuple descriptor and break down the tuple into fields.
358          *
359          * NOTE: it's debatable whether to use heap_deformtuple() here or just
360          * heap_getattr() only the varlena columns.  The latter could win if there
361          * are few varlena columns and many non-varlena ones. However,
362          * heap_deformtuple costs only O(N) while the heap_getattr way would cost
363          * O(N^2) if there are many varlena columns, so it seems better to err on
364          * the side of linear cost.  (We won't even be here unless there's at
365          * least one varlena column, by the way.)
366          */
367         tupleDesc = rel->rd_att;
368         att = tupleDesc->attrs;
369         numAttrs = tupleDesc->natts;
370
371         Assert(numAttrs <= MaxHeapAttributeNumber);
372         heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
373
374         /*
375          * Check for external stored attributes and delete them from the secondary
376          * relation.
377          */
378         for (i = 0; i < numAttrs; i++)
379         {
380                 if (att[i]->attlen == -1)
381                 {
382                         Datum           value = toast_values[i];
383
384                         if (!toast_isnull[i] && VARATT_IS_EXTERNAL(value))
385                                 toast_delete_datum(rel, value);
386                 }
387         }
388 }
389
390
391 /* ----------
392  * toast_insert_or_update -
393  *
394  *      Delete no-longer-used toast-entries and create new ones to
395  *      make the new tuple fit on INSERT or UPDATE
396  *
397  * Inputs:
398  *      newtup: the candidate new tuple to be inserted
399  *      oldtup: the old row version for UPDATE, or NULL for INSERT
400  *      use_wal, use_fsm: flags to be passed to heap_insert() for toast rows
401  * Result:
402  *      either newtup if no toasting is needed, or a palloc'd modified tuple
403  *      that is what should actually get stored
404  *
405  * NOTE: neither newtup nor oldtup will be modified.  This is a change
406  * from the pre-8.1 API of this routine.
407  * ----------
408  */
409 HeapTuple
410 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
411                                            bool use_wal, bool use_fsm)
412 {
413         HeapTuple       result_tuple;
414         TupleDesc       tupleDesc;
415         Form_pg_attribute *att;
416         int                     numAttrs;
417         int                     i;
418
419         bool            need_change = false;
420         bool            need_free = false;
421         bool            need_delold = false;
422         bool            has_nulls = false;
423
424         Size            maxDataLen;
425         Size            hoff;
426
427         char            toast_action[MaxHeapAttributeNumber];
428         bool            toast_isnull[MaxHeapAttributeNumber];
429         bool            toast_oldisnull[MaxHeapAttributeNumber];
430         Datum           toast_values[MaxHeapAttributeNumber];
431         Datum           toast_oldvalues[MaxHeapAttributeNumber];
432         int32           toast_sizes[MaxHeapAttributeNumber];
433         bool            toast_free[MaxHeapAttributeNumber];
434         bool            toast_delold[MaxHeapAttributeNumber];
435
436         /*
437          * We should only ever be called for tuples of plain relations ---
438          * recursing on a toast rel is bad news.
439          */
440         Assert(rel->rd_rel->relkind == RELKIND_RELATION);
441
442         /*
443          * Get the tuple descriptor and break down the tuple(s) into fields.
444          */
445         tupleDesc = rel->rd_att;
446         att = tupleDesc->attrs;
447         numAttrs = tupleDesc->natts;
448
449         Assert(numAttrs <= MaxHeapAttributeNumber);
450         heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
451         if (oldtup != NULL)
452                 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
453
454         /* ----------
455          * Then collect information about the values given
456          *
457          * NOTE: toast_action[i] can have these values:
458          *              ' '             default handling
459          *              'p'             already processed --- don't touch it
460          *              'x'             incompressible, but OK to move off
461          *
462          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
463          *              toast_action[i] different from 'p'.
464          * ----------
465          */
466         memset(toast_action, ' ', numAttrs * sizeof(char));
467         memset(toast_free, 0, numAttrs * sizeof(bool));
468         memset(toast_delold, 0, numAttrs * sizeof(bool));
469
470         for (i = 0; i < numAttrs; i++)
471         {
472                 struct varlena *old_value;
473                 struct varlena *new_value;
474
475                 if (oldtup != NULL)
476                 {
477                         /*
478                          * For UPDATE get the old and new values of this attribute
479                          */
480                         old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
481                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
482
483                         /*
484                          * If the old value is an external stored one, check if it has
485                          * changed so we have to delete it later.
486                          */
487                         if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
488                                 VARATT_IS_EXTERNAL(old_value))
489                         {
490                                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
491                                         memcmp((char *) old_value, (char *) new_value,
492                                                    VARSIZE_EXTERNAL(old_value)) != 0)
493                                 {
494                                         /*
495                                          * The old external stored value isn't needed any more
496                                          * after the update
497                                          */
498                                         toast_delold[i] = true;
499                                         need_delold = true;
500                                 }
501                                 else
502                                 {
503                                         /*
504                                          * This attribute isn't changed by this update so we reuse
505                                          * the original reference to the old value in the new
506                                          * tuple.
507                                          */
508                                         toast_action[i] = 'p';
509                                         continue;
510                                 }
511                         }
512                 }
513                 else
514                 {
515                         /*
516                          * For INSERT simply get the new value
517                          */
518                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
519                 }
520
521                 /*
522                  * Handle NULL attributes
523                  */
524                 if (toast_isnull[i])
525                 {
526                         toast_action[i] = 'p';
527                         has_nulls = true;
528                         continue;
529                 }
530
531                 /*
532                  * Now look at varlena attributes
533                  */
534                 if (att[i]->attlen == -1)
535                 {
536                         /*
537                          * If the table's attribute says PLAIN always, force it so.
538                          */
539                         if (att[i]->attstorage == 'p')
540                                 toast_action[i] = 'p';
541
542                         /*
543                          * We took care of UPDATE above, so any external value we find
544                          * still in the tuple must be someone else's we cannot reuse.
545                          * Fetch it back (without decompression, unless we are forcing
546                          * PLAIN storage).      If necessary, we'll push it out as a new
547                          * external value below.
548                          */
549                         if (VARATT_IS_EXTERNAL(new_value))
550                         {
551                                 if (att[i]->attstorage == 'p')
552                                         new_value = heap_tuple_untoast_attr(new_value);
553                                 else
554                                         new_value = heap_tuple_fetch_attr(new_value);
555                                 toast_values[i] = PointerGetDatum(new_value);
556                                 toast_free[i] = true;
557                                 need_change = true;
558                                 need_free = true;
559                         }
560
561                         /*
562                          * Remember the size of this attribute
563                          */
564                         toast_sizes[i] = VARSIZE_ANY(new_value);
565                 }
566                 else
567                 {
568                         /*
569                          * Not a varlena attribute, plain storage always
570                          */
571                         toast_action[i] = 'p';
572                 }
573         }
574
575         /* ----------
576          * Compress and/or save external until data fits into target length
577          *
578          *      1: Inline compress attributes with attstorage 'x'
579          *      2: Store attributes with attstorage 'x' or 'e' external
580          *      3: Inline compress attributes with attstorage 'm'
581          *      4: Store attributes with attstorage 'm' external
582          * ----------
583          */
584
585         /* compute header overhead --- this should match heap_form_tuple() */
586         hoff = offsetof(HeapTupleHeaderData, t_bits);
587         if (has_nulls)
588                 hoff += BITMAPLEN(numAttrs);
589         if (newtup->t_data->t_infomask & HEAP_HASOID)
590                 hoff += sizeof(Oid);
591         hoff = MAXALIGN(hoff);
592         Assert(hoff == newtup->t_data->t_hoff);
593         /* now convert to a limit on the tuple data size */
594         maxDataLen = TOAST_TUPLE_TARGET - hoff;
595
596         /*
597          * Look for attributes with attstorage 'x' to compress
598          */
599         while (heap_compute_data_size(tupleDesc,
600                                                                   toast_values, toast_isnull) > maxDataLen)
601         {
602                 int                     biggest_attno = -1;
603                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
604                 Datum           old_value;
605                 Datum           new_value;
606
607                 /*
608                  * Search for the biggest yet uncompressed internal attribute
609                  */
610                 for (i = 0; i < numAttrs; i++)
611                 {
612                         if (toast_action[i] != ' ')
613                                 continue;
614                         if (VARATT_IS_EXTERNAL(toast_values[i]))
615                                 continue;               /* can't happen, toast_action would be 'p' */
616                         if (VARATT_IS_COMPRESSED(toast_values[i]))
617                                 continue;
618                         if (att[i]->attstorage != 'x')
619                                 continue;
620                         if (toast_sizes[i] > biggest_size)
621                         {
622                                 biggest_attno = i;
623                                 biggest_size = toast_sizes[i];
624                         }
625                 }
626
627                 if (biggest_attno < 0)
628                         break;
629
630                 /*
631                  * Attempt to compress it inline
632                  */
633                 i = biggest_attno;
634                 old_value = toast_values[i];
635                 new_value = toast_compress_datum(old_value);
636
637                 if (DatumGetPointer(new_value) != NULL)
638                 {
639                         /* successful compression */
640                         if (toast_free[i])
641                                 pfree(DatumGetPointer(old_value));
642                         toast_values[i] = new_value;
643                         toast_free[i] = true;
644                         toast_sizes[i] = VARSIZE(toast_values[i]);
645                         need_change = true;
646                         need_free = true;
647                 }
648                 else
649                 {
650                         /*
651                          * incompressible data, ignore on subsequent compression passes
652                          */
653                         toast_action[i] = 'x';
654                 }
655         }
656
657         /*
658          * Second we look for attributes of attstorage 'x' or 'e' that are still
659          * inline.      But skip this if there's no toast table to push them to.
660          */
661         while (heap_compute_data_size(tupleDesc,
662                                                                   toast_values, toast_isnull) > maxDataLen &&
663                    rel->rd_rel->reltoastrelid != InvalidOid)
664         {
665                 int                     biggest_attno = -1;
666                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
667                 Datum           old_value;
668
669                 /*------
670                  * Search for the biggest yet inlined attribute with
671                  * attstorage equals 'x' or 'e'
672                  *------
673                  */
674                 for (i = 0; i < numAttrs; i++)
675                 {
676                         if (toast_action[i] == 'p')
677                                 continue;
678                         if (VARATT_IS_EXTERNAL(toast_values[i]))
679                                 continue;               /* can't happen, toast_action would be 'p' */
680                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
681                                 continue;
682                         if (toast_sizes[i] > biggest_size)
683                         {
684                                 biggest_attno = i;
685                                 biggest_size = toast_sizes[i];
686                         }
687                 }
688
689                 if (biggest_attno < 0)
690                         break;
691
692                 /*
693                  * Store this external
694                  */
695                 i = biggest_attno;
696                 old_value = toast_values[i];
697                 toast_action[i] = 'p';
698                 toast_values[i] = toast_save_datum(rel, toast_values[i],
699                                                                                    use_wal, use_fsm);
700                 if (toast_free[i])
701                         pfree(DatumGetPointer(old_value));
702                 toast_free[i] = true;
703
704                 need_change = true;
705                 need_free = true;
706         }
707
708         /*
709          * Round 3 - this time we take attributes with storage 'm' into
710          * compression
711          */
712         while (heap_compute_data_size(tupleDesc,
713                                                                   toast_values, toast_isnull) > maxDataLen)
714         {
715                 int                     biggest_attno = -1;
716                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
717                 Datum           old_value;
718                 Datum           new_value;
719
720                 /*
721                  * Search for the biggest yet uncompressed internal attribute
722                  */
723                 for (i = 0; i < numAttrs; i++)
724                 {
725                         if (toast_action[i] != ' ')
726                                 continue;
727                         if (VARATT_IS_EXTERNAL(toast_values[i]))
728                                 continue;               /* can't happen, toast_action would be 'p' */
729                         if (VARATT_IS_COMPRESSED(toast_values[i]))
730                                 continue;
731                         if (att[i]->attstorage != 'm')
732                                 continue;
733                         if (toast_sizes[i] > biggest_size)
734                         {
735                                 biggest_attno = i;
736                                 biggest_size = toast_sizes[i];
737                         }
738                 }
739
740                 if (biggest_attno < 0)
741                         break;
742
743                 /*
744                  * Attempt to compress it inline
745                  */
746                 i = biggest_attno;
747                 old_value = toast_values[i];
748                 new_value = toast_compress_datum(old_value);
749
750                 if (DatumGetPointer(new_value) != NULL)
751                 {
752                         /* successful compression */
753                         if (toast_free[i])
754                                 pfree(DatumGetPointer(old_value));
755                         toast_values[i] = new_value;
756                         toast_free[i] = true;
757                         toast_sizes[i] = VARSIZE(toast_values[i]);
758                         need_change = true;
759                         need_free = true;
760                 }
761                 else
762                 {
763                         /*
764                          * incompressible data, ignore on subsequent compression passes
765                          */
766                         toast_action[i] = 'x';
767                 }
768         }
769
770         /*
771          * Finally we store attributes of type 'm' external, if possible.
772          */
773         while (heap_compute_data_size(tupleDesc,
774                                                                   toast_values, toast_isnull) > maxDataLen &&
775                    rel->rd_rel->reltoastrelid != InvalidOid)
776         {
777                 int                     biggest_attno = -1;
778                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
779                 Datum           old_value;
780
781                 /*--------
782                  * Search for the biggest yet inlined attribute with
783                  * attstorage = 'm'
784                  *--------
785                  */
786                 for (i = 0; i < numAttrs; i++)
787                 {
788                         if (toast_action[i] == 'p')
789                                 continue;
790                         if (VARATT_IS_EXTERNAL(toast_values[i]))
791                                 continue;               /* can't happen, toast_action would be 'p' */
792                         if (att[i]->attstorage != 'm')
793                                 continue;
794                         if (toast_sizes[i] > biggest_size)
795                         {
796                                 biggest_attno = i;
797                                 biggest_size = toast_sizes[i];
798                         }
799                 }
800
801                 if (biggest_attno < 0)
802                         break;
803
804                 /*
805                  * Store this external
806                  */
807                 i = biggest_attno;
808                 old_value = toast_values[i];
809                 toast_action[i] = 'p';
810                 toast_values[i] = toast_save_datum(rel, toast_values[i],
811                                                                                    use_wal, use_fsm);
812                 if (toast_free[i])
813                         pfree(DatumGetPointer(old_value));
814                 toast_free[i] = true;
815
816                 need_change = true;
817                 need_free = true;
818         }
819
820         /*
821          * In the case we toasted any values, we need to build a new heap tuple
822          * with the changed values.
823          */
824         if (need_change)
825         {
826                 HeapTupleHeader olddata = newtup->t_data;
827                 HeapTupleHeader new_data;
828                 int32           new_len;
829                 int32           new_data_len;
830
831                 /*
832                  * Calculate the new size of the tuple.  Header size should not
833                  * change, but data size might.
834                  */
835                 new_len = offsetof(HeapTupleHeaderData, t_bits);
836                 if (has_nulls)
837                         new_len += BITMAPLEN(numAttrs);
838                 if (olddata->t_infomask & HEAP_HASOID)
839                         new_len += sizeof(Oid);
840                 new_len = MAXALIGN(new_len);
841                 Assert(new_len == olddata->t_hoff);
842                 new_data_len = heap_compute_data_size(tupleDesc,
843                                                                                           toast_values, toast_isnull);
844                 new_len += new_data_len;
845
846                 /*
847                  * Allocate and zero the space needed, and fill HeapTupleData fields.
848                  */
849                 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_len);
850                 result_tuple->t_len = new_len;
851                 result_tuple->t_self = newtup->t_self;
852                 result_tuple->t_tableOid = newtup->t_tableOid;
853                 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
854                 result_tuple->t_data = new_data;
855
856                 /*
857                  * Put the existing tuple header and the changed values into place
858                  */
859                 memcpy(new_data, olddata, olddata->t_hoff);
860
861                 heap_fill_tuple(tupleDesc,
862                                                 toast_values,
863                                                 toast_isnull,
864                                                 (char *) new_data + olddata->t_hoff,
865                                                 new_data_len,
866                                                 &(new_data->t_infomask),
867                                                 has_nulls ? new_data->t_bits : NULL);
868         }
869         else
870                 result_tuple = newtup;
871
872         /*
873          * Free allocated temp values
874          */
875         if (need_free)
876                 for (i = 0; i < numAttrs; i++)
877                         if (toast_free[i])
878                                 pfree(DatumGetPointer(toast_values[i]));
879
880         /*
881          * Delete external values from the old tuple
882          */
883         if (need_delold)
884                 for (i = 0; i < numAttrs; i++)
885                         if (toast_delold[i])
886                                 toast_delete_datum(rel, toast_oldvalues[i]);
887
888         return result_tuple;
889 }
890
891
892 /* ----------
893  * toast_flatten_tuple_attribute -
894  *
895  *      If a Datum is of composite type, "flatten" it to contain no toasted fields.
896  *      This must be invoked on any potentially-composite field that is to be
897  *      inserted into a tuple.  Doing this preserves the invariant that toasting
898  *      goes only one level deep in a tuple.
899  *
900  *      Note that flattening does not mean expansion of short-header varlenas,
901  *      so in one sense toasting is allowed within composite datums.
902  * ----------
903  */
904 Datum
905 toast_flatten_tuple_attribute(Datum value,
906                                                           Oid typeId, int32 typeMod)
907 {
908         TupleDesc       tupleDesc;
909         HeapTupleHeader olddata;
910         HeapTupleHeader new_data;
911         int32           new_len;
912         int32           new_data_len;
913         HeapTupleData tmptup;
914         Form_pg_attribute *att;
915         int                     numAttrs;
916         int                     i;
917         bool            need_change = false;
918         bool            has_nulls = false;
919         Datum           toast_values[MaxTupleAttributeNumber];
920         bool            toast_isnull[MaxTupleAttributeNumber];
921         bool            toast_free[MaxTupleAttributeNumber];
922
923         /*
924          * See if it's a composite type, and get the tupdesc if so.
925          */
926         tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
927         if (tupleDesc == NULL)
928                 return value;                   /* not a composite type */
929
930         att = tupleDesc->attrs;
931         numAttrs = tupleDesc->natts;
932
933         /*
934          * Break down the tuple into fields.
935          */
936         olddata = DatumGetHeapTupleHeader(value);
937         Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
938         Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
939         /* Build a temporary HeapTuple control structure */
940         tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
941         ItemPointerSetInvalid(&(tmptup.t_self));
942         tmptup.t_tableOid = InvalidOid;
943         tmptup.t_data = olddata;
944
945         Assert(numAttrs <= MaxTupleAttributeNumber);
946         heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
947
948         memset(toast_free, 0, numAttrs * sizeof(bool));
949
950         for (i = 0; i < numAttrs; i++)
951         {
952                 /*
953                  * Look at non-null varlena attributes
954                  */
955                 if (toast_isnull[i])
956                         has_nulls = true;
957                 else if (att[i]->attlen == -1)
958                 {
959                         struct varlena *new_value;
960
961                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
962                         if (VARATT_IS_EXTERNAL(new_value) ||
963                                 VARATT_IS_COMPRESSED(new_value))
964                         {
965                                 new_value = heap_tuple_untoast_attr(new_value);
966                                 toast_values[i] = PointerGetDatum(new_value);
967                                 toast_free[i] = true;
968                                 need_change = true;
969                         }
970                 }
971         }
972
973         /*
974          * If nothing to untoast, just return the original tuple.
975          */
976         if (!need_change)
977         {
978                 ReleaseTupleDesc(tupleDesc);
979                 return value;
980         }
981
982         /*
983          * Calculate the new size of the tuple.  Header size should not change,
984          * but data size might.
985          */
986         new_len = offsetof(HeapTupleHeaderData, t_bits);
987         if (has_nulls)
988                 new_len += BITMAPLEN(numAttrs);
989         if (olddata->t_infomask & HEAP_HASOID)
990                 new_len += sizeof(Oid);
991         new_len = MAXALIGN(new_len);
992         Assert(new_len == olddata->t_hoff);
993         new_data_len = heap_compute_data_size(tupleDesc,
994                                                                                   toast_values, toast_isnull);
995         new_len += new_data_len;
996
997         new_data = (HeapTupleHeader) palloc0(new_len);
998
999         /*
1000          * Put the tuple header and the changed values into place
1001          */
1002         memcpy(new_data, olddata, olddata->t_hoff);
1003
1004         HeapTupleHeaderSetDatumLength(new_data, new_len);
1005
1006         heap_fill_tuple(tupleDesc,
1007                                         toast_values,
1008                                         toast_isnull,
1009                                         (char *) new_data + olddata->t_hoff,
1010                                         new_data_len,
1011                                         &(new_data->t_infomask),
1012                                         has_nulls ? new_data->t_bits : NULL);
1013
1014         /*
1015          * Free allocated temp values
1016          */
1017         for (i = 0; i < numAttrs; i++)
1018                 if (toast_free[i])
1019                         pfree(DatumGetPointer(toast_values[i]));
1020         ReleaseTupleDesc(tupleDesc);
1021
1022         return PointerGetDatum(new_data);
1023 }
1024
1025
1026 /* ----------
1027  * toast_compress_datum -
1028  *
1029  *      Create a compressed version of a varlena datum
1030  *
1031  *      If we fail (ie, compressed result is actually bigger than original)
1032  *      then return NULL.  We must not use compressed data if it'd expand
1033  *      the tuple!
1034  *
1035  *      We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1036  *      copying them.  But we can't handle external or compressed datums.
1037  * ----------
1038  */
1039 Datum
1040 toast_compress_datum(Datum value)
1041 {
1042         struct varlena *tmp;
1043         int32           valsize = VARSIZE_ANY_EXHDR(value);
1044
1045         Assert(!VARATT_IS_EXTERNAL(value));
1046         Assert(!VARATT_IS_COMPRESSED(value));
1047
1048         /*
1049          * No point in wasting a palloc cycle if value is too short for
1050          * compression
1051          */
1052         if (valsize < PGLZ_strategy_default->min_input_size)
1053                 return PointerGetDatum(NULL);
1054
1055         tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
1056         if (pglz_compress(VARDATA_ANY(value), valsize,
1057                                           (PGLZ_Header *) tmp, PGLZ_strategy_default) &&
1058                 VARSIZE(tmp) < VARSIZE_ANY(value))
1059         {
1060                 /* successful compression */
1061                 return PointerGetDatum(tmp);
1062         }
1063         else
1064         {
1065                 /* incompressible data */
1066                 pfree(tmp);
1067                 return PointerGetDatum(NULL);
1068         }
1069 }
1070
1071
1072 /* ----------
1073  * toast_save_datum -
1074  *
1075  *      Save one single datum into the secondary relation and return
1076  *      a Datum reference for it.
1077  * ----------
1078  */
1079 static Datum
1080 toast_save_datum(Relation rel, Datum value,
1081                                  bool use_wal, bool use_fsm)
1082 {
1083         Relation        toastrel;
1084         Relation        toastidx;
1085         HeapTuple       toasttup;
1086         TupleDesc       toasttupDesc;
1087         Datum           t_values[3];
1088         bool            t_isnull[3];
1089         CommandId       mycid = GetCurrentCommandId(true);
1090         struct varlena *result;
1091         struct varatt_external toast_pointer;
1092         struct
1093         {
1094                 struct varlena hdr;
1095                 char            data[TOAST_MAX_CHUNK_SIZE];
1096         }                       chunk_data;
1097         int32           chunk_size;
1098         int32           chunk_seq = 0;
1099         char       *data_p;
1100         int32           data_todo;
1101
1102         /*
1103          * Open the toast relation and its index.  We can use the index to check
1104          * uniqueness of the OID we assign to the toasted item, even though it has
1105          * additional columns besides OID.
1106          */
1107         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1108         toasttupDesc = toastrel->rd_att;
1109         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1110
1111         /*
1112          * Get the data pointer and length, and compute va_rawsize and va_extsize.
1113          *
1114          * va_rawsize is the size of the equivalent fully uncompressed datum, so
1115          * we have to adjust for short headers.
1116          *
1117          * va_extsize is the actual size of the data payload in the toast records.
1118          */
1119         if (VARATT_IS_SHORT(value))
1120         {
1121                 data_p = VARDATA_SHORT(value);
1122                 data_todo = VARSIZE_SHORT(value) - VARHDRSZ_SHORT;
1123                 toast_pointer.va_rawsize = data_todo + VARHDRSZ;                /* as if not short */
1124                 toast_pointer.va_extsize = data_todo;
1125         }
1126         else if (VARATT_IS_COMPRESSED(value))
1127         {
1128                 data_p = VARDATA(value);
1129                 data_todo = VARSIZE(value) - VARHDRSZ;
1130                 /* rawsize in a compressed datum is just the size of the payload */
1131                 toast_pointer.va_rawsize = VARRAWSIZE_4B_C(value) + VARHDRSZ;
1132                 toast_pointer.va_extsize = data_todo;
1133                 /* Assert that the numbers look like it's compressed */
1134                 Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1135         }
1136         else
1137         {
1138                 data_p = VARDATA(value);
1139                 data_todo = VARSIZE(value) - VARHDRSZ;
1140                 toast_pointer.va_rawsize = VARSIZE(value);
1141                 toast_pointer.va_extsize = data_todo;
1142         }
1143
1144         toast_pointer.va_valueid = GetNewOidWithIndex(toastrel, toastidx);
1145         toast_pointer.va_toastrelid = rel->rd_rel->reltoastrelid;
1146
1147         /*
1148          * Initialize constant parts of the tuple data
1149          */
1150         t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1151         t_values[2] = PointerGetDatum(&chunk_data);
1152         t_isnull[0] = false;
1153         t_isnull[1] = false;
1154         t_isnull[2] = false;
1155
1156         /*
1157          * Split up the item into chunks
1158          */
1159         while (data_todo > 0)
1160         {
1161                 /*
1162                  * Calculate the size of this chunk
1163                  */
1164                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1165
1166                 /*
1167                  * Build a tuple and store it
1168                  */
1169                 t_values[1] = Int32GetDatum(chunk_seq++);
1170                 SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1171                 memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1172                 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1173                 if (!HeapTupleIsValid(toasttup))
1174                         elog(ERROR, "failed to build TOAST tuple");
1175
1176                 heap_insert(toastrel, toasttup, mycid, use_wal, use_fsm);
1177
1178                 /*
1179                  * Create the index entry.      We cheat a little here by not using
1180                  * FormIndexDatum: this relies on the knowledge that the index columns
1181                  * are the same as the initial columns of the table.
1182                  *
1183                  * Note also that there had better not be any user-created index on
1184                  * the TOAST table, since we don't bother to update anything else.
1185                  */
1186                 index_insert(toastidx, t_values, t_isnull,
1187                                          &(toasttup->t_self),
1188                                          toastrel, toastidx->rd_index->indisunique);
1189
1190                 /*
1191                  * Free memory
1192                  */
1193                 heap_freetuple(toasttup);
1194
1195                 /*
1196                  * Move on to next chunk
1197                  */
1198                 data_todo -= chunk_size;
1199                 data_p += chunk_size;
1200         }
1201
1202         /*
1203          * Done - close toast relation
1204          */
1205         index_close(toastidx, RowExclusiveLock);
1206         heap_close(toastrel, RowExclusiveLock);
1207
1208         /*
1209          * Create the TOAST pointer value that we'll return
1210          */
1211         result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1212         SET_VARSIZE_EXTERNAL(result, TOAST_POINTER_SIZE);
1213         memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1214
1215         return PointerGetDatum(result);
1216 }
1217
1218
1219 /* ----------
1220  * toast_delete_datum -
1221  *
1222  *      Delete a single external stored value.
1223  * ----------
1224  */
1225 static void
1226 toast_delete_datum(Relation rel, Datum value)
1227 {
1228         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1229         struct varatt_external toast_pointer;
1230         Relation        toastrel;
1231         Relation        toastidx;
1232         ScanKeyData toastkey;
1233         IndexScanDesc toastscan;
1234         HeapTuple       toasttup;
1235
1236         if (!VARATT_IS_EXTERNAL(attr))
1237                 return;
1238
1239         /* Must copy to access aligned fields */
1240         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1241
1242         /*
1243          * Open the toast relation and its index
1244          */
1245         toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1246         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1247
1248         /*
1249          * Setup a scan key to fetch from the index by va_valueid (we don't
1250          * particularly care whether we see them in sequence or not)
1251          */
1252         ScanKeyInit(&toastkey,
1253                                 (AttrNumber) 1,
1254                                 BTEqualStrategyNumber, F_OIDEQ,
1255                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1256
1257         /*
1258          * Find the chunks by index
1259          */
1260         toastscan = index_beginscan(toastrel, toastidx,
1261                                                                 SnapshotToast, 1, &toastkey);
1262         while ((toasttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1263         {
1264                 /*
1265                  * Have a chunk, delete it
1266                  */
1267                 simple_heap_delete(toastrel, &toasttup->t_self);
1268         }
1269
1270         /*
1271          * End scan and close relations
1272          */
1273         index_endscan(toastscan);
1274         index_close(toastidx, RowExclusiveLock);
1275         heap_close(toastrel, RowExclusiveLock);
1276 }
1277
1278
1279 /* ----------
1280  * toast_fetch_datum -
1281  *
1282  *      Reconstruct an in memory Datum from the chunks saved
1283  *      in the toast relation
1284  * ----------
1285  */
1286 static struct varlena *
1287 toast_fetch_datum(struct varlena * attr)
1288 {
1289         Relation        toastrel;
1290         Relation        toastidx;
1291         ScanKeyData toastkey;
1292         IndexScanDesc toastscan;
1293         HeapTuple       ttup;
1294         TupleDesc       toasttupDesc;
1295         struct varlena *result;
1296         struct varatt_external toast_pointer;
1297         int32           ressize;
1298         int32           residx,
1299                                 nextidx;
1300         int32           numchunks;
1301         Pointer         chunk;
1302         bool            isnull;
1303         char       *chunkdata;
1304         int32           chunksize;
1305
1306         /* Must copy to access aligned fields */
1307         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1308
1309         ressize = toast_pointer.va_extsize;
1310         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1311
1312         result = (struct varlena *) palloc(ressize + VARHDRSZ);
1313
1314         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1315                 SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1316         else
1317                 SET_VARSIZE(result, ressize + VARHDRSZ);
1318
1319         /*
1320          * Open the toast relation and its index
1321          */
1322         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1323         toasttupDesc = toastrel->rd_att;
1324         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1325
1326         /*
1327          * Setup a scan key to fetch from the index by va_valueid
1328          */
1329         ScanKeyInit(&toastkey,
1330                                 (AttrNumber) 1,
1331                                 BTEqualStrategyNumber, F_OIDEQ,
1332                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1333
1334         /*
1335          * Read the chunks by index
1336          *
1337          * Note that because the index is actually on (valueid, chunkidx) we will
1338          * see the chunks in chunkidx order, even though we didn't explicitly ask
1339          * for it.
1340          */
1341         nextidx = 0;
1342
1343         toastscan = index_beginscan(toastrel, toastidx,
1344                                                                 SnapshotToast, 1, &toastkey);
1345         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1346         {
1347                 /*
1348                  * Have a chunk, extract the sequence number and the data
1349                  */
1350                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1351                 Assert(!isnull);
1352                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1353                 Assert(!isnull);
1354                 if (!VARATT_IS_EXTENDED(chunk))
1355                 {
1356                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1357                         chunkdata = VARDATA(chunk);
1358                 }
1359                 else if (VARATT_IS_SHORT(chunk))
1360                 {
1361                         /* could happen due to heap_form_tuple doing its thing */
1362                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1363                         chunkdata = VARDATA_SHORT(chunk);
1364                 }
1365                 else
1366                 {
1367                         /* should never happen */
1368                         elog(ERROR, "found toasted toast chunk");
1369                         chunksize = 0;          /* keep compiler quiet */
1370                         chunkdata = NULL;
1371                 }
1372
1373                 /*
1374                  * Some checks on the data we've found
1375                  */
1376                 if (residx != nextidx)
1377                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1378                                  residx, nextidx,
1379                                  toast_pointer.va_valueid);
1380                 if (residx < numchunks - 1)
1381                 {
1382                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1383                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u",
1384                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1385                                          residx, numchunks,
1386                                          toast_pointer.va_valueid);
1387                 }
1388                 else if (residx == numchunks - 1)
1389                 {
1390                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1391                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u",
1392                                          chunksize,
1393                                          (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
1394                                          residx,
1395                                          toast_pointer.va_valueid);
1396                 }
1397                 else
1398                         elog(ERROR, "unexpected chunk number %d for toast value %u (out of range %d..%d)",
1399                                  residx,
1400                                  toast_pointer.va_valueid,
1401                                  0, numchunks - 1);
1402
1403                 /*
1404                  * Copy the data into proper place in our result
1405                  */
1406                 memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
1407                            chunkdata,
1408                            chunksize);
1409
1410                 nextidx++;
1411         }
1412
1413         /*
1414          * Final checks that we successfully fetched the datum
1415          */
1416         if (nextidx != numchunks)
1417                 elog(ERROR, "missing chunk number %d for toast value %u",
1418                          nextidx,
1419                          toast_pointer.va_valueid);
1420
1421         /*
1422          * End scan and close relations
1423          */
1424         index_endscan(toastscan);
1425         index_close(toastidx, AccessShareLock);
1426         heap_close(toastrel, AccessShareLock);
1427
1428         return result;
1429 }
1430
1431 /* ----------
1432  * toast_fetch_datum_slice -
1433  *
1434  *      Reconstruct a segment of a Datum from the chunks saved
1435  *      in the toast relation
1436  * ----------
1437  */
1438 static struct varlena *
1439 toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
1440 {
1441         Relation        toastrel;
1442         Relation        toastidx;
1443         ScanKeyData toastkey[3];
1444         int                     nscankeys;
1445         IndexScanDesc toastscan;
1446         HeapTuple       ttup;
1447         TupleDesc       toasttupDesc;
1448         struct varlena *result;
1449         struct varatt_external toast_pointer;
1450         int32           attrsize;
1451         int32           residx;
1452         int32           nextidx;
1453         int                     numchunks;
1454         int                     startchunk;
1455         int                     endchunk;
1456         int32           startoffset;
1457         int32           endoffset;
1458         int                     totalchunks;
1459         Pointer         chunk;
1460         bool            isnull;
1461         char       *chunkdata;
1462         int32           chunksize;
1463         int32           chcpystrt;
1464         int32           chcpyend;
1465
1466         Assert(VARATT_IS_EXTERNAL(attr));
1467
1468         /* Must copy to access aligned fields */
1469         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1470
1471         /*
1472          * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
1473          * we can't return a compressed datum which is meaningful to toast later
1474          */
1475         Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1476
1477         attrsize = toast_pointer.va_extsize;
1478         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1479
1480         if (sliceoffset >= attrsize)
1481         {
1482                 sliceoffset = 0;
1483                 length = 0;
1484         }
1485
1486         if (((sliceoffset + length) > attrsize) || length < 0)
1487                 length = attrsize - sliceoffset;
1488
1489         result = (struct varlena *) palloc(length + VARHDRSZ);
1490
1491         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1492                 SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
1493         else
1494                 SET_VARSIZE(result, length + VARHDRSZ);
1495
1496         if (length == 0)
1497                 return result;                  /* Can save a lot of work at this point! */
1498
1499         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1500         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1501         numchunks = (endchunk - startchunk) + 1;
1502
1503         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1504         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1505
1506         /*
1507          * Open the toast relation and its index
1508          */
1509         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1510         toasttupDesc = toastrel->rd_att;
1511         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1512
1513         /*
1514          * Setup a scan key to fetch from the index. This is either two keys or
1515          * three depending on the number of chunks.
1516          */
1517         ScanKeyInit(&toastkey[0],
1518                                 (AttrNumber) 1,
1519                                 BTEqualStrategyNumber, F_OIDEQ,
1520                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1521
1522         /*
1523          * Use equality condition for one chunk, a range condition otherwise:
1524          */
1525         if (numchunks == 1)
1526         {
1527                 ScanKeyInit(&toastkey[1],
1528                                         (AttrNumber) 2,
1529                                         BTEqualStrategyNumber, F_INT4EQ,
1530                                         Int32GetDatum(startchunk));
1531                 nscankeys = 2;
1532         }
1533         else
1534         {
1535                 ScanKeyInit(&toastkey[1],
1536                                         (AttrNumber) 2,
1537                                         BTGreaterEqualStrategyNumber, F_INT4GE,
1538                                         Int32GetDatum(startchunk));
1539                 ScanKeyInit(&toastkey[2],
1540                                         (AttrNumber) 2,
1541                                         BTLessEqualStrategyNumber, F_INT4LE,
1542                                         Int32GetDatum(endchunk));
1543                 nscankeys = 3;
1544         }
1545
1546         /*
1547          * Read the chunks by index
1548          *
1549          * The index is on (valueid, chunkidx) so they will come in order
1550          */
1551         nextidx = startchunk;
1552         toastscan = index_beginscan(toastrel, toastidx,
1553                                                                 SnapshotToast, nscankeys, toastkey);
1554         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1555         {
1556                 /*
1557                  * Have a chunk, extract the sequence number and the data
1558                  */
1559                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1560                 Assert(!isnull);
1561                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1562                 Assert(!isnull);
1563                 if (!VARATT_IS_EXTENDED(chunk))
1564                 {
1565                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1566                         chunkdata = VARDATA(chunk);
1567                 }
1568                 else if (VARATT_IS_SHORT(chunk))
1569                 {
1570                         /* could happen due to heap_form_tuple doing its thing */
1571                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1572                         chunkdata = VARDATA_SHORT(chunk);
1573                 }
1574                 else
1575                 {
1576                         /* should never happen */
1577                         elog(ERROR, "found toasted toast chunk");
1578                         chunksize = 0;          /* keep compiler quiet */
1579                         chunkdata = NULL;
1580                 }
1581
1582                 /*
1583                  * Some checks on the data we've found
1584                  */
1585                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1586                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1587                                  residx, nextidx,
1588                                  toast_pointer.va_valueid);
1589                 if (residx < totalchunks - 1)
1590                 {
1591                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1592                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u when fetching slice",
1593                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1594                                          residx, totalchunks,
1595                                          toast_pointer.va_valueid);
1596                 }
1597                 else if (residx == totalchunks - 1)
1598                 {
1599                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1600                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u when fetching slice",
1601                                          chunksize,
1602                                          (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
1603                                          residx,
1604                                          toast_pointer.va_valueid);
1605                 }
1606                 else
1607                         elog(ERROR, "unexpected chunk number %d for toast value %u (out of range %d..%d)",
1608                                  residx,
1609                                  toast_pointer.va_valueid,
1610                                  0, totalchunks - 1);
1611
1612                 /*
1613                  * Copy the data into proper place in our result
1614                  */
1615                 chcpystrt = 0;
1616                 chcpyend = chunksize - 1;
1617                 if (residx == startchunk)
1618                         chcpystrt = startoffset;
1619                 if (residx == endchunk)
1620                         chcpyend = endoffset;
1621
1622                 memcpy(VARDATA(result) +
1623                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1624                            chunkdata + chcpystrt,
1625                            (chcpyend - chcpystrt) + 1);
1626
1627                 nextidx++;
1628         }
1629
1630         /*
1631          * Final checks that we successfully fetched the datum
1632          */
1633         if (nextidx != (endchunk + 1))
1634                 elog(ERROR, "missing chunk number %d for toast value %u",
1635                          nextidx,
1636                          toast_pointer.va_valueid);
1637
1638         /*
1639          * End scan and close relations
1640          */
1641         index_endscan(toastscan);
1642         index_close(toastidx, AccessShareLock);
1643         heap_close(toastrel, AccessShareLock);
1644
1645         return result;
1646 }