]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
c83aed2e39dd5e79ea7d53b39f56500d15903e46
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2010, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/heap/tuptoaster.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *              toast_insert_or_update -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              toast_delete -
20  *                      Reclaim toast storage when a tuple is deleted
21  *
22  *              heap_tuple_untoast_attr -
23  *                      Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27
28 #include "postgres.h"
29
30 #include <unistd.h>
31 #include <fcntl.h>
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "utils/fmgroids.h"
39 #include "utils/pg_lzcompress.h"
40 #include "utils/rel.h"
41 #include "utils/typcache.h"
42 #include "utils/tqual.h"
43
44
45 #undef TOAST_DEBUG
46
47 /* Size of an EXTERNAL datum that contains a standard TOAST pointer */
48 #define TOAST_POINTER_SIZE (VARHDRSZ_EXTERNAL + sizeof(struct varatt_external))
49
50 /*
51  * Testing whether an externally-stored value is compressed now requires
52  * comparing extsize (the actual length of the external data) to rawsize
53  * (the original uncompressed datum's size).  The latter includes VARHDRSZ
54  * overhead, the former doesn't.  We never use compression unless it actually
55  * saves space, so we expect either equality or less-than.
56  */
/* True when the stored (external) length is smaller than the raw payload. */
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) \
	((toast_pointer).va_rawsize - VARHDRSZ > (toast_pointer).va_extsize)
59
60 /*
61  * Macro to fetch the possibly-unaligned contents of an EXTERNAL datum
62  * into a local "struct varatt_external" toast pointer.  This should be
63  * just a memcpy, but some versions of gcc seem to produce broken code
64  * that assumes the datum contents are aligned.  Introducing an explicit
65  * intermediate "varattrib_1b_e *" variable seems to fix it.
66  */
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr) \
do { \
	/* explicit typed intermediate pointer: the gcc workaround */ \
	varattrib_1b_e *attre = (varattrib_1b_e *) (attr); \
	/* the datum must be an EXTERNAL one ... */ \
	Assert(VARATT_IS_EXTERNAL(attre)); \
	/* ... whose payload is exactly one toast pointer */ \
	Assert(VARSIZE_EXTERNAL(attre) == sizeof(toast_pointer) + VARHDRSZ_EXTERNAL); \
	/* byte-wise copy, so the source needs no particular alignment */ \
	memcpy(&(toast_pointer), VARDATA_EXTERNAL(attre), sizeof(toast_pointer)); \
} while (0)
74
75
/*
 * Prototypes for file-local helpers.
 */

/* invoked by toast_delete() for each non-null external varlena value */
static void toast_delete_datum(Relation rel, Datum value);
/* its result replaces the inline value in toast_insert_or_update() */
static Datum toast_save_datum(Relation rel, Datum value, int options);
/* used by the heap_tuple_*_attr routines to read back an external datum */
static struct varlena *toast_fetch_datum(struct varlena * attr);
/* used by heap_tuple_untoast_attr_slice() for uncompressed external data */
static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
						int32 sliceoffset, int32 length);
81
82
/* ----------
 * heap_tuple_fetch_attr -
 *
 *	Public entry point that brings an externally stored value back
 *	in-line.  No decompression is attempted here.
 *
 * The returned datum never refers to external storage, but it may still
 * be compressed or carry a short header.  A value that is not external is
 * handed back unchanged (same pointer, no copy).
 * ----------
 */
