]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Change the declaration of struct varlena so that the length word is
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2008, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/heap/tuptoaster.c,v 1.82 2008/02/23 19:11:45 tgl Exp $
12  *
13  *
14  * INTERFACE ROUTINES
15  *              toast_insert_or_update -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              toast_delete -
20  *                      Reclaim toast storage when a tuple is deleted
21  *
22  *              heap_tuple_untoast_attr -
23  *                      Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27
28 #include "postgres.h"
29
30 #include <unistd.h>
31 #include <fcntl.h>
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "utils/fmgroids.h"
39 #include "utils/pg_lzcompress.h"
40 #include "utils/typcache.h"
41
42
43 #undef TOAST_DEBUG
44
/*
 * Total size of an EXTERNAL datum holding a standard TOAST pointer:
 * one struct varatt_external plus the external varlena header.
 */
#define TOAST_POINTER_SIZE (sizeof(struct varatt_external) + VARHDRSZ_EXTERNAL)
47
/*
 * Decide whether an externally-stored value is compressed.
 *
 * va_extsize is the actual length of the external data and carries no
 * header overhead; va_rawsize is the size of the original uncompressed
 * datum and does include VARHDRSZ.  Compression is only ever used when it
 * actually saves space, so after discounting the header the two are either
 * equal (not compressed) or extsize is the smaller one (compressed).
 */
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) \
	((toast_pointer).va_rawsize - VARHDRSZ > (toast_pointer).va_extsize)
57
/*
 * Copy the contents of an EXTERNAL datum, which may be stored at an
 * unaligned address, into the caller's local "struct varatt_external"
 * toast pointer variable.
 *
 * A bare memcpy ought to be enough, but some gcc versions appear to emit
 * broken code that assumes the datum contents are aligned.  Routing the
 * access through an explicit "varattrib_1b_e *" temporary seems to cure it,
 * so do not "simplify" the temporary away.
 */
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr) \
do { \
	varattrib_1b_e *ext_datum_ = (varattrib_1b_e *) (attr); \
	Assert(VARATT_IS_EXTERNAL(ext_datum_)); \
	Assert(VARHDRSZ_EXTERNAL + sizeof(toast_pointer) == VARSIZE_EXTERNAL(ext_datum_)); \
	memcpy(&(toast_pointer), VARDATA_EXTERNAL(ext_datum_), sizeof(toast_pointer)); \
} while (0)
72
73
74 static void toast_delete_datum(Relation rel, Datum value);
75 static Datum toast_save_datum(Relation rel, Datum value,
76                                  bool use_wal, bool use_fsm);
77 static struct varlena *toast_fetch_datum(struct varlena * attr);
78 static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
79                                                 int32 sliceoffset, int32 length);
80
81
/* ----------
 * heap_tuple_fetch_attr -
 *
 *	Public entry point to bring a toasted value back in from external
 *	storage.  The value is not decompressed here, so it may come back
 *	still in compressed format.
 *
 * The returned datum holds all of its data internally, ie, it no longer
 * depends on external storage; it can however still be compressed or have
 * a short header.  A datum that was not external in the first place is
 * returned unchanged (the very same pointer).
 * ----------
 */
struct varlena *
heap_tuple_fetch_attr(struct varlena * attr)
{
	/*
	 * A plain value that already lives inside the main tuple: there is
	 * nothing to fetch, so just give the caller's pointer back.
	 */
	if (!VARATT_IS_EXTERNAL(attr))
		return attr;

	/* An externally stored value: fetch it via toast_fetch_datum() */
	return toast_fetch_datum(attr);
}
115
116
117 /* ----------
118  * heap_tuple_untoast_attr -
119  *
120  *      Public entry point to get back a toasted value from compression
121  *      or external storage.
122  * ----------
123  */
124 struct varlena *
125 heap_tuple_untoast_attr(struct varlena * attr)
126 {
127         if (VARATT_IS_EXTERNAL(attr))
128         {
129                 /*
130                  * This is an externally stored datum --- fetch it back from there
131                  */
132                 attr = toast_fetch_datum(attr);
133                 /* If it's compressed, decompress it */
134                 if (VARATT_IS_COMPRESSED(attr))
135                 {
136                         PGLZ_Header *tmp = (PGLZ_Header *) attr;
137
138                         attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
139                         SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
140                         pglz_decompress(tmp, VARDATA(attr));
141                         pfree(tmp);
142                 }
143         }
144         else if (VARATT_IS_COMPRESSED(attr))
145         {
146                 /*
147                  * This is a compressed value inside of the main tuple
148                  */
149                 PGLZ_Header *tmp = (PGLZ_Header *) attr;
150
151                 attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
152                 SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
153                 pglz_decompress(tmp, VARDATA(attr));
154         }
155         else if (VARATT_IS_SHORT(attr))
156         {
157                 /*
158                  * This is a short-header varlena --- convert to 4-byte header format
159                  */
160                 Size            data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
161                 Size            new_size = data_size + VARHDRSZ;
162                 struct varlena *new_attr;
163
164                 new_attr = (struct varlena *) palloc(new_size);
165                 SET_VARSIZE(new_attr, new_size);
166                 memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
167                 attr = new_attr;
168         }
169
170         return attr;
171 }
172
173
174 /* ----------
175  * heap_tuple_untoast_attr_slice -
176  *
177  *              Public entry point to get back part of a toasted value
178  *              from compression or external storage.
179  * ----------
180  */
181 struct varlena *
182 heap_tuple_untoast_attr_slice(struct varlena * attr,
183                                                           int32 sliceoffset, int32 slicelength)
184 {
185         struct varlena *preslice;
186         struct varlena *result;
187         char       *attrdata;
188         int32           attrsize;
189
190         if (VARATT_IS_EXTERNAL(attr))
191         {
192                 struct varatt_external toast_pointer;
193
194                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
195
196                 /* fast path for non-compressed external datums */
197                 if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
198                         return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
199
200                 /* fetch it back (compressed marker will get set automatically) */
201                 preslice = toast_fetch_datum(attr);
202         }
203         else
204                 preslice = attr;
205
206         if (VARATT_IS_COMPRESSED(preslice))
207         {
208                 PGLZ_Header *tmp = (PGLZ_Header *) preslice;
209                 Size            size = PGLZ_RAW_SIZE(tmp) + VARHDRSZ;
210
211                 preslice = (struct varlena *) palloc(size);
212                 SET_VARSIZE(preslice, size);
213                 pglz_decompress(tmp, VARDATA(preslice));
214
215                 if (tmp != (PGLZ_Header *) attr)
216                         pfree(tmp);
217         }
218
219         if (VARATT_IS_SHORT(preslice))
220         {
221                 attrdata = VARDATA_SHORT(preslice);
222                 attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
223         }
224         else
225         {
226                 attrdata = VARDATA(preslice);
227                 attrsize = VARSIZE(preslice) - VARHDRSZ;
228         }
229
230         /* slicing of datum for compressed cases and plain value */
231
232         if (sliceoffset >= attrsize)
233         {
234                 sliceoffset = 0;
235                 slicelength = 0;
236         }
237
238         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
239                 slicelength = attrsize - sliceoffset;
240
241         result = (struct varlena *) palloc(slicelength + VARHDRSZ);
242         SET_VARSIZE(result, slicelength + VARHDRSZ);
243
244         memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
245
246         if (preslice != attr)
247                 pfree(preslice);
248
249         return result;
250 }
251
252
253 /* ----------
254  * toast_raw_datum_size -
255  *
256  *      Return the raw (detoasted) size of a varlena datum
257  *      (including the VARHDRSZ header)
258  * ----------
259  */
260 Size
261 toast_raw_datum_size(Datum value)
262 {
263         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
264         Size            result;
265
266         if (VARATT_IS_EXTERNAL(attr))
267         {
268                 /* va_rawsize is the size of the original datum -- including header */
269                 struct varatt_external toast_pointer;
270
271                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
272                 result = toast_pointer.va_rawsize;
273         }
274         else if (VARATT_IS_COMPRESSED(attr))
275         {
276                 /* here, va_rawsize is just the payload size */
277                 result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
278         }
279         else if (VARATT_IS_SHORT(attr))
280         {
281                 /*
282                  * we have to normalize the header length to VARHDRSZ or else the
283                  * callers of this function will be confused.
284                  */
285                 result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
286         }
287         else
288         {
289                 /* plain untoasted datum */
290                 result = VARSIZE(attr);
291         }
292         return result;
293 }
294
295 /* ----------
296  * toast_datum_size
297  *
298  *      Return the physical storage size (possibly compressed) of a varlena datum
299  * ----------
300  */
301 Size
302 toast_datum_size(Datum value)
303 {
304         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
305         Size            result;
306
307         if (VARATT_IS_EXTERNAL(attr))
308         {
309                 /*
310                  * Attribute is stored externally - return the extsize whether
311                  * compressed or not.  We do not count the size of the toast pointer
312                  * ... should we?
313                  */
314                 struct varatt_external toast_pointer;
315
316                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
317                 result = toast_pointer.va_extsize;
318         }
319         else if (VARATT_IS_SHORT(attr))
320         {
321                 result = VARSIZE_SHORT(attr);
322         }
323         else
324         {
325                 /*
326                  * Attribute is stored inline either compressed or not, just calculate
327                  * the size of the datum in either case.
328                  */
329                 result = VARSIZE(attr);
330         }
331         return result;
332 }
333
334
335 /* ----------
336  * toast_delete -
337  *
338  *      Cascaded delete toast-entries on DELETE
339  * ----------
340  */
341 void
342 toast_delete(Relation rel, HeapTuple oldtup)
343 {
344         TupleDesc       tupleDesc;
345         Form_pg_attribute *att;
346         int                     numAttrs;
347         int                     i;
348         Datum           toast_values[MaxHeapAttributeNumber];
349         bool            toast_isnull[MaxHeapAttributeNumber];
350
351         /*
352          * We should only ever be called for tuples of plain relations ---
353          * recursing on a toast rel is bad news.
354          */
355         Assert(rel->rd_rel->relkind == RELKIND_RELATION);
356
357         /*
358          * Get the tuple descriptor and break down the tuple into fields.
359          *
360          * NOTE: it's debatable whether to use heap_deformtuple() here or just
361          * heap_getattr() only the varlena columns.  The latter could win if there
362          * are few varlena columns and many non-varlena ones. However,
363          * heap_deformtuple costs only O(N) while the heap_getattr way would cost
364          * O(N^2) if there are many varlena columns, so it seems better to err on
365          * the side of linear cost.  (We won't even be here unless there's at
366          * least one varlena column, by the way.)
367          */
368         tupleDesc = rel->rd_att;
369         att = tupleDesc->attrs;
370         numAttrs = tupleDesc->natts;
371
372         Assert(numAttrs <= MaxHeapAttributeNumber);
373         heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
374
375         /*
376          * Check for external stored attributes and delete them from the secondary
377          * relation.
378          */
379         for (i = 0; i < numAttrs; i++)
380         {
381                 if (att[i]->attlen == -1)
382                 {
383                         Datum           value = toast_values[i];
384
385                         if (!toast_isnull[i] && VARATT_IS_EXTERNAL(value))
386                                 toast_delete_datum(rel, value);
387                 }
388         }
389 }
390
391
392 /* ----------
393  * toast_insert_or_update -
394  *
395  *      Delete no-longer-used toast-entries and create new ones to
396  *      make the new tuple fit on INSERT or UPDATE
397  *
398  * Inputs:
399  *      newtup: the candidate new tuple to be inserted
400  *      oldtup: the old row version for UPDATE, or NULL for INSERT
401  *      use_wal, use_fsm: flags to be passed to heap_insert() for toast rows
402  * Result:
403  *      either newtup if no toasting is needed, or a palloc'd modified tuple
404  *      that is what should actually get stored
405  *
406  * NOTE: neither newtup nor oldtup will be modified.  This is a change
407  * from the pre-8.1 API of this routine.
408  * ----------
409  */
410 HeapTuple
411 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
412                                            bool use_wal, bool use_fsm)
413 {
414         HeapTuple       result_tuple;
415         TupleDesc       tupleDesc;
416         Form_pg_attribute *att;
417         int                     numAttrs;
418         int                     i;
419
420         bool            need_change = false;
421         bool            need_free = false;
422         bool            need_delold = false;
423         bool            has_nulls = false;
424
425         Size            maxDataLen;
426         Size            hoff;
427
428         char            toast_action[MaxHeapAttributeNumber];
429         bool            toast_isnull[MaxHeapAttributeNumber];
430         bool            toast_oldisnull[MaxHeapAttributeNumber];
431         Datum           toast_values[MaxHeapAttributeNumber];
432         Datum           toast_oldvalues[MaxHeapAttributeNumber];
433         int32           toast_sizes[MaxHeapAttributeNumber];
434         bool            toast_free[MaxHeapAttributeNumber];
435         bool            toast_delold[MaxHeapAttributeNumber];
436
437         /*
438          * We should only ever be called for tuples of plain relations ---
439          * recursing on a toast rel is bad news.
440          */
441         Assert(rel->rd_rel->relkind == RELKIND_RELATION);
442
443         /*
444          * Get the tuple descriptor and break down the tuple(s) into fields.
445          */
446         tupleDesc = rel->rd_att;
447         att = tupleDesc->attrs;
448         numAttrs = tupleDesc->natts;
449
450         Assert(numAttrs <= MaxHeapAttributeNumber);
451         heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
452         if (oldtup != NULL)
453                 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
454
455         /* ----------
456          * Then collect information about the values given
457          *
458          * NOTE: toast_action[i] can have these values:
459          *              ' '             default handling
460          *              'p'             already processed --- don't touch it
461          *              'x'             incompressible, but OK to move off
462          *
463          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
464          *              toast_action[i] different from 'p'.
465          * ----------
466          */
467         memset(toast_action, ' ', numAttrs * sizeof(char));
468         memset(toast_free, 0, numAttrs * sizeof(bool));
469         memset(toast_delold, 0, numAttrs * sizeof(bool));
470
471         for (i = 0; i < numAttrs; i++)
472         {
473                 struct varlena *old_value;
474                 struct varlena *new_value;
475
476                 if (oldtup != NULL)
477                 {
478                         /*
479                          * For UPDATE get the old and new values of this attribute
480                          */
481                         old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
482                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
483
484                         /*
485                          * If the old value is an external stored one, check if it has
486                          * changed so we have to delete it later.
487                          */
488                         if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
489                                 VARATT_IS_EXTERNAL(old_value))
490                         {
491                                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
492                                         memcmp((char *) old_value, (char *) new_value,
493                                                    VARSIZE_EXTERNAL(old_value)) != 0)
494                                 {
495                                         /*
496                                          * The old external stored value isn't needed any more
497                                          * after the update
498                                          */
499                                         toast_delold[i] = true;
500                                         need_delold = true;
501                                 }
502                                 else
503                                 {
504                                         /*
505                                          * This attribute isn't changed by this update so we reuse
506                                          * the original reference to the old value in the new
507                                          * tuple.
508                                          */
509                                         toast_action[i] = 'p';
510                                         continue;
511                                 }
512                         }
513                 }
514                 else
515                 {
516                         /*
517                          * For INSERT simply get the new value
518                          */
519                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
520                 }
521
522                 /*
523                  * Handle NULL attributes
524                  */
525                 if (toast_isnull[i])
526                 {
527                         toast_action[i] = 'p';
528                         has_nulls = true;
529                         continue;
530                 }
531
532                 /*
533                  * Now look at varlena attributes
534                  */
535                 if (att[i]->attlen == -1)
536                 {
537                         /*
538                          * If the table's attribute says PLAIN always, force it so.
539                          */
540                         if (att[i]->attstorage == 'p')
541                                 toast_action[i] = 'p';
542
543                         /*
544                          * We took care of UPDATE above, so any external value we find
545                          * still in the tuple must be someone else's we cannot reuse.
546                          * Fetch it back (without decompression, unless we are forcing
547                          * PLAIN storage).      If necessary, we'll push it out as a new
548                          * external value below.
549                          */
550                         if (VARATT_IS_EXTERNAL(new_value))
551                         {
552                                 if (att[i]->attstorage == 'p')
553                                         new_value = heap_tuple_untoast_attr(new_value);
554                                 else
555                                         new_value = heap_tuple_fetch_attr(new_value);
556                                 toast_values[i] = PointerGetDatum(new_value);
557                                 toast_free[i] = true;
558                                 need_change = true;
559                                 need_free = true;
560                         }
561
562                         /*
563                          * Remember the size of this attribute
564                          */
565                         toast_sizes[i] = VARSIZE_ANY(new_value);
566                 }
567                 else
568                 {
569                         /*
570                          * Not a varlena attribute, plain storage always
571                          */
572                         toast_action[i] = 'p';
573                 }
574         }
575
576         /* ----------
577          * Compress and/or save external until data fits into target length
578          *
579          *      1: Inline compress attributes with attstorage 'x'
580          *      2: Store attributes with attstorage 'x' or 'e' external
581          *      3: Inline compress attributes with attstorage 'm'
582          *      4: Store attributes with attstorage 'm' external
583          * ----------
584          */
585
586         /* compute header overhead --- this should match heap_form_tuple() */
587         hoff = offsetof(HeapTupleHeaderData, t_bits);
588         if (has_nulls)
589                 hoff += BITMAPLEN(numAttrs);
590         if (newtup->t_data->t_infomask & HEAP_HASOID)
591                 hoff += sizeof(Oid);
592         hoff = MAXALIGN(hoff);
593         Assert(hoff == newtup->t_data->t_hoff);
594         /* now convert to a limit on the tuple data size */
595         maxDataLen = TOAST_TUPLE_TARGET - hoff;
596
597         /*
598          * Look for attributes with attstorage 'x' to compress
599          */
600         while (heap_compute_data_size(tupleDesc,
601                                                                   toast_values, toast_isnull) > maxDataLen)
602         {
603                 int                     biggest_attno = -1;
604                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
605                 Datum           old_value;
606                 Datum           new_value;
607
608                 /*
609                  * Search for the biggest yet uncompressed internal attribute
610                  */
611                 for (i = 0; i < numAttrs; i++)
612                 {
613                         if (toast_action[i] != ' ')
614                                 continue;
615                         if (VARATT_IS_EXTERNAL(toast_values[i]))
616                                 continue;               /* can't happen, toast_action would be 'p' */
617                         if (VARATT_IS_COMPRESSED(toast_values[i]))
618                                 continue;
619                         if (att[i]->attstorage != 'x')
620                                 continue;
621                         if (toast_sizes[i] > biggest_size)
622                         {
623                                 biggest_attno = i;
624                                 biggest_size = toast_sizes[i];
625                         }
626                 }
627
628                 if (biggest_attno < 0)
629                         break;
630
631                 /*
632                  * Attempt to compress it inline
633                  */
634                 i = biggest_attno;
635                 old_value = toast_values[i];
636                 new_value = toast_compress_datum(old_value);
637
638                 if (DatumGetPointer(new_value) != NULL)
639                 {
640                         /* successful compression */
641                         if (toast_free[i])
642                                 pfree(DatumGetPointer(old_value));
643                         toast_values[i] = new_value;
644                         toast_free[i] = true;
645                         toast_sizes[i] = VARSIZE(toast_values[i]);
646                         need_change = true;
647                         need_free = true;
648                 }
649                 else
650                 {
651                         /*
652                          * incompressible data, ignore on subsequent compression passes
653                          */
654                         toast_action[i] = 'x';
655                 }
656         }
657
658         /*
659          * Second we look for attributes of attstorage 'x' or 'e' that are still
660          * inline.      But skip this if there's no toast table to push them to.
661          */
662         while (heap_compute_data_size(tupleDesc,
663                                                                   toast_values, toast_isnull) > maxDataLen &&
664                    rel->rd_rel->reltoastrelid != InvalidOid)
665         {
666                 int                     biggest_attno = -1;
667                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
668                 Datum           old_value;
669
670                 /*------
671                  * Search for the biggest yet inlined attribute with
672                  * attstorage equals 'x' or 'e'
673                  *------
674                  */
675                 for (i = 0; i < numAttrs; i++)
676                 {
677                         if (toast_action[i] == 'p')
678                                 continue;
679                         if (VARATT_IS_EXTERNAL(toast_values[i]))
680                                 continue;               /* can't happen, toast_action would be 'p' */
681                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
682                                 continue;
683                         if (toast_sizes[i] > biggest_size)
684                         {
685                                 biggest_attno = i;
686                                 biggest_size = toast_sizes[i];
687                         }
688                 }
689
690                 if (biggest_attno < 0)
691                         break;
692
693                 /*
694                  * Store this external
695                  */
696                 i = biggest_attno;
697                 old_value = toast_values[i];
698                 toast_action[i] = 'p';
699                 toast_values[i] = toast_save_datum(rel, toast_values[i],
700                                                                                    use_wal, use_fsm);
701                 if (toast_free[i])
702                         pfree(DatumGetPointer(old_value));
703                 toast_free[i] = true;
704
705                 need_change = true;
706                 need_free = true;
707         }
708
709         /*
710          * Round 3 - this time we take attributes with storage 'm' into
711          * compression
712          */
713         while (heap_compute_data_size(tupleDesc,
714                                                                   toast_values, toast_isnull) > maxDataLen)
715         {
716                 int                     biggest_attno = -1;
717                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
718                 Datum           old_value;
719                 Datum           new_value;
720
721                 /*
722                  * Search for the biggest yet uncompressed internal attribute
723                  */
724                 for (i = 0; i < numAttrs; i++)
725                 {
726                         if (toast_action[i] != ' ')
727                                 continue;
728                         if (VARATT_IS_EXTERNAL(toast_values[i]))
729                                 continue;               /* can't happen, toast_action would be 'p' */
730                         if (VARATT_IS_COMPRESSED(toast_values[i]))
731                                 continue;
732                         if (att[i]->attstorage != 'm')
733                                 continue;
734                         if (toast_sizes[i] > biggest_size)
735                         {
736                                 biggest_attno = i;
737                                 biggest_size = toast_sizes[i];
738                         }
739                 }
740
741                 if (biggest_attno < 0)
742                         break;
743
744                 /*
745                  * Attempt to compress it inline
746                  */
747                 i = biggest_attno;
748                 old_value = toast_values[i];
749                 new_value = toast_compress_datum(old_value);
750
751                 if (DatumGetPointer(new_value) != NULL)
752                 {
753                         /* successful compression */
754                         if (toast_free[i])
755                                 pfree(DatumGetPointer(old_value));
756                         toast_values[i] = new_value;
757                         toast_free[i] = true;
758                         toast_sizes[i] = VARSIZE(toast_values[i]);
759                         need_change = true;
760                         need_free = true;
761                 }
762                 else
763                 {
764                         /*
765                          * incompressible data, ignore on subsequent compression passes
766                          */
767                         toast_action[i] = 'x';
768                 }
769         }
770
771         /*
772          * Finally we store attributes of type 'm' external, if possible.
773          */
774         while (heap_compute_data_size(tupleDesc,
775                                                                   toast_values, toast_isnull) > maxDataLen &&
776                    rel->rd_rel->reltoastrelid != InvalidOid)
777         {
778                 int                     biggest_attno = -1;
779                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
780                 Datum           old_value;
781
782                 /*--------
783                  * Search for the biggest yet inlined attribute with
784                  * attstorage = 'm'
785                  *--------
786                  */
787                 for (i = 0; i < numAttrs; i++)
788                 {
789                         if (toast_action[i] == 'p')
790                                 continue;
791                         if (VARATT_IS_EXTERNAL(toast_values[i]))
792                                 continue;               /* can't happen, toast_action would be 'p' */
793                         if (att[i]->attstorage != 'm')
794                                 continue;
795                         if (toast_sizes[i] > biggest_size)
796                         {
797                                 biggest_attno = i;
798                                 biggest_size = toast_sizes[i];
799                         }
800                 }
801
802                 if (biggest_attno < 0)
803                         break;
804
805                 /*
806                  * Store this external
807                  */
808                 i = biggest_attno;
809                 old_value = toast_values[i];
810                 toast_action[i] = 'p';
811                 toast_values[i] = toast_save_datum(rel, toast_values[i],
812                                                                                    use_wal, use_fsm);
813                 if (toast_free[i])
814                         pfree(DatumGetPointer(old_value));
815                 toast_free[i] = true;
816
817                 need_change = true;
818                 need_free = true;
819         }
820
821         /*
822          * In the case we toasted any values, we need to build a new heap tuple
823          * with the changed values.
824          */
825         if (need_change)
826         {
827                 HeapTupleHeader olddata = newtup->t_data;
828                 HeapTupleHeader new_data;
829                 int32           new_len;
830                 int32           new_data_len;
831
832                 /*
833                  * Calculate the new size of the tuple.  Header size should not
834                  * change, but data size might.
835                  */
836                 new_len = offsetof(HeapTupleHeaderData, t_bits);
837                 if (has_nulls)
838                         new_len += BITMAPLEN(numAttrs);
839                 if (olddata->t_infomask & HEAP_HASOID)
840                         new_len += sizeof(Oid);
841                 new_len = MAXALIGN(new_len);
842                 Assert(new_len == olddata->t_hoff);
843                 new_data_len = heap_compute_data_size(tupleDesc,
844                                                                                           toast_values, toast_isnull);
845                 new_len += new_data_len;
846
847                 /*
848                  * Allocate and zero the space needed, and fill HeapTupleData fields.
849                  */
850                 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_len);
851                 result_tuple->t_len = new_len;
852                 result_tuple->t_self = newtup->t_self;
853                 result_tuple->t_tableOid = newtup->t_tableOid;
854                 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
855                 result_tuple->t_data = new_data;
856
857                 /*
858                  * Put the existing tuple header and the changed values into place
859                  */
860                 memcpy(new_data, olddata, olddata->t_hoff);
861
862                 heap_fill_tuple(tupleDesc,
863                                                 toast_values,
864                                                 toast_isnull,
865                                                 (char *) new_data + olddata->t_hoff,
866                                                 new_data_len,
867                                                 &(new_data->t_infomask),
868                                                 has_nulls ? new_data->t_bits : NULL);
869         }
870         else
871                 result_tuple = newtup;
872
873         /*
874          * Free allocated temp values
875          */
876         if (need_free)
877                 for (i = 0; i < numAttrs; i++)
878                         if (toast_free[i])
879                                 pfree(DatumGetPointer(toast_values[i]));
880
881         /*
882          * Delete external values from the old tuple
883          */
884         if (need_delold)
885                 for (i = 0; i < numAttrs; i++)
886                         if (toast_delold[i])
887                                 toast_delete_datum(rel, toast_oldvalues[i]);
888
889         return result_tuple;
890 }
891
892
893 /* ----------
894  * toast_flatten_tuple_attribute -
895  *
896  *      If a Datum is of composite type, "flatten" it to contain no toasted fields.
897  *      This must be invoked on any potentially-composite field that is to be
898  *      inserted into a tuple.  Doing this preserves the invariant that toasting
899  *      goes only one level deep in a tuple.
900  *
901  *      Note that flattening does not mean expansion of short-header varlenas,
902  *      so in one sense toasting is allowed within composite datums.
903  * ----------
904  */
905 Datum
906 toast_flatten_tuple_attribute(Datum value,
907                                                           Oid typeId, int32 typeMod)
908 {
909         TupleDesc       tupleDesc;
910         HeapTupleHeader olddata;
911         HeapTupleHeader new_data;
912         int32           new_len;
913         int32           new_data_len;
914         HeapTupleData tmptup;
915         Form_pg_attribute *att;
916         int                     numAttrs;
917         int                     i;
918         bool            need_change = false;
919         bool            has_nulls = false;
920         Datum           toast_values[MaxTupleAttributeNumber];
921         bool            toast_isnull[MaxTupleAttributeNumber];
922         bool            toast_free[MaxTupleAttributeNumber];
923
924         /*
925          * See if it's a composite type, and get the tupdesc if so.
926          */
927         tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
928         if (tupleDesc == NULL)
929                 return value;                   /* not a composite type */
930
931         att = tupleDesc->attrs;
932         numAttrs = tupleDesc->natts;
933
934         /*
935          * Break down the tuple into fields.
936          */
937         olddata = DatumGetHeapTupleHeader(value);
938         Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
939         Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
940         /* Build a temporary HeapTuple control structure */
941         tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
942         ItemPointerSetInvalid(&(tmptup.t_self));
943         tmptup.t_tableOid = InvalidOid;
944         tmptup.t_data = olddata;
945
946         Assert(numAttrs <= MaxTupleAttributeNumber);
947         heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
948
949         memset(toast_free, 0, numAttrs * sizeof(bool));
950
951         for (i = 0; i < numAttrs; i++)
952         {
953                 /*
954                  * Look at non-null varlena attributes
955                  */
956                 if (toast_isnull[i])
957                         has_nulls = true;
958                 else if (att[i]->attlen == -1)
959                 {
960                         struct varlena *new_value;
961
962                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
963                         if (VARATT_IS_EXTERNAL(new_value) ||
964                                 VARATT_IS_COMPRESSED(new_value))
965                         {
966                                 new_value = heap_tuple_untoast_attr(new_value);
967                                 toast_values[i] = PointerGetDatum(new_value);
968                                 toast_free[i] = true;
969                                 need_change = true;
970                         }
971                 }
972         }
973
974         /*
975          * If nothing to untoast, just return the original tuple.
976          */
977         if (!need_change)
978         {
979                 ReleaseTupleDesc(tupleDesc);
980                 return value;
981         }
982
983         /*
984          * Calculate the new size of the tuple.  Header size should not change,
985          * but data size might.
986          */
987         new_len = offsetof(HeapTupleHeaderData, t_bits);
988         if (has_nulls)
989                 new_len += BITMAPLEN(numAttrs);
990         if (olddata->t_infomask & HEAP_HASOID)
991                 new_len += sizeof(Oid);
992         new_len = MAXALIGN(new_len);
993         Assert(new_len == olddata->t_hoff);
994         new_data_len = heap_compute_data_size(tupleDesc,
995                                                                                   toast_values, toast_isnull);
996         new_len += new_data_len;
997
998         new_data = (HeapTupleHeader) palloc0(new_len);
999
1000         /*
1001          * Put the tuple header and the changed values into place
1002          */
1003         memcpy(new_data, olddata, olddata->t_hoff);
1004
1005         HeapTupleHeaderSetDatumLength(new_data, new_len);
1006
1007         heap_fill_tuple(tupleDesc,
1008                                         toast_values,
1009                                         toast_isnull,
1010                                         (char *) new_data + olddata->t_hoff,
1011                                         new_data_len,
1012                                         &(new_data->t_infomask),
1013                                         has_nulls ? new_data->t_bits : NULL);
1014
1015         /*
1016          * Free allocated temp values
1017          */
1018         for (i = 0; i < numAttrs; i++)
1019                 if (toast_free[i])
1020                         pfree(DatumGetPointer(toast_values[i]));
1021         ReleaseTupleDesc(tupleDesc);
1022
1023         return PointerGetDatum(new_data);
1024 }
1025
1026
1027 /* ----------
1028  * toast_compress_datum -
1029  *
1030  *      Create a compressed version of a varlena datum
1031  *
1032  *      If we fail (ie, compressed result is actually bigger than original)
1033  *      then return NULL.  We must not use compressed data if it'd expand
1034  *      the tuple!
1035  *
1036  *      We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1037  *      copying them.  But we can't handle external or compressed datums.
1038  * ----------
1039  */
1040 Datum
1041 toast_compress_datum(Datum value)
1042 {
1043         struct varlena *tmp;
1044         int32           valsize = VARSIZE_ANY_EXHDR(value);
1045
1046         Assert(!VARATT_IS_EXTERNAL(value));
1047         Assert(!VARATT_IS_COMPRESSED(value));
1048
1049         /*
1050          * No point in wasting a palloc cycle if value is too short for
1051          * compression
1052          */
1053         if (valsize < PGLZ_strategy_default->min_input_size)
1054                 return PointerGetDatum(NULL);
1055
1056         tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
1057         if (pglz_compress(VARDATA_ANY(value), valsize,
1058                                           (PGLZ_Header *) tmp, PGLZ_strategy_default) &&
1059                 VARSIZE(tmp) < VARSIZE_ANY(value))
1060         {
1061                 /* successful compression */
1062                 return PointerGetDatum(tmp);
1063         }
1064         else
1065         {
1066                 /* incompressible data */
1067                 pfree(tmp);
1068                 return PointerGetDatum(NULL);
1069         }
1070 }
1071
1072
1073 /* ----------
1074  * toast_save_datum -
1075  *
1076  *      Save one single datum into the secondary relation and return
1077  *      a Datum reference for it.
1078  * ----------
1079  */
1080 static Datum
1081 toast_save_datum(Relation rel, Datum value,
1082                                  bool use_wal, bool use_fsm)
1083 {
1084         Relation        toastrel;
1085         Relation        toastidx;
1086         HeapTuple       toasttup;
1087         TupleDesc       toasttupDesc;
1088         Datum           t_values[3];
1089         bool            t_isnull[3];
1090         CommandId       mycid = GetCurrentCommandId(true);
1091         struct varlena *result;
1092         struct varatt_external toast_pointer;
1093         struct
1094         {
1095                 struct varlena hdr;
1096                 char            data[TOAST_MAX_CHUNK_SIZE];
1097         }                       chunk_data;
1098         int32           chunk_size;
1099         int32           chunk_seq = 0;
1100         char       *data_p;
1101         int32           data_todo;
1102
1103         /*
1104          * Open the toast relation and its index.  We can use the index to check
1105          * uniqueness of the OID we assign to the toasted item, even though it has
1106          * additional columns besides OID.
1107          */
1108         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1109         toasttupDesc = toastrel->rd_att;
1110         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1111
1112         /*
1113          * Get the data pointer and length, and compute va_rawsize and va_extsize.
1114          *
1115          * va_rawsize is the size of the equivalent fully uncompressed datum, so
1116          * we have to adjust for short headers.
1117          *
1118          * va_extsize is the actual size of the data payload in the toast records.
1119          */
1120         if (VARATT_IS_SHORT(value))
1121         {
1122                 data_p = VARDATA_SHORT(value);
1123                 data_todo = VARSIZE_SHORT(value) - VARHDRSZ_SHORT;
1124                 toast_pointer.va_rawsize = data_todo + VARHDRSZ;                /* as if not short */
1125                 toast_pointer.va_extsize = data_todo;
1126         }
1127         else if (VARATT_IS_COMPRESSED(value))
1128         {
1129                 data_p = VARDATA(value);
1130                 data_todo = VARSIZE(value) - VARHDRSZ;
1131                 /* rawsize in a compressed datum is just the size of the payload */
1132                 toast_pointer.va_rawsize = VARRAWSIZE_4B_C(value) + VARHDRSZ;
1133                 toast_pointer.va_extsize = data_todo;
1134                 /* Assert that the numbers look like it's compressed */
1135                 Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1136         }
1137         else
1138         {
1139                 data_p = VARDATA(value);
1140                 data_todo = VARSIZE(value) - VARHDRSZ;
1141                 toast_pointer.va_rawsize = VARSIZE(value);
1142                 toast_pointer.va_extsize = data_todo;
1143         }
1144
1145         toast_pointer.va_valueid = GetNewOidWithIndex(toastrel, toastidx);
1146         toast_pointer.va_toastrelid = rel->rd_rel->reltoastrelid;
1147
1148         /*
1149          * Initialize constant parts of the tuple data
1150          */
1151         t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1152         t_values[2] = PointerGetDatum(&chunk_data);
1153         t_isnull[0] = false;
1154         t_isnull[1] = false;
1155         t_isnull[2] = false;
1156
1157         /*
1158          * Split up the item into chunks
1159          */
1160         while (data_todo > 0)
1161         {
1162                 /*
1163                  * Calculate the size of this chunk
1164                  */
1165                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1166
1167                 /*
1168                  * Build a tuple and store it
1169                  */
1170                 t_values[1] = Int32GetDatum(chunk_seq++);
1171                 SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1172                 memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1173                 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1174                 if (!HeapTupleIsValid(toasttup))
1175                         elog(ERROR, "failed to build TOAST tuple");
1176
1177                 heap_insert(toastrel, toasttup, mycid, use_wal, use_fsm);
1178
1179                 /*
1180                  * Create the index entry.      We cheat a little here by not using
1181                  * FormIndexDatum: this relies on the knowledge that the index columns
1182                  * are the same as the initial columns of the table.
1183                  *
1184                  * Note also that there had better not be any user-created index on
1185                  * the TOAST table, since we don't bother to update anything else.
1186                  */
1187                 index_insert(toastidx, t_values, t_isnull,
1188                                          &(toasttup->t_self),
1189                                          toastrel, toastidx->rd_index->indisunique);
1190
1191                 /*
1192                  * Free memory
1193                  */
1194                 heap_freetuple(toasttup);
1195
1196                 /*
1197                  * Move on to next chunk
1198                  */
1199                 data_todo -= chunk_size;
1200                 data_p += chunk_size;
1201         }
1202
1203         /*
1204          * Done - close toast relation
1205          */
1206         index_close(toastidx, RowExclusiveLock);
1207         heap_close(toastrel, RowExclusiveLock);
1208
1209         /*
1210          * Create the TOAST pointer value that we'll return
1211          */
1212         result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1213         SET_VARSIZE_EXTERNAL(result, TOAST_POINTER_SIZE);
1214         memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1215
1216         return PointerGetDatum(result);
1217 }
1218
1219
1220 /* ----------
1221  * toast_delete_datum -
1222  *
1223  *      Delete a single external stored value.
1224  * ----------
1225  */
1226 static void
1227 toast_delete_datum(Relation rel, Datum value)
1228 {
1229         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1230         struct varatt_external toast_pointer;
1231         Relation        toastrel;
1232         Relation        toastidx;
1233         ScanKeyData toastkey;
1234         IndexScanDesc toastscan;
1235         HeapTuple       toasttup;
1236
1237         if (!VARATT_IS_EXTERNAL(attr))
1238                 return;
1239
1240         /* Must copy to access aligned fields */
1241         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1242
1243         /*
1244          * Open the toast relation and its index
1245          */
1246         toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1247         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1248
1249         /*
1250          * Setup a scan key to fetch from the index by va_valueid (we don't
1251          * particularly care whether we see them in sequence or not)
1252          */
1253         ScanKeyInit(&toastkey,
1254                                 (AttrNumber) 1,
1255                                 BTEqualStrategyNumber, F_OIDEQ,
1256                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1257
1258         /*
1259          * Find the chunks by index
1260          */
1261         toastscan = index_beginscan(toastrel, toastidx,
1262                                                                 SnapshotToast, 1, &toastkey);
1263         while ((toasttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1264         {
1265                 /*
1266                  * Have a chunk, delete it
1267                  */
1268                 simple_heap_delete(toastrel, &toasttup->t_self);
1269         }
1270
1271         /*
1272          * End scan and close relations
1273          */
1274         index_endscan(toastscan);
1275         index_close(toastidx, RowExclusiveLock);
1276         heap_close(toastrel, RowExclusiveLock);
1277 }
1278
1279
1280 /* ----------
1281  * toast_fetch_datum -
1282  *
1283  *      Reconstruct an in memory Datum from the chunks saved
1284  *      in the toast relation
1285  * ----------
1286  */
1287 static struct varlena *
1288 toast_fetch_datum(struct varlena * attr)
1289 {
1290         Relation        toastrel;
1291         Relation        toastidx;
1292         ScanKeyData toastkey;
1293         IndexScanDesc toastscan;
1294         HeapTuple       ttup;
1295         TupleDesc       toasttupDesc;
1296         struct varlena *result;
1297         struct varatt_external toast_pointer;
1298         int32           ressize;
1299         int32           residx,
1300                                 nextidx;
1301         int32           numchunks;
1302         Pointer         chunk;
1303         bool            isnull;
1304         char       *chunkdata;
1305         int32           chunksize;
1306
1307         /* Must copy to access aligned fields */
1308         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1309
1310         ressize = toast_pointer.va_extsize;
1311         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1312
1313         result = (struct varlena *) palloc(ressize + VARHDRSZ);
1314
1315         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1316                 SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1317         else
1318                 SET_VARSIZE(result, ressize + VARHDRSZ);
1319
1320         /*
1321          * Open the toast relation and its index
1322          */
1323         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1324         toasttupDesc = toastrel->rd_att;
1325         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1326
1327         /*
1328          * Setup a scan key to fetch from the index by va_valueid
1329          */
1330         ScanKeyInit(&toastkey,
1331                                 (AttrNumber) 1,
1332                                 BTEqualStrategyNumber, F_OIDEQ,
1333                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1334
1335         /*
1336          * Read the chunks by index
1337          *
1338          * Note that because the index is actually on (valueid, chunkidx) we will
1339          * see the chunks in chunkidx order, even though we didn't explicitly ask
1340          * for it.
1341          */
1342         nextidx = 0;
1343
1344         toastscan = index_beginscan(toastrel, toastidx,
1345                                                                 SnapshotToast, 1, &toastkey);
1346         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1347         {
1348                 /*
1349                  * Have a chunk, extract the sequence number and the data
1350                  */
1351                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1352                 Assert(!isnull);
1353                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1354                 Assert(!isnull);
1355                 if (!VARATT_IS_EXTENDED(chunk))
1356                 {
1357                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1358                         chunkdata = VARDATA(chunk);
1359                 }
1360                 else if (VARATT_IS_SHORT(chunk))
1361                 {
1362                         /* could happen due to heap_form_tuple doing its thing */
1363                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1364                         chunkdata = VARDATA_SHORT(chunk);
1365                 }
1366                 else
1367                 {
1368                         /* should never happen */
1369                         elog(ERROR, "found toasted toast chunk");
1370                         chunksize = 0;          /* keep compiler quiet */
1371                         chunkdata = NULL;
1372                 }
1373
1374                 /*
1375                  * Some checks on the data we've found
1376                  */
1377                 if (residx != nextidx)
1378                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1379                                  residx, nextidx,
1380                                  toast_pointer.va_valueid);
1381                 if (residx < numchunks - 1)
1382                 {
1383                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1384                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u",
1385                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1386                                          residx, numchunks,
1387                                          toast_pointer.va_valueid);
1388                 }
1389                 else if (residx == numchunks - 1)
1390                 {
1391                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1392                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u",
1393                                          chunksize,
1394                                          (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
1395                                          residx,
1396                                          toast_pointer.va_valueid);
1397                 }
1398                 else
1399                         elog(ERROR, "unexpected chunk number %d for toast value %u (out of range %d..%d)",
1400                                  residx,
1401                                  toast_pointer.va_valueid,
1402                                  0, numchunks - 1);
1403
1404                 /*
1405                  * Copy the data into proper place in our result
1406                  */
1407                 memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
1408                            chunkdata,
1409                            chunksize);
1410
1411                 nextidx++;
1412         }
1413
1414         /*
1415          * Final checks that we successfully fetched the datum
1416          */
1417         if (nextidx != numchunks)
1418                 elog(ERROR, "missing chunk number %d for toast value %u",
1419                          nextidx,
1420                          toast_pointer.va_valueid);
1421
1422         /*
1423          * End scan and close relations
1424          */
1425         index_endscan(toastscan);
1426         index_close(toastidx, AccessShareLock);
1427         heap_close(toastrel, AccessShareLock);
1428
1429         return result;
1430 }
1431
1432 /* ----------
1433  * toast_fetch_datum_slice -
1434  *
1435  *      Reconstruct a segment of a Datum from the chunks saved
1436  *      in the toast relation
1437  * ----------
1438  */
1439 static struct varlena *
1440 toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
1441 {
1442         Relation        toastrel;
1443         Relation        toastidx;
1444         ScanKeyData toastkey[3];
1445         int                     nscankeys;
1446         IndexScanDesc toastscan;
1447         HeapTuple       ttup;
1448         TupleDesc       toasttupDesc;
1449         struct varlena *result;
1450         struct varatt_external toast_pointer;
1451         int32           attrsize;
1452         int32           residx;
1453         int32           nextidx;
1454         int                     numchunks;
1455         int                     startchunk;
1456         int                     endchunk;
1457         int32           startoffset;
1458         int32           endoffset;
1459         int                     totalchunks;
1460         Pointer         chunk;
1461         bool            isnull;
1462         char       *chunkdata;
1463         int32           chunksize;
1464         int32           chcpystrt;
1465         int32           chcpyend;
1466
1467         Assert(VARATT_IS_EXTERNAL(attr));
1468
1469         /* Must copy to access aligned fields */
1470         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1471
1472         /*
1473          * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
1474          * we can't return a compressed datum which is meaningful to toast later
1475          */
1476         Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1477
1478         attrsize = toast_pointer.va_extsize;
1479         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1480
1481         if (sliceoffset >= attrsize)
1482         {
1483                 sliceoffset = 0;
1484                 length = 0;
1485         }
1486
1487         if (((sliceoffset + length) > attrsize) || length < 0)
1488                 length = attrsize - sliceoffset;
1489
1490         result = (struct varlena *) palloc(length + VARHDRSZ);
1491
1492         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1493                 SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
1494         else
1495                 SET_VARSIZE(result, length + VARHDRSZ);
1496
1497         if (length == 0)
1498                 return result;                  /* Can save a lot of work at this point! */
1499
1500         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1501         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1502         numchunks = (endchunk - startchunk) + 1;
1503
1504         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1505         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1506
1507         /*
1508          * Open the toast relation and its index
1509          */
1510         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1511         toasttupDesc = toastrel->rd_att;
1512         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1513
1514         /*
1515          * Setup a scan key to fetch from the index. This is either two keys or
1516          * three depending on the number of chunks.
1517          */
1518         ScanKeyInit(&toastkey[0],
1519                                 (AttrNumber) 1,
1520                                 BTEqualStrategyNumber, F_OIDEQ,
1521                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1522
1523         /*
1524          * Use equality condition for one chunk, a range condition otherwise:
1525          */
1526         if (numchunks == 1)
1527         {
1528                 ScanKeyInit(&toastkey[1],
1529                                         (AttrNumber) 2,
1530                                         BTEqualStrategyNumber, F_INT4EQ,
1531                                         Int32GetDatum(startchunk));
1532                 nscankeys = 2;
1533         }
1534         else
1535         {
1536                 ScanKeyInit(&toastkey[1],
1537                                         (AttrNumber) 2,
1538                                         BTGreaterEqualStrategyNumber, F_INT4GE,
1539                                         Int32GetDatum(startchunk));
1540                 ScanKeyInit(&toastkey[2],
1541                                         (AttrNumber) 2,
1542                                         BTLessEqualStrategyNumber, F_INT4LE,
1543                                         Int32GetDatum(endchunk));
1544                 nscankeys = 3;
1545         }
1546
1547         /*
1548          * Read the chunks by index
1549          *
1550          * The index is on (valueid, chunkidx) so they will come in order
1551          */
1552         nextidx = startchunk;
1553         toastscan = index_beginscan(toastrel, toastidx,
1554                                                                 SnapshotToast, nscankeys, toastkey);
1555         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1556         {
1557                 /*
1558                  * Have a chunk, extract the sequence number and the data
1559                  */
1560                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1561                 Assert(!isnull);
1562                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1563                 Assert(!isnull);
1564                 if (!VARATT_IS_EXTENDED(chunk))
1565                 {
1566                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1567                         chunkdata = VARDATA(chunk);
1568                 }
1569                 else if (VARATT_IS_SHORT(chunk))
1570                 {
1571                         /* could happen due to heap_form_tuple doing its thing */
1572                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1573                         chunkdata = VARDATA_SHORT(chunk);
1574                 }
1575                 else
1576                 {
1577                         /* should never happen */
1578                         elog(ERROR, "found toasted toast chunk");
1579                         chunksize = 0;          /* keep compiler quiet */
1580                         chunkdata = NULL;
1581                 }
1582
1583                 /*
1584                  * Some checks on the data we've found
1585                  */
1586                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1587                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1588                                  residx, nextidx,
1589                                  toast_pointer.va_valueid);
1590                 if (residx < totalchunks - 1)
1591                 {
1592                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1593                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u when fetching slice",
1594                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1595                                          residx, totalchunks,
1596                                          toast_pointer.va_valueid);
1597                 }
1598                 else if (residx == totalchunks - 1)
1599                 {
1600                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1601                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u when fetching slice",
1602                                          chunksize,
1603                                          (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
1604                                          residx,
1605                                          toast_pointer.va_valueid);
1606                 }
1607                 else
1608                         elog(ERROR, "unexpected chunk number %d for toast value %u (out of range %d..%d)",
1609                                  residx,
1610                                  toast_pointer.va_valueid,
1611                                  0, totalchunks - 1);
1612
1613                 /*
1614                  * Copy the data into proper place in our result
1615                  */
1616                 chcpystrt = 0;
1617                 chcpyend = chunksize - 1;
1618                 if (residx == startchunk)
1619                         chcpystrt = startoffset;
1620                 if (residx == endchunk)
1621                         chcpyend = endoffset;
1622
1623                 memcpy(VARDATA(result) +
1624                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1625                            chunkdata + chcpystrt,
1626                            (chcpyend - chcpystrt) + 1);
1627
1628                 nextidx++;
1629         }
1630
1631         /*
1632          * Final checks that we successfully fetched the datum
1633          */
1634         if (nextidx != (endchunk + 1))
1635                 elog(ERROR, "missing chunk number %d for toast value %u",
1636                          nextidx,
1637                          toast_pointer.va_valueid);
1638
1639         /*
1640          * End scan and close relations
1641          */
1642         index_endscan(toastscan);
1643         index_close(toastidx, AccessShareLock);
1644         heap_close(toastrel, AccessShareLock);
1645
1646         return result;
1647 }