struct varlena *
heap_tuple_fetch_attr(struct varlena * attr)
{
	/* Already stored inside the main tuple: nothing to fetch. */
	if (!VARATT_IS_EXTERNAL(attr))
		return attr;

	/* Externally stored value: read it back. */
	return toast_fetch_datum(attr);
}
116
117
118 /* ----------
119  * heap_tuple_untoast_attr -
120  *
121  *      Public entry point to get back a toasted value from compression
122  *      or external storage.
123  * ----------
124  */
125 struct varlena *
126 heap_tuple_untoast_attr(struct varlena * attr)
127 {
128         if (VARATT_IS_EXTERNAL(attr))
129         {
130                 /*
131                  * This is an externally stored datum --- fetch it back from there
132                  */
133                 attr = toast_fetch_datum(attr);
134                 /* If it's compressed, decompress it */
135                 if (VARATT_IS_COMPRESSED(attr))
136                 {
137                         PGLZ_Header *tmp = (PGLZ_Header *) attr;
138
139                         attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
140                         SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
141                         pglz_decompress(tmp, VARDATA(attr));
142                         pfree(tmp);
143                 }
144         }
145         else if (VARATT_IS_COMPRESSED(attr))
146         {
147                 /*
148                  * This is a compressed value inside of the main tuple
149                  */
150                 PGLZ_Header *tmp = (PGLZ_Header *) attr;
151
152                 attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
153                 SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
154                 pglz_decompress(tmp, VARDATA(attr));
155         }
156         else if (VARATT_IS_SHORT(attr))
157         {
158                 /*
159                  * This is a short-header varlena --- convert to 4-byte header format
160                  */
161                 Size            data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
162                 Size            new_size = data_size + VARHDRSZ;
163                 struct varlena *new_attr;
164
165                 new_attr = (struct varlena *) palloc(new_size);
166                 SET_VARSIZE(new_attr, new_size);
167                 memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
168                 attr = new_attr;
169         }
170
171         return attr;
172 }
173
174
175 /* ----------
176  * heap_tuple_untoast_attr_slice -
177  *
178  *              Public entry point to get back part of a toasted value
179  *              from compression or external storage.
180  * ----------
181  */
182 struct varlena *
183 heap_tuple_untoast_attr_slice(struct varlena * attr,
184                                                           int32 sliceoffset, int32 slicelength)
185 {
186         struct varlena *preslice;
187         struct varlena *result;
188         char       *attrdata;
189         int32           attrsize;
190
191         if (VARATT_IS_EXTERNAL(attr))
192         {
193                 struct varatt_external toast_pointer;
194
195                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
196
197                 /* fast path for non-compressed external datums */
198                 if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
199                         return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
200
201                 /* fetch it back (compressed marker will get set automatically) */
202                 preslice = toast_fetch_datum(attr);
203         }
204         else
205                 preslice = attr;
206
207         if (VARATT_IS_COMPRESSED(preslice))
208         {
209                 PGLZ_Header *tmp = (PGLZ_Header *) preslice;
210                 Size            size = PGLZ_RAW_SIZE(tmp) + VARHDRSZ;
211
212                 preslice = (struct varlena *) palloc(size);
213                 SET_VARSIZE(preslice, size);
214                 pglz_decompress(tmp, VARDATA(preslice));
215
216                 if (tmp != (PGLZ_Header *) attr)
217                         pfree(tmp);
218         }
219
220         if (VARATT_IS_SHORT(preslice))
221         {
222                 attrdata = VARDATA_SHORT(preslice);
223                 attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
224         }
225         else
226         {
227                 attrdata = VARDATA(preslice);
228                 attrsize = VARSIZE(preslice) - VARHDRSZ;
229         }
230
231         /* slicing of datum for compressed cases and plain value */
232
233         if (sliceoffset >= attrsize)
234         {
235                 sliceoffset = 0;
236                 slicelength = 0;
237         }
238
239         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
240                 slicelength = attrsize - sliceoffset;
241
242         result = (struct varlena *) palloc(slicelength + VARHDRSZ);
243         SET_VARSIZE(result, slicelength + VARHDRSZ);
244
245         memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
246
247         if (preslice != attr)
248                 pfree(preslice);
249
250         return result;
251 }
252
253
254 /* ----------
255  * toast_raw_datum_size -
256  *
257  *      Return the raw (detoasted) size of a varlena datum
258  *      (including the VARHDRSZ header)
259  * ----------
260  */
261 Size
262 toast_raw_datum_size(Datum value)
263 {
264         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
265         Size            result;
266
267         if (VARATT_IS_EXTERNAL(attr))
268         {
269                 /* va_rawsize is the size of the original datum -- including header */
270                 struct varatt_external toast_pointer;
271
272                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
273                 result = toast_pointer.va_rawsize;
274         }
275         else if (VARATT_IS_COMPRESSED(attr))
276         {
277                 /* here, va_rawsize is just the payload size */
278                 result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
279         }
280         else if (VARATT_IS_SHORT(attr))
281         {
282                 /*
283                  * we have to normalize the header length to VARHDRSZ or else the
284                  * callers of this function will be confused.
285                  */
286                 result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
287         }
288         else
289         {
290                 /* plain untoasted datum */
291                 result = VARSIZE(attr);
292         }
293         return result;
294 }
295
296 /* ----------
297  * toast_datum_size
298  *
299  *      Return the physical storage size (possibly compressed) of a varlena datum
300  * ----------
301  */
302 Size
303 toast_datum_size(Datum value)
304 {
305         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
306         Size            result;
307
308         if (VARATT_IS_EXTERNAL(attr))
309         {
310                 /*
311                  * Attribute is stored externally - return the extsize whether
312                  * compressed or not.  We do not count the size of the toast pointer
313                  * ... should we?
314                  */
315                 struct varatt_external toast_pointer;
316
317                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
318                 result = toast_pointer.va_extsize;
319         }
320         else if (VARATT_IS_SHORT(attr))
321         {
322                 result = VARSIZE_SHORT(attr);
323         }
324         else
325         {
326                 /*
327                  * Attribute is stored inline either compressed or not, just calculate
328                  * the size of the datum in either case.
329                  */
330                 result = VARSIZE(attr);
331         }
332         return result;
333 }
334
335
336 /* ----------
337  * toast_delete -
338  *
339  *      Cascaded delete toast-entries on DELETE
340  * ----------
341  */
342 void
343 toast_delete(Relation rel, HeapTuple oldtup)
344 {
345         TupleDesc       tupleDesc;
346         Form_pg_attribute *att;
347         int                     numAttrs;
348         int                     i;
349         Datum           toast_values[MaxHeapAttributeNumber];
350         bool            toast_isnull[MaxHeapAttributeNumber];
351
352         /*
353          * We should only ever be called for tuples of plain relations ---
354          * recursing on a toast rel is bad news.
355          */
356         Assert(rel->rd_rel->relkind == RELKIND_RELATION);
357
358         /*
359          * Get the tuple descriptor and break down the tuple into fields.
360          *
361          * NOTE: it's debatable whether to use heap_deform_tuple() here or just
362          * heap_getattr() only the varlena columns.  The latter could win if there
363          * are few varlena columns and many non-varlena ones. However,
364          * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
365          * O(N^2) if there are many varlena columns, so it seems better to err on
366          * the side of linear cost.  (We won't even be here unless there's at
367          * least one varlena column, by the way.)
368          */
369         tupleDesc = rel->rd_att;
370         att = tupleDesc->attrs;
371         numAttrs = tupleDesc->natts;
372
373         Assert(numAttrs <= MaxHeapAttributeNumber);
374         heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
375
376         /*
377          * Check for external stored attributes and delete them from the secondary
378          * relation.
379          */
380         for (i = 0; i < numAttrs; i++)
381         {
382                 if (att[i]->attlen == -1)
383                 {
384                         Datum           value = toast_values[i];
385
386                         if (!toast_isnull[i] && VARATT_IS_EXTERNAL(PointerGetDatum(value)))
387                                 toast_delete_datum(rel, value);
388                 }
389         }
390 }
391
392
393 /* ----------
394  * toast_insert_or_update -
395  *
396  *      Delete no-longer-used toast-entries and create new ones to
397  *      make the new tuple fit on INSERT or UPDATE
398  *
399  * Inputs:
400  *      newtup: the candidate new tuple to be inserted
401  *      oldtup: the old row version for UPDATE, or NULL for INSERT
402  *      options: options to be passed to heap_insert() for toast rows
403  * Result:
404  *      either newtup if no toasting is needed, or a palloc'd modified tuple
405  *      that is what should actually get stored
406  *
407  * NOTE: neither newtup nor oldtup will be modified.  This is a change
408  * from the pre-8.1 API of this routine.
409  * ----------
410  */
411 HeapTuple
412 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
413                                            int options)
414 {
415         HeapTuple       result_tuple;
416         TupleDesc       tupleDesc;
417         Form_pg_attribute *att;
418         int                     numAttrs;
419         int                     i;
420
421         bool            need_change = false;
422         bool            need_free = false;
423         bool            need_delold = false;
424         bool            has_nulls = false;
425
426         Size            maxDataLen;
427         Size            hoff;
428
429         char            toast_action[MaxHeapAttributeNumber];
430         bool            toast_isnull[MaxHeapAttributeNumber];
431         bool            toast_oldisnull[MaxHeapAttributeNumber];
432         Datum           toast_values[MaxHeapAttributeNumber];
433         Datum           toast_oldvalues[MaxHeapAttributeNumber];
434         int32           toast_sizes[MaxHeapAttributeNumber];
435         bool            toast_free[MaxHeapAttributeNumber];
436         bool            toast_delold[MaxHeapAttributeNumber];
437
438         /*
439          * We should only ever be called for tuples of plain relations ---
440          * recursing on a toast rel is bad news.
441          */
442         Assert(rel->rd_rel->relkind == RELKIND_RELATION);
443
444         /*
445          * Get the tuple descriptor and break down the tuple(s) into fields.
446          */
447         tupleDesc = rel->rd_att;
448         att = tupleDesc->attrs;
449         numAttrs = tupleDesc->natts;
450
451         Assert(numAttrs <= MaxHeapAttributeNumber);
452         heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
453         if (oldtup != NULL)
454                 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
455
456         /* ----------
457          * Then collect information about the values given
458          *
459          * NOTE: toast_action[i] can have these values:
460          *              ' '             default handling
461          *              'p'             already processed --- don't touch it
462          *              'x'             incompressible, but OK to move off
463          *
464          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
465          *              toast_action[i] different from 'p'.
466          * ----------
467          */
468         memset(toast_action, ' ', numAttrs * sizeof(char));
469         memset(toast_free, 0, numAttrs * sizeof(bool));
470         memset(toast_delold, 0, numAttrs * sizeof(bool));
471
472         for (i = 0; i < numAttrs; i++)
473         {
474                 struct varlena *old_value;
475                 struct varlena *new_value;
476
477                 if (oldtup != NULL)
478                 {
479                         /*
480                          * For UPDATE get the old and new values of this attribute
481                          */
482                         old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
483                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
484
485                         /*
486                          * If the old value is an external stored one, check if it has
487                          * changed so we have to delete it later.
488                          */
489                         if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
490                                 VARATT_IS_EXTERNAL(old_value))
491                         {
492                                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
493                                         memcmp((char *) old_value, (char *) new_value,
494                                                    VARSIZE_EXTERNAL(old_value)) != 0)
495                                 {
496                                         /*
497                                          * The old external stored value isn't needed any more
498                                          * after the update
499                                          */
500                                         toast_delold[i] = true;
501                                         need_delold = true;
502                                 }
503                                 else
504                                 {
505                                         /*
506                                          * This attribute isn't changed by this update so we reuse
507                                          * the original reference to the old value in the new
508                                          * tuple.
509                                          */
510                                         toast_action[i] = 'p';
511                                         continue;
512                                 }
513                         }
514                 }
515                 else
516                 {
517                         /*
518                          * For INSERT simply get the new value
519                          */
520                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
521                 }
522
523                 /*
524                  * Handle NULL attributes
525                  */
526                 if (toast_isnull[i])
527                 {
528                         toast_action[i] = 'p';
529                         has_nulls = true;
530                         continue;
531                 }
532
533                 /*
534                  * Now look at varlena attributes
535                  */
536                 if (att[i]->attlen == -1)
537                 {
538                         /*
539                          * If the table's attribute says PLAIN always, force it so.
540                          */
541                         if (att[i]->attstorage == 'p')
542                                 toast_action[i] = 'p';
543
544                         /*
545                          * We took care of UPDATE above, so any external value we find
546                          * still in the tuple must be someone else's we cannot reuse.
547                          * Fetch it back (without decompression, unless we are forcing
548                          * PLAIN storage).      If necessary, we'll push it out as a new
549                          * external value below.
550                          */
551                         if (VARATT_IS_EXTERNAL(new_value))
552                         {
553                                 if (att[i]->attstorage == 'p')
554                                         new_value = heap_tuple_untoast_attr(new_value);
555                                 else
556                                         new_value = heap_tuple_fetch_attr(new_value);
557                                 toast_values[i] = PointerGetDatum(new_value);
558                                 toast_free[i] = true;
559                                 need_change = true;
560                                 need_free = true;
561                         }
562
563                         /*
564                          * Remember the size of this attribute
565                          */
566                         toast_sizes[i] = VARSIZE_ANY(new_value);
567                 }
568                 else
569                 {
570                         /*
571                          * Not a varlena attribute, plain storage always
572                          */
573                         toast_action[i] = 'p';
574                 }
575         }
576
577         /* ----------
578          * Compress and/or save external until data fits into target length
579          *
580          *      1: Inline compress attributes with attstorage 'x', and store very
581          *         large attributes with attstorage 'x' or 'e' external immediately
582          *      2: Store attributes with attstorage 'x' or 'e' external
583          *      3: Inline compress attributes with attstorage 'm'
584          *      4: Store attributes with attstorage 'm' external
585          * ----------
586          */
587
588         /* compute header overhead --- this should match heap_form_tuple() */
589         hoff = offsetof(HeapTupleHeaderData, t_bits);
590         if (has_nulls)
591                 hoff += BITMAPLEN(numAttrs);
592         if (newtup->t_data->t_infomask & HEAP_HASOID)
593                 hoff += sizeof(Oid);
594         hoff = MAXALIGN(hoff);
595         Assert(hoff == newtup->t_data->t_hoff);
596         /* now convert to a limit on the tuple data size */
597         maxDataLen = TOAST_TUPLE_TARGET - hoff;
598
599         /*
600          * Look for attributes with attstorage 'x' to compress.  Also find large
601          * attributes with attstorage 'x' or 'e', and store them external.
602          */
603         while (heap_compute_data_size(tupleDesc,
604                                                                   toast_values, toast_isnull) > maxDataLen)
605         {
606                 int                     biggest_attno = -1;
607                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
608                 Datum           old_value;
609                 Datum           new_value;
610
611                 /*
612                  * Search for the biggest yet unprocessed internal attribute
613                  */
614                 for (i = 0; i < numAttrs; i++)
615                 {
616                         if (toast_action[i] != ' ')
617                                 continue;
618                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
619                                 continue;               /* can't happen, toast_action would be 'p' */
620                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
621                                 continue;
622                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
623                                 continue;
624                         if (toast_sizes[i] > biggest_size)
625                         {
626                                 biggest_attno = i;
627                                 biggest_size = toast_sizes[i];
628                         }
629                 }
630
631                 if (biggest_attno < 0)
632                         break;
633
634                 /*
635                  * Attempt to compress it inline, if it has attstorage 'x'
636                  */
637                 i = biggest_attno;
638                 if (att[i]->attstorage == 'x')
639                 {
640                         old_value = toast_values[i];
641                         new_value = toast_compress_datum(old_value);
642
643                         if (DatumGetPointer(new_value) != NULL)
644                         {
645                                 /* successful compression */
646                                 if (toast_free[i])
647                                         pfree(DatumGetPointer(old_value));
648                                 toast_values[i] = new_value;
649                                 toast_free[i] = true;
650                                 toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
651                                 need_change = true;
652                                 need_free = true;
653                         }
654                         else
655                         {
656                                 /* incompressible, ignore on subsequent compression passes */
657                                 toast_action[i] = 'x';
658                         }
659                 }
660                 else
661                 {
662                         /* has attstorage 'e', ignore on subsequent compression passes */
663                         toast_action[i] = 'x';
664                 }
665
666                 /*
667                  * If this value is by itself more than maxDataLen (after compression
668                  * if any), push it out to the toast table immediately, if possible.
669                  * This avoids uselessly compressing other fields in the common case
670                  * where we have one long field and several short ones.
671                  *
672                  * XXX maybe the threshold should be less than maxDataLen?
673                  */
674                 if (toast_sizes[i] > maxDataLen &&
675                         rel->rd_rel->reltoastrelid != InvalidOid)
676                 {
677                         old_value = toast_values[i];
678                         toast_action[i] = 'p';
679                         toast_values[i] = toast_save_datum(rel, toast_values[i], options);
680                         if (toast_free[i])
681                                 pfree(DatumGetPointer(old_value));
682                         toast_free[i] = true;
683                         need_change = true;
684                         need_free = true;
685                 }
686         }
687
688         /*
689          * Second we look for attributes of attstorage 'x' or 'e' that are still
690          * inline.      But skip this if there's no toast table to push them to.
691          */
692         while (heap_compute_data_size(tupleDesc,
693                                                                   toast_values, toast_isnull) > maxDataLen &&
694                    rel->rd_rel->reltoastrelid != InvalidOid)
695         {
696                 int                     biggest_attno = -1;
697                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
698                 Datum           old_value;
699
700                 /*------
701                  * Search for the biggest yet inlined attribute with
702                  * attstorage equals 'x' or 'e'
703                  *------
704                  */
705                 for (i = 0; i < numAttrs; i++)
706                 {
707                         if (toast_action[i] == 'p')
708                                 continue;
709                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
710                                 continue;               /* can't happen, toast_action would be 'p' */
711                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
712                                 continue;
713                         if (toast_sizes[i] > biggest_size)
714                         {
715                                 biggest_attno = i;
716                                 biggest_size = toast_sizes[i];
717                         }
718                 }
719
720                 if (biggest_attno < 0)
721                         break;
722
723                 /*
724                  * Store this external
725                  */
726                 i = biggest_attno;
727                 old_value = toast_values[i];
728                 toast_action[i] = 'p';
729                 toast_values[i] = toast_save_datum(rel, toast_values[i], options);
730                 if (toast_free[i])
731                         pfree(DatumGetPointer(old_value));
732                 toast_free[i] = true;
733
734                 need_change = true;
735                 need_free = true;
736         }
737
738         /*
739          * Round 3 - this time we take attributes with storage 'm' into
740          * compression
741          */
742         while (heap_compute_data_size(tupleDesc,
743                                                                   toast_values, toast_isnull) > maxDataLen)
744         {
745                 int                     biggest_attno = -1;
746                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
747                 Datum           old_value;
748                 Datum           new_value;
749
750                 /*
751                  * Search for the biggest yet uncompressed internal attribute
752                  */
753                 for (i = 0; i < numAttrs; i++)
754                 {
755                         if (toast_action[i] != ' ')
756                                 continue;
757                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
758                                 continue;               /* can't happen, toast_action would be 'p' */
759                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
760                                 continue;
761                         if (att[i]->attstorage != 'm')
762                                 continue;
763                         if (toast_sizes[i] > biggest_size)
764                         {
765                                 biggest_attno = i;
766                                 biggest_size = toast_sizes[i];
767                         }
768                 }
769
770                 if (biggest_attno < 0)
771                         break;
772
773                 /*
774                  * Attempt to compress it inline
775                  */
776                 i = biggest_attno;
777                 old_value = toast_values[i];
778                 new_value = toast_compress_datum(old_value);
779
780                 if (DatumGetPointer(new_value) != NULL)
781                 {
782                         /* successful compression */
783                         if (toast_free[i])
784                                 pfree(DatumGetPointer(old_value));
785                         toast_values[i] = new_value;
786                         toast_free[i] = true;
787                         toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
788                         need_change = true;
789                         need_free = true;
790                 }
791                 else
792                 {
793                         /* incompressible, ignore on subsequent compression passes */
794                         toast_action[i] = 'x';
795                 }
796         }
797
798         /*
799          * Finally we store attributes of type 'm' externally.  At this point we
800          * increase the target tuple size, so that 'm' attributes aren't stored
801          * externally unless really necessary.
802          */
803         maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
804
805         while (heap_compute_data_size(tupleDesc,
806                                                                   toast_values, toast_isnull) > maxDataLen &&
807                    rel->rd_rel->reltoastrelid != InvalidOid)
808         {
809                 int                     biggest_attno = -1;
810                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
811                 Datum           old_value;
812
813                 /*--------
814                  * Search for the biggest yet inlined attribute with
815                  * attstorage = 'm'
816                  *--------
817                  */
818                 for (i = 0; i < numAttrs; i++)
819                 {
820                         if (toast_action[i] == 'p')
821                                 continue;
822                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
823                                 continue;               /* can't happen, toast_action would be 'p' */
824                         if (att[i]->attstorage != 'm')
825                                 continue;
826                         if (toast_sizes[i] > biggest_size)
827                         {
828                                 biggest_attno = i;
829                                 biggest_size = toast_sizes[i];
830                         }
831                 }
832
833                 if (biggest_attno < 0)
834                         break;
835
836                 /*
837                  * Store this external
838                  */
839                 i = biggest_attno;
840                 old_value = toast_values[i];
841                 toast_action[i] = 'p';
842                 toast_values[i] = toast_save_datum(rel, toast_values[i], options);
843                 if (toast_free[i])
844                         pfree(DatumGetPointer(old_value));
845                 toast_free[i] = true;
846
847                 need_change = true;
848                 need_free = true;
849         }
850
851         /*
852          * In the case we toasted any values, we need to build a new heap tuple
853          * with the changed values.
854          */
855         if (need_change)
856         {
857                 HeapTupleHeader olddata = newtup->t_data;
858                 HeapTupleHeader new_data;
859                 int32           new_len;
860                 int32           new_data_len;
861
862                 /*
863                  * Calculate the new size of the tuple.  Header size should not
864                  * change, but data size might.
865                  */
866                 new_len = offsetof(HeapTupleHeaderData, t_bits);
867                 if (has_nulls)
868                         new_len += BITMAPLEN(numAttrs);
869                 if (olddata->t_infomask & HEAP_HASOID)
870                         new_len += sizeof(Oid);
871                 new_len = MAXALIGN(new_len);
872                 Assert(new_len == olddata->t_hoff);
873                 new_data_len = heap_compute_data_size(tupleDesc,
874                                                                                           toast_values, toast_isnull);
875                 new_len += new_data_len;
876
877                 /*
878                  * Allocate and zero the space needed, and fill HeapTupleData fields.
879                  */
880                 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_len);
881                 result_tuple->t_len = new_len;
882                 result_tuple->t_self = newtup->t_self;
883                 result_tuple->t_tableOid = newtup->t_tableOid;
884                 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
885                 result_tuple->t_data = new_data;
886
887                 /*
888                  * Put the existing tuple header and the changed values into place
889                  */
890                 memcpy(new_data, olddata, olddata->t_hoff);
891
892                 heap_fill_tuple(tupleDesc,
893                                                 toast_values,
894                                                 toast_isnull,
895                                                 (char *) new_data + olddata->t_hoff,
896                                                 new_data_len,
897                                                 &(new_data->t_infomask),
898                                                 has_nulls ? new_data->t_bits : NULL);
899         }
900         else
901                 result_tuple = newtup;
902
903         /*
904          * Free allocated temp values
905          */
906         if (need_free)
907                 for (i = 0; i < numAttrs; i++)
908                         if (toast_free[i])
909                                 pfree(DatumGetPointer(toast_values[i]));
910
911         /*
912          * Delete external values from the old tuple
913          */
914         if (need_delold)
915                 for (i = 0; i < numAttrs; i++)
916                         if (toast_delold[i])
917                                 toast_delete_datum(rel, toast_oldvalues[i]);
918
919         return result_tuple;
920 }
921
922
923 /* ----------
924  * toast_flatten_tuple_attribute -
925  *
926  *      If a Datum is of composite type, "flatten" it to contain no toasted fields.
927  *      This must be invoked on any potentially-composite field that is to be
928  *      inserted into a tuple.  Doing this preserves the invariant that toasting
929  *      goes only one level deep in a tuple.
930  *
931  *      Note that flattening does not mean expansion of short-header varlenas,
932  *      so in one sense toasting is allowed within composite datums.
933  * ----------
934  */
935 Datum
936 toast_flatten_tuple_attribute(Datum value,
937                                                           Oid typeId, int32 typeMod)
938 {
939         TupleDesc       tupleDesc;
940         HeapTupleHeader olddata;
941         HeapTupleHeader new_data;
942         int32           new_len;
943         int32           new_data_len;
944         HeapTupleData tmptup;
945         Form_pg_attribute *att;
946         int                     numAttrs;
947         int                     i;
948         bool            need_change = false;
949         bool            has_nulls = false;
950         Datum           toast_values[MaxTupleAttributeNumber];
951         bool            toast_isnull[MaxTupleAttributeNumber];
952         bool            toast_free[MaxTupleAttributeNumber];
953
954         /*
955          * See if it's a composite type, and get the tupdesc if so.
956          */
957         tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
958         if (tupleDesc == NULL)
959                 return value;                   /* not a composite type */
960
961         att = tupleDesc->attrs;
962         numAttrs = tupleDesc->natts;
963
964         /*
965          * Break down the tuple into fields.
966          */
967         olddata = DatumGetHeapTupleHeader(value);
968         Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
969         Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
970         /* Build a temporary HeapTuple control structure */
971         tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
972         ItemPointerSetInvalid(&(tmptup.t_self));
973         tmptup.t_tableOid = InvalidOid;
974         tmptup.t_data = olddata;
975
976         Assert(numAttrs <= MaxTupleAttributeNumber);
977         heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
978
979         memset(toast_free, 0, numAttrs * sizeof(bool));
980
981         for (i = 0; i < numAttrs; i++)
982         {
983                 /*
984                  * Look at non-null varlena attributes
985                  */
986                 if (toast_isnull[i])
987                         has_nulls = true;
988                 else if (att[i]->attlen == -1)
989                 {
990                         struct varlena *new_value;
991
992                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
993                         if (VARATT_IS_EXTERNAL(new_value) ||
994                                 VARATT_IS_COMPRESSED(new_value))
995                         {
996                                 new_value = heap_tuple_untoast_attr(new_value);
997                                 toast_values[i] = PointerGetDatum(new_value);
998                                 toast_free[i] = true;
999                                 need_change = true;
1000                         }
1001                 }
1002         }
1003
1004         /*
1005          * If nothing to untoast, just return the original tuple.
1006          */
1007         if (!need_change)
1008         {
1009                 ReleaseTupleDesc(tupleDesc);
1010                 return value;
1011         }
1012
1013         /*
1014          * Calculate the new size of the tuple.  Header size should not change,
1015          * but data size might.
1016          */
1017         new_len = offsetof(HeapTupleHeaderData, t_bits);
1018         if (has_nulls)
1019                 new_len += BITMAPLEN(numAttrs);
1020         if (olddata->t_infomask & HEAP_HASOID)
1021                 new_len += sizeof(Oid);
1022         new_len = MAXALIGN(new_len);
1023         Assert(new_len == olddata->t_hoff);
1024         new_data_len = heap_compute_data_size(tupleDesc,
1025                                                                                   toast_values, toast_isnull);
1026         new_len += new_data_len;
1027
1028         new_data = (HeapTupleHeader) palloc0(new_len);
1029
1030         /*
1031          * Put the tuple header and the changed values into place
1032          */
1033         memcpy(new_data, olddata, olddata->t_hoff);
1034
1035         HeapTupleHeaderSetDatumLength(new_data, new_len);
1036
1037         heap_fill_tuple(tupleDesc,
1038                                         toast_values,
1039                                         toast_isnull,
1040                                         (char *) new_data + olddata->t_hoff,
1041                                         new_data_len,
1042                                         &(new_data->t_infomask),
1043                                         has_nulls ? new_data->t_bits : NULL);
1044
1045         /*
1046          * Free allocated temp values
1047          */
1048         for (i = 0; i < numAttrs; i++)
1049                 if (toast_free[i])
1050                         pfree(DatumGetPointer(toast_values[i]));
1051         ReleaseTupleDesc(tupleDesc);
1052
1053         return PointerGetDatum(new_data);
1054 }
1055
1056
1057 /* ----------
1058  * toast_compress_datum -
1059  *
1060  *      Create a compressed version of a varlena datum
1061  *
1062  *      If we fail (ie, compressed result is actually bigger than original)
1063  *      then return NULL.  We must not use compressed data if it'd expand
1064  *      the tuple!
1065  *
1066  *      We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1067  *      copying them.  But we can't handle external or compressed datums.
1068  * ----------
1069  */
1070 Datum
1071 toast_compress_datum(Datum value)
1072 {
1073         struct varlena *tmp;
1074         int32           valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
1075
1076         Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
1077         Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
1078
1079         /*
1080          * No point in wasting a palloc cycle if value size is out of the allowed
1081          * range for compression
1082          */
1083         if (valsize < PGLZ_strategy_default->min_input_size ||
1084                 valsize > PGLZ_strategy_default->max_input_size)
1085                 return PointerGetDatum(NULL);
1086
1087         tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
1088
1089         /*
1090          * We recheck the actual size even if pglz_compress() reports success,
1091          * because it might be satisfied with having saved as little as one byte
1092          * in the compressed data --- which could turn into a net loss once you
1093          * consider header and alignment padding.  Worst case, the compressed
1094          * format might require three padding bytes (plus header, which is
1095          * included in VARSIZE(tmp)), whereas the uncompressed format would take
1096          * only one header byte and no padding if the value is short enough.  So
1097          * we insist on a savings of more than 2 bytes to ensure we have a gain.
1098          */
1099         if (pglz_compress(VARDATA_ANY(DatumGetPointer(value)), valsize,
1100                                           (PGLZ_Header *) tmp, PGLZ_strategy_default) &&
1101                 VARSIZE(tmp) < valsize - 2)
1102         {
1103                 /* successful compression */
1104                 return PointerGetDatum(tmp);
1105         }
1106         else
1107         {
1108                 /* incompressible data */
1109                 pfree(tmp);
1110                 return PointerGetDatum(NULL);
1111         }
1112 }
1113
1114
1115 /* ----------
1116  * toast_save_datum -
1117  *
1118  *      Save one single datum into the secondary relation and return
1119  *      a Datum reference for it.
1120  * ----------
1121  */
1122 static Datum
1123 toast_save_datum(Relation rel, Datum value, int options)
1124 {
1125         Relation        toastrel;
1126         Relation        toastidx;
1127         HeapTuple       toasttup;
1128         TupleDesc       toasttupDesc;
1129         Datum           t_values[3];
1130         bool            t_isnull[3];
1131         CommandId       mycid = GetCurrentCommandId(true);
1132         struct varlena *result;
1133         struct varatt_external toast_pointer;
1134         struct
1135         {
1136                 struct varlena hdr;
1137                 char            data[TOAST_MAX_CHUNK_SIZE]; /* make struct big enough */
1138                 int32           align_it;       /* ensure struct is aligned well enough */
1139         }                       chunk_data;
1140         int32           chunk_size;
1141         int32           chunk_seq = 0;
1142         char       *data_p;
1143         int32           data_todo;
1144         Pointer         dval = DatumGetPointer(value);
1145
1146         /*
1147          * Open the toast relation and its index.  We can use the index to check
1148          * uniqueness of the OID we assign to the toasted item, even though it has
1149          * additional columns besides OID.
1150          */
1151         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1152         toasttupDesc = toastrel->rd_att;
1153         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1154
1155         /*
1156          * Get the data pointer and length, and compute va_rawsize and va_extsize.
1157          *
1158          * va_rawsize is the size of the equivalent fully uncompressed datum, so
1159          * we have to adjust for short headers.
1160          *
1161          * va_extsize is the actual size of the data payload in the toast records.
1162          */
1163         if (VARATT_IS_SHORT(dval))
1164         {
1165                 data_p = VARDATA_SHORT(dval);
1166                 data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
1167                 toast_pointer.va_rawsize = data_todo + VARHDRSZ;                /* as if not short */
1168                 toast_pointer.va_extsize = data_todo;
1169         }
1170         else if (VARATT_IS_COMPRESSED(dval))
1171         {
1172                 data_p = VARDATA(dval);
1173                 data_todo = VARSIZE(dval) - VARHDRSZ;
1174                 /* rawsize in a compressed datum is just the size of the payload */
1175                 toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
1176                 toast_pointer.va_extsize = data_todo;
1177                 /* Assert that the numbers look like it's compressed */
1178                 Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1179         }
1180         else
1181         {
1182                 data_p = VARDATA(dval);
1183                 data_todo = VARSIZE(dval) - VARHDRSZ;
1184                 toast_pointer.va_rawsize = VARSIZE(dval);
1185                 toast_pointer.va_extsize = data_todo;
1186         }
1187
1188         /*
1189          * Insert the correct table OID into the result TOAST pointer.
1190          *
1191          * Normally this is the actual OID of the target toast table, but during
1192          * table-rewriting operations such as CLUSTER, we have to insert the OID
1193          * of the table's real permanent toast table instead.  rd_toastoid is set
1194          * if we have to substitute such an OID.
1195          */
1196         if (OidIsValid(rel->rd_toastoid))
1197                 toast_pointer.va_toastrelid = rel->rd_toastoid;
1198         else
1199                 toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
1200
1201         /*
1202          * Choose an unused OID within the toast table for this toast value.
1203          */
1204         toast_pointer.va_valueid = GetNewOidWithIndex(toastrel,
1205                                                                                                   RelationGetRelid(toastidx),
1206                                                                                                   (AttrNumber) 1);
1207
1208         /*
1209          * Initialize constant parts of the tuple data
1210          */
1211         t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1212         t_values[2] = PointerGetDatum(&chunk_data);
1213         t_isnull[0] = false;
1214         t_isnull[1] = false;
1215         t_isnull[2] = false;
1216
1217         /*
1218          * Split up the item into chunks
1219          */
1220         while (data_todo > 0)
1221         {
1222                 /*
1223                  * Calculate the size of this chunk
1224                  */
1225                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1226
1227                 /*
1228                  * Build a tuple and store it
1229                  */
1230                 t_values[1] = Int32GetDatum(chunk_seq++);
1231                 SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1232                 memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1233                 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1234
1235                 heap_insert(toastrel, toasttup, mycid, options, NULL);
1236
1237                 /*
1238                  * Create the index entry.      We cheat a little here by not using
1239                  * FormIndexDatum: this relies on the knowledge that the index columns
1240                  * are the same as the initial columns of the table.
1241                  *
1242                  * Note also that there had better not be any user-created index on
1243                  * the TOAST table, since we don't bother to update anything else.
1244                  */
1245                 index_insert(toastidx, t_values, t_isnull,
1246                                          &(toasttup->t_self),
1247                                          toastrel,
1248                                          toastidx->rd_index->indisunique ?
1249                                          UNIQUE_CHECK_YES : UNIQUE_CHECK_NO);
1250
1251                 /*
1252                  * Free memory
1253                  */
1254                 heap_freetuple(toasttup);
1255
1256                 /*
1257                  * Move on to next chunk
1258                  */
1259                 data_todo -= chunk_size;
1260                 data_p += chunk_size;
1261         }
1262
1263         /*
1264          * Done - close toast relation
1265          */
1266         index_close(toastidx, RowExclusiveLock);
1267         heap_close(toastrel, RowExclusiveLock);
1268
1269         /*
1270          * Create the TOAST pointer value that we'll return
1271          */
1272         result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1273         SET_VARSIZE_EXTERNAL(result, TOAST_POINTER_SIZE);
1274         memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1275
1276         return PointerGetDatum(result);
1277 }
1278
1279
1280 /* ----------
1281  * toast_delete_datum -
1282  *
1283  *      Delete a single external stored value.
1284  * ----------
1285  */
1286 static void
1287 toast_delete_datum(Relation rel, Datum value)
1288 {
1289         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1290         struct varatt_external toast_pointer;
1291         Relation        toastrel;
1292         Relation        toastidx;
1293         ScanKeyData toastkey;
1294         SysScanDesc toastscan;
1295         HeapTuple       toasttup;
1296
1297         if (!VARATT_IS_EXTERNAL(attr))
1298                 return;
1299
1300         /* Must copy to access aligned fields */
1301         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1302
1303         /*
1304          * Open the toast relation and its index
1305          */
1306         toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1307         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1308
1309         /*
1310          * Setup a scan key to find chunks with matching va_valueid
1311          */
1312         ScanKeyInit(&toastkey,
1313                                 (AttrNumber) 1,
1314                                 BTEqualStrategyNumber, F_OIDEQ,
1315                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1316
1317         /*
1318          * Find all the chunks.  (We don't actually care whether we see them in
1319          * sequence or not, but since we've already locked the index we might as
1320          * well use systable_beginscan_ordered.)
1321          */
1322         toastscan = systable_beginscan_ordered(toastrel, toastidx,
1323                                                                                    SnapshotToast, 1, &toastkey);
1324         while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1325         {
1326                 /*
1327                  * Have a chunk, delete it
1328                  */
1329                 simple_heap_delete(toastrel, &toasttup->t_self);
1330         }
1331
1332         /*
1333          * End scan and close relations
1334          */
1335         systable_endscan_ordered(toastscan);
1336         index_close(toastidx, RowExclusiveLock);
1337         heap_close(toastrel, RowExclusiveLock);
1338 }
1339
1340
1341 /* ----------
1342  * toast_fetch_datum -
1343  *
1344  *      Reconstruct an in memory Datum from the chunks saved
1345  *      in the toast relation
1346  * ----------
1347  */
1348 static struct varlena *
1349 toast_fetch_datum(struct varlena * attr)
1350 {
1351         Relation        toastrel;
1352         Relation        toastidx;
1353         ScanKeyData toastkey;
1354         SysScanDesc toastscan;
1355         HeapTuple       ttup;
1356         TupleDesc       toasttupDesc;
1357         struct varlena *result;
1358         struct varatt_external toast_pointer;
1359         int32           ressize;
1360         int32           residx,
1361                                 nextidx;
1362         int32           numchunks;
1363         Pointer         chunk;
1364         bool            isnull;
1365         char       *chunkdata;
1366         int32           chunksize;
1367
1368         /* Must copy to access aligned fields */
1369         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1370
1371         ressize = toast_pointer.va_extsize;
1372         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1373
1374         result = (struct varlena *) palloc(ressize + VARHDRSZ);
1375
1376         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1377                 SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1378         else
1379                 SET_VARSIZE(result, ressize + VARHDRSZ);
1380
1381         /*
1382          * Open the toast relation and its index
1383          */
1384         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1385         toasttupDesc = toastrel->rd_att;
1386         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1387
1388         /*
1389          * Setup a scan key to fetch from the index by va_valueid
1390          */
1391         ScanKeyInit(&toastkey,
1392                                 (AttrNumber) 1,
1393                                 BTEqualStrategyNumber, F_OIDEQ,
1394                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1395
1396         /*
1397          * Read the chunks by index
1398          *
1399          * Note that because the index is actually on (valueid, chunkidx) we will
1400          * see the chunks in chunkidx order, even though we didn't explicitly ask
1401          * for it.
1402          */
1403         nextidx = 0;
1404
1405         toastscan = systable_beginscan_ordered(toastrel, toastidx,
1406                                                                                    SnapshotToast, 1, &toastkey);
1407         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1408         {
1409                 /*
1410                  * Have a chunk, extract the sequence number and the data
1411                  */
1412                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1413                 Assert(!isnull);
1414                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1415                 Assert(!isnull);
1416                 if (!VARATT_IS_EXTENDED(chunk))
1417                 {
1418                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1419                         chunkdata = VARDATA(chunk);
1420                 }
1421                 else if (VARATT_IS_SHORT(chunk))
1422                 {
1423                         /* could happen due to heap_form_tuple doing its thing */
1424                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1425                         chunkdata = VARDATA_SHORT(chunk);
1426                 }
1427                 else
1428                 {
1429                         /* should never happen */
1430                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1431                                  toast_pointer.va_valueid,
1432                                  RelationGetRelationName(toastrel));
1433                         chunksize = 0;          /* keep compiler quiet */
1434                         chunkdata = NULL;
1435                 }
1436
1437                 /*
1438                  * Some checks on the data we've found
1439                  */
1440                 if (residx != nextidx)
1441                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1442                                  residx, nextidx,
1443                                  toast_pointer.va_valueid,
1444                                  RelationGetRelationName(toastrel));
1445                 if (residx < numchunks - 1)
1446                 {
1447                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1448                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
1449                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1450                                          residx, numchunks,
1451                                          toast_pointer.va_valueid,
1452                                          RelationGetRelationName(toastrel));
1453                 }
1454                 else if (residx == numchunks - 1)
1455                 {
1456                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1457                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
1458                                          chunksize,
1459                                          (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
1460                                          residx,
1461                                          toast_pointer.va_valueid,
1462                                          RelationGetRelationName(toastrel));
1463                 }
1464                 else
1465                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
1466                                  residx,
1467                                  0, numchunks - 1,
1468                                  toast_pointer.va_valueid,
1469                                  RelationGetRelationName(toastrel));
1470
1471                 /*
1472                  * Copy the data into proper place in our result
1473                  */
1474                 memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
1475                            chunkdata,
1476                            chunksize);
1477
1478                 nextidx++;
1479         }
1480
1481         /*
1482          * Final checks that we successfully fetched the datum
1483          */
1484         if (nextidx != numchunks)
1485                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
1486                          nextidx,
1487                          toast_pointer.va_valueid,
1488                          RelationGetRelationName(toastrel));
1489
1490         /*
1491          * End scan and close relations
1492          */
1493         systable_endscan_ordered(toastscan);
1494         index_close(toastidx, AccessShareLock);
1495         heap_close(toastrel, AccessShareLock);
1496
1497         return result;
1498 }
1499
1500 /* ----------
1501  * toast_fetch_datum_slice -
1502  *
1503  *      Reconstruct a segment of a Datum from the chunks saved
1504  *      in the toast relation
1505  * ----------
1506  */
1507 static struct varlena *
1508 toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
1509 {
1510         Relation        toastrel;
1511         Relation        toastidx;
1512         ScanKeyData toastkey[3];
1513         int                     nscankeys;
1514         SysScanDesc toastscan;
1515         HeapTuple       ttup;
1516         TupleDesc       toasttupDesc;
1517         struct varlena *result;
1518         struct varatt_external toast_pointer;
1519         int32           attrsize;
1520         int32           residx;
1521         int32           nextidx;
1522         int                     numchunks;
1523         int                     startchunk;
1524         int                     endchunk;
1525         int32           startoffset;
1526         int32           endoffset;
1527         int                     totalchunks;
1528         Pointer         chunk;
1529         bool            isnull;
1530         char       *chunkdata;
1531         int32           chunksize;
1532         int32           chcpystrt;
1533         int32           chcpyend;
1534
1535         Assert(VARATT_IS_EXTERNAL(attr));
1536
1537         /* Must copy to access aligned fields */
1538         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1539
1540         /*
1541          * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
1542          * we can't return a compressed datum which is meaningful to toast later
1543          */
1544         Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1545
1546         attrsize = toast_pointer.va_extsize;
1547         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1548
1549         if (sliceoffset >= attrsize)
1550         {
1551                 sliceoffset = 0;
1552                 length = 0;
1553         }
1554
1555         if (((sliceoffset + length) > attrsize) || length < 0)
1556                 length = attrsize - sliceoffset;
1557
1558         result = (struct varlena *) palloc(length + VARHDRSZ);
1559
1560         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1561                 SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
1562         else
1563                 SET_VARSIZE(result, length + VARHDRSZ);
1564
1565         if (length == 0)
1566                 return result;                  /* Can save a lot of work at this point! */
1567
1568         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1569         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1570         numchunks = (endchunk - startchunk) + 1;
1571
1572         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1573         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1574
1575         /*
1576          * Open the toast relation and its index
1577          */
1578         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1579         toasttupDesc = toastrel->rd_att;
1580         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1581
1582         /*
1583          * Setup a scan key to fetch from the index. This is either two keys or
1584          * three depending on the number of chunks.
1585          */
1586         ScanKeyInit(&toastkey[0],
1587                                 (AttrNumber) 1,
1588                                 BTEqualStrategyNumber, F_OIDEQ,
1589                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1590
1591         /*
1592          * Use equality condition for one chunk, a range condition otherwise:
1593          */
1594         if (numchunks == 1)
1595         {
1596                 ScanKeyInit(&toastkey[1],
1597                                         (AttrNumber) 2,
1598                                         BTEqualStrategyNumber, F_INT4EQ,
1599                                         Int32GetDatum(startchunk));
1600                 nscankeys = 2;
1601         }
1602         else
1603         {
1604                 ScanKeyInit(&toastkey[1],
1605                                         (AttrNumber) 2,
1606                                         BTGreaterEqualStrategyNumber, F_INT4GE,
1607                                         Int32GetDatum(startchunk));
1608                 ScanKeyInit(&toastkey[2],
1609                                         (AttrNumber) 2,
1610                                         BTLessEqualStrategyNumber, F_INT4LE,
1611                                         Int32GetDatum(endchunk));
1612                 nscankeys = 3;
1613         }
1614
1615         /*
1616          * Read the chunks by index
1617          *
1618          * The index is on (valueid, chunkidx) so they will come in order
1619          */
1620         nextidx = startchunk;
1621         toastscan = systable_beginscan_ordered(toastrel, toastidx,
1622                                                                                  SnapshotToast, nscankeys, toastkey);
1623         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1624         {
1625                 /*
1626                  * Have a chunk, extract the sequence number and the data
1627                  */
1628                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1629                 Assert(!isnull);
1630                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1631                 Assert(!isnull);
1632                 if (!VARATT_IS_EXTENDED(chunk))
1633                 {
1634                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1635                         chunkdata = VARDATA(chunk);
1636                 }
1637                 else if (VARATT_IS_SHORT(chunk))
1638                 {
1639                         /* could happen due to heap_form_tuple doing its thing */
1640                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1641                         chunkdata = VARDATA_SHORT(chunk);
1642                 }
1643                 else
1644                 {
1645                         /* should never happen */
1646                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1647                                  toast_pointer.va_valueid,
1648                                  RelationGetRelationName(toastrel));
1649                         chunksize = 0;          /* keep compiler quiet */
1650                         chunkdata = NULL;
1651                 }
1652
1653                 /*
1654                  * Some checks on the data we've found
1655                  */
1656                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1657                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1658                                  residx, nextidx,
1659                                  toast_pointer.va_valueid,
1660                                  RelationGetRelationName(toastrel));
1661                 if (residx < totalchunks - 1)
1662                 {
1663                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1664                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
1665                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1666                                          residx, totalchunks,
1667                                          toast_pointer.va_valueid,
1668                                          RelationGetRelationName(toastrel));
1669                 }
1670                 else if (residx == totalchunks - 1)
1671                 {
1672                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1673                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
1674                                          chunksize,
1675                                          (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
1676                                          residx,
1677                                          toast_pointer.va_valueid,
1678                                          RelationGetRelationName(toastrel));
1679                 }
1680                 else
1681                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
1682                                  residx,
1683                                  0, totalchunks - 1,
1684                                  toast_pointer.va_valueid,
1685                                  RelationGetRelationName(toastrel));
1686
1687                 /*
1688                  * Copy the data into proper place in our result
1689                  */
1690                 chcpystrt = 0;
1691                 chcpyend = chunksize - 1;
1692                 if (residx == startchunk)
1693                         chcpystrt = startoffset;
1694                 if (residx == endchunk)
1695                         chcpyend = endoffset;
1696
1697                 memcpy(VARDATA(result) +
1698                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1699                            chunkdata + chcpystrt,
1700                            (chcpyend - chcpystrt) + 1);
1701
1702                 nextidx++;
1703         }
1704
1705         /*
1706          * Final checks that we successfully fetched the datum
1707          */
1708         if (nextidx != (endchunk + 1))
1709                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
1710                          nextidx,
1711                          toast_pointer.va_valueid,
1712                          RelationGetRelationName(toastrel));
1713
1714         /*
1715          * End scan and close relations
1716          */
1717         systable_endscan_ordered(toastscan);
1718         index_close(toastidx, AccessShareLock);
1719         heap_close(toastrel, AccessShareLock);
1720
1721         return result;
1722 }