]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Support "expanded" objects, particularly arrays, for better performance.
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2015, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/heap/tuptoaster.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *              toast_insert_or_update -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              toast_delete -
20  *                      Reclaim toast storage when a tuple is deleted
21  *
22  *              heap_tuple_untoast_attr -
23  *                      Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27
28 #include "postgres.h"
29
30 #include <unistd.h>
31 #include <fcntl.h>
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "common/pg_lzcompress.h"
39 #include "miscadmin.h"
40 #include "utils/expandeddatum.h"
41 #include "utils/fmgroids.h"
42 #include "utils/rel.h"
43 #include "utils/typcache.h"
44 #include "utils/tqual.h"
45
46
47 #undef TOAST_DEBUG
48
49 /*
50  *      The information at the start of the compressed toast data.
51  */
52 typedef struct toast_compress_header
53 {
54         int32           vl_len_;                /* varlena header (do not touch directly!) */
55         int32           rawsize;                /* presumably the size of the data before compression -- TODO confirm against the compress/decompress routines */
56 } toast_compress_header;
57
58 /*
59  * Utilities for manipulation of header information for compressed
60  * toast entries.
61  */
/* Size in bytes of the toast_compress_header struct, cast to int32. */
62 #define TOAST_COMPRESS_HDRSZ            ((int32) sizeof(toast_compress_header))
/* Read the rawsize field, treating ptr as a toast_compress_header pointer. */
63 #define TOAST_COMPRESS_RAWSIZE(ptr) (((toast_compress_header *) (ptr))->rawsize)
/* Address of the first byte that follows the header located at ptr. */
64 #define TOAST_COMPRESS_RAWDATA(ptr) \
65         (((char *) (ptr)) + TOAST_COMPRESS_HDRSZ)
/* Store len into the rawsize field of the header located at ptr. */
66 #define TOAST_COMPRESS_SET_RAWSIZE(ptr, len) \
67         (((toast_compress_header *) (ptr))->rawsize = (len))
68
/* Prototypes for file-local (static) helper routines. */
69 static void toast_delete_datum(Relation rel, Datum value);
70 static Datum toast_save_datum(Relation rel, Datum value,
71                                  struct varlena * oldexternal, int options);
72 static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
73 static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
74 static struct varlena *toast_fetch_datum(struct varlena * attr);
75 static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
76                                                 int32 sliceoffset, int32 length);
77 static struct varlena *toast_decompress_datum(struct varlena * attr);
78 static int toast_open_indexes(Relation toastrel,
79                                    LOCKMODE lock,
80                                    Relation **toastidxs,
81                                    int *num_indexes);
82 static void toast_close_indexes(Relation *toastidxs, int num_indexes,
83                                         LOCKMODE lock);
84
85
86 /* ----------
87  * heap_tuple_fetch_attr -
88  *
89  *      Public entry point to get back a toasted value from
90  *      external source (possibly still in compressed format).
91  *
92  * This will return a datum that contains all the data internally, ie, not
93  * relying on external storage or memory, but it can still be compressed or
94  * have a short header.  Note some callers assume that if the input is an
95  * EXTERNAL datum, the result will be a pfree'able chunk.
96  * ----------
97  */
98 struct varlena *
99 heap_tuple_fetch_attr(struct varlena * attr)
100 {
101         struct varlena *result;
102
103         if (VARATT_IS_EXTERNAL_ONDISK(attr))
104         {
105                 /*
106                  * This is an external stored plain value
107                  */
108                 result = toast_fetch_datum(attr);
109         }
110         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
111         {
112                 /*
113                  * This is an indirect pointer --- dereference it
114                  */
115                 struct varatt_indirect redirect;
116
117                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
118                 attr = (struct varlena *) redirect.pointer;
119
120                 /* nested indirect Datums aren't allowed */
121                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
122
123                 /* recurse if value is still external in some other way */
124                 if (VARATT_IS_EXTERNAL(attr))
125                         return heap_tuple_fetch_attr(attr);
126
127                 /*
128                  * Copy into the caller's memory context, in case caller tries to
129                  * pfree the result.
130                  */
131                 result = (struct varlena *) palloc(VARSIZE_ANY(attr));
132                 memcpy(result, attr, VARSIZE_ANY(attr));
133         }
134         else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
135         {
136                 /*
137                  * This is an expanded-object pointer --- get flat format
138                  */
139                 ExpandedObjectHeader *eoh;
140                 Size            resultsize;
141
142                 eoh = DatumGetEOHP(PointerGetDatum(attr));
143                 resultsize = EOH_get_flat_size(eoh);
144                 result = (struct varlena *) palloc(resultsize);
145                 EOH_flatten_into(eoh, (void *) result, resultsize);
146         }
147         else
148         {
149                 /*
150                  * This is a plain value inside of the main tuple - why am I called?
151                  */
152                 result = attr;
153         }
154
155         return result;
156 }
157
158
159 /* ----------
160  * heap_tuple_untoast_attr -
161  *
162  *      Public entry point to get back a toasted value from compression
163  *      or external storage.  The result is always non-extended varlena form.
164  *
165  * Note some callers assume that if the input is an EXTERNAL or COMPRESSED
166  * datum, the result will be a pfree'able chunk.
167  * ----------
168  */
169 struct varlena *
170 heap_tuple_untoast_attr(struct varlena * attr)
171 {
172         if (VARATT_IS_EXTERNAL_ONDISK(attr))
173         {
174                 /*
175                  * This is an externally stored datum --- fetch it back from there
176                  */
177                 attr = toast_fetch_datum(attr);
178                 /* If it's compressed, decompress it */
179                 if (VARATT_IS_COMPRESSED(attr))
180                 {
181                         struct varlena *tmp = attr;
182
183                         attr = toast_decompress_datum(tmp);
184                         pfree(tmp);
185                 }
186         }
187         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
188         {
189                 /*
190                  * This is an indirect pointer --- dereference it
191                  */
192                 struct varatt_indirect redirect;
193
194                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
195                 attr = (struct varlena *) redirect.pointer;
196
197                 /* nested indirect Datums aren't allowed */
198                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
199
200                 /* recurse in case value is still extended in some other way */
201                 attr = heap_tuple_untoast_attr(attr);
202
203                 /* if it isn't, we'd better copy it */
204                 if (attr == (struct varlena *) redirect.pointer)
205                 {
206                         struct varlena *result;
207
208                         result = (struct varlena *) palloc(VARSIZE_ANY(attr));
209                         memcpy(result, attr, VARSIZE_ANY(attr));
210                         attr = result;
211                 }
212         }
213         else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
214         {
215                 /*
216                  * This is an expanded-object pointer --- get flat format
217                  */
218                 attr = heap_tuple_fetch_attr(attr);
219                 /* flatteners are not allowed to produce compressed/short output */
220                 Assert(!VARATT_IS_EXTENDED(attr));
221         }
222         else if (VARATT_IS_COMPRESSED(attr))
223         {
224                 /*
225                  * This is a compressed value inside of the main tuple
226                  */
227                 attr = toast_decompress_datum(attr);
228         }
229         else if (VARATT_IS_SHORT(attr))
230         {
231                 /*
232                  * This is a short-header varlena --- convert to 4-byte header format
233                  */
234                 Size            data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
235                 Size            new_size = data_size + VARHDRSZ;
236                 struct varlena *new_attr;
237
238                 new_attr = (struct varlena *) palloc(new_size);
239                 SET_VARSIZE(new_attr, new_size);
240                 memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
241                 attr = new_attr;
242         }
243
244         return attr;
245 }
246
247
248 /* ----------
249  * heap_tuple_untoast_attr_slice -
250  *
251  *              Public entry point to get back part of a toasted value
252  *              from compression or external storage.
253  * ----------
254  */
255 struct varlena *
256 heap_tuple_untoast_attr_slice(struct varlena * attr,
257                                                           int32 sliceoffset, int32 slicelength)
258 {
259         struct varlena *preslice;
260         struct varlena *result;
261         char       *attrdata;
262         int32           attrsize;
263
264         if (VARATT_IS_EXTERNAL_ONDISK(attr))
265         {
266                 struct varatt_external toast_pointer;
267
268                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
269
270                 /* fast path for non-compressed external datums */
271                 if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
272                         return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
273
274                 /* fetch it back (compressed marker will get set automatically) */
275                 preslice = toast_fetch_datum(attr);
276         }
277         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
278         {
279                 struct varatt_indirect redirect;
280
281                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
282
283                 /* nested indirect Datums aren't allowed */
284                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(redirect.pointer));
285
286                 return heap_tuple_untoast_attr_slice(redirect.pointer,
287                                                                                          sliceoffset, slicelength);
288         }
289         else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
290         {
291                 /* pass it off to heap_tuple_fetch_attr to flatten */
292                 preslice = heap_tuple_fetch_attr(attr);
293         }
294         else
295                 preslice = attr;
296
297         Assert(!VARATT_IS_EXTERNAL(preslice));
298
299         if (VARATT_IS_COMPRESSED(preslice))
300         {
301                 struct varlena *tmp = preslice;
302
303                 preslice = toast_decompress_datum(tmp);
304
305                 if (tmp != attr)
306                         pfree(tmp);
307         }
308
309         if (VARATT_IS_SHORT(preslice))
310         {
311                 attrdata = VARDATA_SHORT(preslice);
312                 attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
313         }
314         else
315         {
316                 attrdata = VARDATA(preslice);
317                 attrsize = VARSIZE(preslice) - VARHDRSZ;
318         }
319
320         /* slicing of datum for compressed cases and plain value */
321
322         if (sliceoffset >= attrsize)
323         {
324                 sliceoffset = 0;
325                 slicelength = 0;
326         }
327
328         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
329                 slicelength = attrsize - sliceoffset;
330
331         result = (struct varlena *) palloc(slicelength + VARHDRSZ);
332         SET_VARSIZE(result, slicelength + VARHDRSZ);
333
334         memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
335
336         if (preslice != attr)
337                 pfree(preslice);
338
339         return result;
340 }
341
342
343 /* ----------
344  * toast_raw_datum_size -
345  *
346  *      Return the raw (detoasted) size of a varlena datum
347  *      (including the VARHDRSZ header)
348  * ----------
349  */
350 Size
351 toast_raw_datum_size(Datum value)
352 {
353         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
354         Size            result;
355
356         if (VARATT_IS_EXTERNAL_ONDISK(attr))
357         {
358                 /* va_rawsize is the size of the original datum -- including header */
359                 struct varatt_external toast_pointer;
360
361                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
362                 result = toast_pointer.va_rawsize;
363         }
364         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
365         {
366                 struct varatt_indirect toast_pointer;
367
368                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
369
370                 /* nested indirect Datums aren't allowed */
371                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer));
372
373                 return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer));
374         }
375         else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
376         {
377                 result = EOH_get_flat_size(DatumGetEOHP(value));
378         }
379         else if (VARATT_IS_COMPRESSED(attr))
380         {
381                 /* here, va_rawsize is just the payload size */
382                 result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
383         }
384         else if (VARATT_IS_SHORT(attr))
385         {
386                 /*
387                  * we have to normalize the header length to VARHDRSZ or else the
388                  * callers of this function will be confused.
389                  */
390                 result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
391         }
392         else
393         {
394                 /* plain untoasted datum */
395                 result = VARSIZE(attr);
396         }
397         return result;
398 }
399
400 /* ----------
401  * toast_datum_size
402  *
403  *      Return the physical storage size (possibly compressed) of a varlena datum
404  * ----------
405  */
406 Size
407 toast_datum_size(Datum value)
408 {
409         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
410         Size            result;
411
412         if (VARATT_IS_EXTERNAL_ONDISK(attr))
413         {
414                 /*
415                  * Attribute is stored externally - return the extsize whether
416                  * compressed or not.  We do not count the size of the toast pointer
417                  * ... should we?
418                  */
419                 struct varatt_external toast_pointer;
420
421                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
422                 result = toast_pointer.va_extsize;
423         }
424         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
425         {
426                 struct varatt_indirect toast_pointer;
427
428                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
429
430                 /* nested indirect Datums aren't allowed */
431                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
432
433                 return toast_datum_size(PointerGetDatum(toast_pointer.pointer));
434         }
435         else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
436         {
437                 result = EOH_get_flat_size(DatumGetEOHP(value));
438         }
439         else if (VARATT_IS_SHORT(attr))
440         {
441                 result = VARSIZE_SHORT(attr);
442         }
443         else
444         {
445                 /*
446                  * Attribute is stored inline either compressed or not, just calculate
447                  * the size of the datum in either case.
448                  */
449                 result = VARSIZE(attr);
450         }
451         return result;
452 }
453
454
455 /* ----------
456  * toast_delete -
457  *
458  *      Cascaded delete toast-entries on DELETE
459  * ----------
460  */
461 void
462 toast_delete(Relation rel, HeapTuple oldtup)
463 {
464         TupleDesc       tupleDesc;
465         Form_pg_attribute *att;
466         int                     numAttrs;
467         int                     i;
468         Datum           toast_values[MaxHeapAttributeNumber];
469         bool            toast_isnull[MaxHeapAttributeNumber];
470
471         /*
472          * We should only ever be called for tuples of plain relations or
473          * materialized views --- recursing on a toast rel is bad news.
474          */
475         Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
476                    rel->rd_rel->relkind == RELKIND_MATVIEW);
477
478         /*
479          * Get the tuple descriptor and break down the tuple into fields.
480          *
481          * NOTE: it's debatable whether to use heap_deform_tuple() here or just
482          * heap_getattr() only the varlena columns.  The latter could win if there
483          * are few varlena columns and many non-varlena ones. However,
484          * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
485          * O(N^2) if there are many varlena columns, so it seems better to err on
486          * the side of linear cost.  (We won't even be here unless there's at
487          * least one varlena column, by the way.)
488          */
489         tupleDesc = rel->rd_att;
490         att = tupleDesc->attrs;
491         numAttrs = tupleDesc->natts;
492
493         Assert(numAttrs <= MaxHeapAttributeNumber);
494         heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
495
496         /*
497          * Check for external stored attributes and delete them from the secondary
498          * relation.
499          */
500         for (i = 0; i < numAttrs; i++)
501         {
502                 if (att[i]->attlen == -1)
503                 {
504                         Datum           value = toast_values[i];
505
506                         if (toast_isnull[i])
507                                 continue;
508                         else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
509                                 toast_delete_datum(rel, value);
510                 }
511         }
512 }
513
514
515 /* ----------
516  * toast_insert_or_update -
517  *
518  *      Delete no-longer-used toast-entries and create new ones to
519  *      make the new tuple fit on INSERT or UPDATE
520  *
521  * Inputs:
522  *      newtup: the candidate new tuple to be inserted
523  *      oldtup: the old row version for UPDATE, or NULL for INSERT
524  *      options: options to be passed to heap_insert() for toast rows
525  * Result:
526  *      either newtup if no toasting is needed, or a palloc'd modified tuple
527  *      that is what should actually get stored
528  *
529  * NOTE: neither newtup nor oldtup will be modified.  This is a change
530  * from the pre-8.1 API of this routine.
531  * ----------
532  */
533 HeapTuple
534 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
535                                            int options)
536 {
537         HeapTuple       result_tuple;
538         TupleDesc       tupleDesc;
539         Form_pg_attribute *att;
540         int                     numAttrs;
541         int                     i;
542
543         bool            need_change = false;
544         bool            need_free = false;
545         bool            need_delold = false;
546         bool            has_nulls = false;
547
548         Size            maxDataLen;
549         Size            hoff;
550
551         char            toast_action[MaxHeapAttributeNumber];
552         bool            toast_isnull[MaxHeapAttributeNumber];
553         bool            toast_oldisnull[MaxHeapAttributeNumber];
554         Datum           toast_values[MaxHeapAttributeNumber];
555         Datum           toast_oldvalues[MaxHeapAttributeNumber];
556         struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
557         int32           toast_sizes[MaxHeapAttributeNumber];
558         bool            toast_free[MaxHeapAttributeNumber];
559         bool            toast_delold[MaxHeapAttributeNumber];
560
561         /*
562          * Ignore the INSERT_SPECULATIVE option. Speculative insertions/super
563          * deletions just normally insert/delete the toast values. It seems
564          * easiest to deal with that here, instead of in, potentially, multiple
565          * callers.
566          */
567         options &= ~HEAP_INSERT_SPECULATIVE;
568
569         /*
570          * We should only ever be called for tuples of plain relations or
571          * materialized views --- recursing on a toast rel is bad news.
572          */
573         Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
574                    rel->rd_rel->relkind == RELKIND_MATVIEW);
575
576         /*
577          * Get the tuple descriptor and break down the tuple(s) into fields.
578          */
579         tupleDesc = rel->rd_att;
580         att = tupleDesc->attrs;
581         numAttrs = tupleDesc->natts;
582
583         Assert(numAttrs <= MaxHeapAttributeNumber);
584         heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
585         if (oldtup != NULL)
586                 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
587
588         /* ----------
589          * Then collect information about the values given
590          *
591          * NOTE: toast_action[i] can have these values:
592          *              ' '             default handling
593          *              'p'             already processed --- don't touch it
594          *              'x'             incompressible, but OK to move off
595          *
596          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
597          *              toast_action[i] different from 'p'.
598          * ----------
599          */
600         memset(toast_action, ' ', numAttrs * sizeof(char));
601         memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
602         memset(toast_free, 0, numAttrs * sizeof(bool));
603         memset(toast_delold, 0, numAttrs * sizeof(bool));
604
605         for (i = 0; i < numAttrs; i++)
606         {
607                 struct varlena *old_value;
608                 struct varlena *new_value;
609
610                 if (oldtup != NULL)
611                 {
612                         /*
613                          * For UPDATE get the old and new values of this attribute
614                          */
615                         old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
616                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
617
618                         /*
619                          * If the old value is stored on disk, check if it has changed so
620                          * we have to delete it later.
621                          */
622                         if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
623                                 VARATT_IS_EXTERNAL_ONDISK(old_value))
624                         {
625                                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
626                                         memcmp((char *) old_value, (char *) new_value,
627                                                    VARSIZE_EXTERNAL(old_value)) != 0)
628                                 {
629                                         /*
630                                          * The old external stored value isn't needed any more
631                                          * after the update
632                                          */
633                                         toast_delold[i] = true;
634                                         need_delold = true;
635                                 }
636                                 else
637                                 {
638                                         /*
639                                          * This attribute isn't changed by this update so we reuse
640                                          * the original reference to the old value in the new
641                                          * tuple.
642                                          */
643                                         toast_action[i] = 'p';
644                                         continue;
645                                 }
646                         }
647                 }
648                 else
649                 {
650                         /*
651                          * For INSERT simply get the new value
652                          */
653                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
654                 }
655
656                 /*
657                  * Handle NULL attributes
658                  */
659                 if (toast_isnull[i])
660                 {
661                         toast_action[i] = 'p';
662                         has_nulls = true;
663                         continue;
664                 }
665
666                 /*
667                  * Now look at varlena attributes
668                  */
669                 if (att[i]->attlen == -1)
670                 {
671                         /*
672                          * If the table's attribute says PLAIN always, force it so.
673                          */
674                         if (att[i]->attstorage == 'p')
675                                 toast_action[i] = 'p';
676
677                         /*
678                          * We took care of UPDATE above, so any external value we find
679                          * still in the tuple must be someone else's that we cannot reuse
680                          * (this includes the case of an out-of-line in-memory datum).
681                          * Fetch it back (without decompression, unless we are forcing
682                          * PLAIN storage).  If necessary, we'll push it out as a new
683                          * external value below.
684                          */
685                         if (VARATT_IS_EXTERNAL(new_value))
686                         {
687                                 toast_oldexternal[i] = new_value;
688                                 if (att[i]->attstorage == 'p')
689                                         new_value = heap_tuple_untoast_attr(new_value);
690                                 else
691                                         new_value = heap_tuple_fetch_attr(new_value);
692                                 toast_values[i] = PointerGetDatum(new_value);
693                                 toast_free[i] = true;
694                                 need_change = true;
695                                 need_free = true;
696                         }
697
698                         /*
699                          * Remember the size of this attribute
700                          */
701                         toast_sizes[i] = VARSIZE_ANY(new_value);
702                 }
703                 else
704                 {
705                         /*
706                          * Not a varlena attribute, plain storage always
707                          */
708                         toast_action[i] = 'p';
709                 }
710         }
711
712         /* ----------
713          * Compress and/or save external until data fits into target length
714          *
715          *      1: Inline compress attributes with attstorage 'x', and store very
716          *         large attributes with attstorage 'x' or 'e' external immediately
717          *      2: Store attributes with attstorage 'x' or 'e' external
718          *      3: Inline compress attributes with attstorage 'm'
719          *      4: Store attributes with attstorage 'm' external
720          * ----------
721          */
722
723         /* compute header overhead --- this should match heap_form_tuple() */
724         hoff = SizeofHeapTupleHeader;
725         if (has_nulls)
726                 hoff += BITMAPLEN(numAttrs);
727         if (newtup->t_data->t_infomask & HEAP_HASOID)
728                 hoff += sizeof(Oid);
729         hoff = MAXALIGN(hoff);
730         /* now convert to a limit on the tuple data size */
731         maxDataLen = TOAST_TUPLE_TARGET - hoff;
732
733         /*
734          * Look for attributes with attstorage 'x' to compress.  Also find large
735          * attributes with attstorage 'x' or 'e', and store them external.
736          */
737         while (heap_compute_data_size(tupleDesc,
738                                                                   toast_values, toast_isnull) > maxDataLen)
739         {
740                 int                     biggest_attno = -1;
741                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
742                 Datum           old_value;
743                 Datum           new_value;
744
745                 /*
746                  * Search for the biggest yet unprocessed internal attribute
747                  */
748                 for (i = 0; i < numAttrs; i++)
749                 {
750                         if (toast_action[i] != ' ')
751                                 continue;
752                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
753                                 continue;               /* can't happen, toast_action would be 'p' */
754                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
755                                 continue;
756                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
757                                 continue;
758                         if (toast_sizes[i] > biggest_size)
759                         {
760                                 biggest_attno = i;
761                                 biggest_size = toast_sizes[i];
762                         }
763                 }
764
765                 if (biggest_attno < 0)
766                         break;
767
768                 /*
769                  * Attempt to compress it inline, if it has attstorage 'x'
770                  */
771                 i = biggest_attno;
772                 if (att[i]->attstorage == 'x')
773                 {
774                         old_value = toast_values[i];
775                         new_value = toast_compress_datum(old_value);
776
777                         if (DatumGetPointer(new_value) != NULL)
778                         {
779                                 /* successful compression */
780                                 if (toast_free[i])
781                                         pfree(DatumGetPointer(old_value));
782                                 toast_values[i] = new_value;
783                                 toast_free[i] = true;
784                                 toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
785                                 need_change = true;
786                                 need_free = true;
787                         }
788                         else
789                         {
790                                 /* incompressible, ignore on subsequent compression passes */
791                                 toast_action[i] = 'x';
792                         }
793                 }
794                 else
795                 {
796                         /* has attstorage 'e', ignore on subsequent compression passes */
797                         toast_action[i] = 'x';
798                 }
799
800                 /*
801                  * If this value is by itself more than maxDataLen (after compression
802                  * if any), push it out to the toast table immediately, if possible.
803                  * This avoids uselessly compressing other fields in the common case
804                  * where we have one long field and several short ones.
805                  *
806                  * XXX maybe the threshold should be less than maxDataLen?
807                  */
808                 if (toast_sizes[i] > maxDataLen &&
809                         rel->rd_rel->reltoastrelid != InvalidOid)
810                 {
811                         old_value = toast_values[i];
812                         toast_action[i] = 'p';
813                         toast_values[i] = toast_save_datum(rel, toast_values[i],
814                                                                                            toast_oldexternal[i], options);
815                         if (toast_free[i])
816                                 pfree(DatumGetPointer(old_value));
817                         toast_free[i] = true;
818                         need_change = true;
819                         need_free = true;
820                 }
821         }
822
823         /*
824          * Second we look for attributes of attstorage 'x' or 'e' that are still
825          * inline.  But skip this if there's no toast table to push them to.
826          */
827         while (heap_compute_data_size(tupleDesc,
828                                                                   toast_values, toast_isnull) > maxDataLen &&
829                    rel->rd_rel->reltoastrelid != InvalidOid)
830         {
831                 int                     biggest_attno = -1;
832                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
833                 Datum           old_value;
834
835                 /*------
836                  * Search for the biggest yet inlined attribute with
837                  * attstorage equals 'x' or 'e'
838                  *------
839                  */
840                 for (i = 0; i < numAttrs; i++)
841                 {
842                         if (toast_action[i] == 'p')
843                                 continue;
844                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
845                                 continue;               /* can't happen, toast_action would be 'p' */
846                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
847                                 continue;
848                         if (toast_sizes[i] > biggest_size)
849                         {
850                                 biggest_attno = i;
851                                 biggest_size = toast_sizes[i];
852                         }
853                 }
854
855                 if (biggest_attno < 0)
856                         break;
857
858                 /*
859                  * Store this external
860                  */
861                 i = biggest_attno;
862                 old_value = toast_values[i];
863                 toast_action[i] = 'p';
864                 toast_values[i] = toast_save_datum(rel, toast_values[i],
865                                                                                    toast_oldexternal[i], options);
866                 if (toast_free[i])
867                         pfree(DatumGetPointer(old_value));
868                 toast_free[i] = true;
869
870                 need_change = true;
871                 need_free = true;
872         }
873
874         /*
875          * Round 3 - this time we take attributes with storage 'm' into
876          * compression
877          */
878         while (heap_compute_data_size(tupleDesc,
879                                                                   toast_values, toast_isnull) > maxDataLen)
880         {
881                 int                     biggest_attno = -1;
882                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
883                 Datum           old_value;
884                 Datum           new_value;
885
886                 /*
887                  * Search for the biggest yet uncompressed internal attribute
888                  */
889                 for (i = 0; i < numAttrs; i++)
890                 {
891                         if (toast_action[i] != ' ')
892                                 continue;
893                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
894                                 continue;               /* can't happen, toast_action would be 'p' */
895                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
896                                 continue;
897                         if (att[i]->attstorage != 'm')
898                                 continue;
899                         if (toast_sizes[i] > biggest_size)
900                         {
901                                 biggest_attno = i;
902                                 biggest_size = toast_sizes[i];
903                         }
904                 }
905
906                 if (biggest_attno < 0)
907                         break;
908
909                 /*
910                  * Attempt to compress it inline
911                  */
912                 i = biggest_attno;
913                 old_value = toast_values[i];
914                 new_value = toast_compress_datum(old_value);
915
916                 if (DatumGetPointer(new_value) != NULL)
917                 {
918                         /* successful compression */
919                         if (toast_free[i])
920                                 pfree(DatumGetPointer(old_value));
921                         toast_values[i] = new_value;
922                         toast_free[i] = true;
923                         toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
924                         need_change = true;
925                         need_free = true;
926                 }
927                 else
928                 {
929                         /* incompressible, ignore on subsequent compression passes */
930                         toast_action[i] = 'x';
931                 }
932         }
933
934         /*
935          * Finally we store attributes of type 'm' externally.  At this point we
936          * increase the target tuple size, so that 'm' attributes aren't stored
937          * externally unless really necessary.
938          */
939         maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
940
941         while (heap_compute_data_size(tupleDesc,
942                                                                   toast_values, toast_isnull) > maxDataLen &&
943                    rel->rd_rel->reltoastrelid != InvalidOid)
944         {
945                 int                     biggest_attno = -1;
946                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
947                 Datum           old_value;
948
949                 /*--------
950                  * Search for the biggest yet inlined attribute with
951                  * attstorage = 'm'
952                  *--------
953                  */
954                 for (i = 0; i < numAttrs; i++)
955                 {
956                         if (toast_action[i] == 'p')
957                                 continue;
958                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
959                                 continue;               /* can't happen, toast_action would be 'p' */
960                         if (att[i]->attstorage != 'm')
961                                 continue;
962                         if (toast_sizes[i] > biggest_size)
963                         {
964                                 biggest_attno = i;
965                                 biggest_size = toast_sizes[i];
966                         }
967                 }
968
969                 if (biggest_attno < 0)
970                         break;
971
972                 /*
973                  * Store this external
974                  */
975                 i = biggest_attno;
976                 old_value = toast_values[i];
977                 toast_action[i] = 'p';
978                 toast_values[i] = toast_save_datum(rel, toast_values[i],
979                                                                                    toast_oldexternal[i], options);
980                 if (toast_free[i])
981                         pfree(DatumGetPointer(old_value));
982                 toast_free[i] = true;
983
984                 need_change = true;
985                 need_free = true;
986         }
987
988         /*
989          * In the case we toasted any values, we need to build a new heap tuple
990          * with the changed values.
991          */
992         if (need_change)
993         {
994                 HeapTupleHeader olddata = newtup->t_data;
995                 HeapTupleHeader new_data;
996                 int32           new_header_len;
997                 int32           new_data_len;
998                 int32           new_tuple_len;
999
1000                 /*
1001                  * Calculate the new size of the tuple.
1002                  *
1003                  * Note: we used to assume here that the old tuple's t_hoff must equal
1004                  * the new_header_len value, but that was incorrect.  The old tuple
1005                  * might have a smaller-than-current natts, if there's been an ALTER
1006                  * TABLE ADD COLUMN since it was stored; and that would lead to a
1007                  * different conclusion about the size of the null bitmap, or even
1008                  * whether there needs to be one at all.
1009                  */
1010                 new_header_len = SizeofHeapTupleHeader;
1011                 if (has_nulls)
1012                         new_header_len += BITMAPLEN(numAttrs);
1013                 if (olddata->t_infomask & HEAP_HASOID)
1014                         new_header_len += sizeof(Oid);
1015                 new_header_len = MAXALIGN(new_header_len);
1016                 new_data_len = heap_compute_data_size(tupleDesc,
1017                                                                                           toast_values, toast_isnull);
1018                 new_tuple_len = new_header_len + new_data_len;
1019
1020                 /*
1021                  * Allocate and zero the space needed, and fill HeapTupleData fields.
1022                  */
1023                 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_tuple_len);
1024                 result_tuple->t_len = new_tuple_len;
1025                 result_tuple->t_self = newtup->t_self;
1026                 result_tuple->t_tableOid = newtup->t_tableOid;
1027                 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
1028                 result_tuple->t_data = new_data;
1029
1030                 /*
1031                  * Copy the existing tuple header, but adjust natts and t_hoff.
1032                  */
1033                 memcpy(new_data, olddata, SizeofHeapTupleHeader);
1034                 HeapTupleHeaderSetNatts(new_data, numAttrs);
1035                 new_data->t_hoff = new_header_len;
1036                 if (olddata->t_infomask & HEAP_HASOID)
1037                         HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(olddata));
1038
1039                 /* Copy over the data, and fill the null bitmap if needed */
1040                 heap_fill_tuple(tupleDesc,
1041                                                 toast_values,
1042                                                 toast_isnull,
1043                                                 (char *) new_data + new_header_len,
1044                                                 new_data_len,
1045                                                 &(new_data->t_infomask),
1046                                                 has_nulls ? new_data->t_bits : NULL);
1047         }
1048         else
1049                 result_tuple = newtup;
1050
1051         /*
1052          * Free allocated temp values
1053          */
1054         if (need_free)
1055                 for (i = 0; i < numAttrs; i++)
1056                         if (toast_free[i])
1057                                 pfree(DatumGetPointer(toast_values[i]));
1058
1059         /*
1060          * Delete external values from the old tuple
1061          */
1062         if (need_delold)
1063                 for (i = 0; i < numAttrs; i++)
1064                         if (toast_delold[i])
1065                                 toast_delete_datum(rel, toast_oldvalues[i]);
1066
1067         return result_tuple;
1068 }
1069
1070
1071 /* ----------
1072  * toast_flatten_tuple -
1073  *
1074  *      "Flatten" a tuple to contain no out-of-line toasted fields.
1075  *      (This does not eliminate compressed or short-header datums.)
1076  *
1077  *      Note: we expect the caller already checked HeapTupleHasExternal(tup),
1078  *      so there is no need for a short-circuit path.
1079  * ----------
1080  */
1081 HeapTuple
1082 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
1083 {
1084         HeapTuple       new_tuple;
1085         Form_pg_attribute *att = tupleDesc->attrs;
1086         int                     numAttrs = tupleDesc->natts;
1087         int                     i;
1088         Datum           toast_values[MaxTupleAttributeNumber];
1089         bool            toast_isnull[MaxTupleAttributeNumber];
1090         bool            toast_free[MaxTupleAttributeNumber];
1091
1092         /*
1093          * Break down the tuple into fields.
1094          */
1095         Assert(numAttrs <= MaxTupleAttributeNumber);
1096         heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
1097
1098         memset(toast_free, 0, numAttrs * sizeof(bool));
1099
1100         for (i = 0; i < numAttrs; i++)
1101         {
1102                 /*
1103                  * Look at non-null varlena attributes
1104                  */
1105                 if (!toast_isnull[i] && att[i]->attlen == -1)
1106                 {
1107                         struct varlena *new_value;
1108
1109                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1110                         if (VARATT_IS_EXTERNAL(new_value))
1111                         {
1112                                 new_value = heap_tuple_fetch_attr(new_value);
1113                                 toast_values[i] = PointerGetDatum(new_value);
1114                                 toast_free[i] = true;
1115                         }
1116                 }
1117         }
1118
1119         /*
1120          * Form the reconfigured tuple.
1121          */
1122         new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
1123
1124         /*
1125          * Be sure to copy the tuple's OID and identity fields.  We also make a
1126          * point of copying visibility info, just in case anybody looks at those
1127          * fields in a syscache entry.
1128          */
1129         if (tupleDesc->tdhasoid)
1130                 HeapTupleSetOid(new_tuple, HeapTupleGetOid(tup));
1131
1132         new_tuple->t_self = tup->t_self;
1133         new_tuple->t_tableOid = tup->t_tableOid;
1134
1135         new_tuple->t_data->t_choice = tup->t_data->t_choice;
1136         new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
1137         new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
1138         new_tuple->t_data->t_infomask |=
1139                 tup->t_data->t_infomask & HEAP_XACT_MASK;
1140         new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
1141         new_tuple->t_data->t_infomask2 |=
1142                 tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
1143
1144         /*
1145          * Free allocated temp values
1146          */
1147         for (i = 0; i < numAttrs; i++)
1148                 if (toast_free[i])
1149                         pfree(DatumGetPointer(toast_values[i]));
1150
1151         return new_tuple;
1152 }
1153
1154
1155 /* ----------
1156  * toast_flatten_tuple_to_datum -
1157  *
1158  *      "Flatten" a tuple containing out-of-line toasted fields into a Datum.
1159  *      The result is always palloc'd in the current memory context.
1160  *
1161  *      We have a general rule that Datums of container types (rows, arrays,
1162  *      ranges, etc) must not contain any external TOAST pointers.  Without
1163  *      this rule, we'd have to look inside each Datum when preparing a tuple
1164  *      for storage, which would be expensive and would fail to extend cleanly
1165  *      to new sorts of container types.
1166  *
1167  *      However, we don't want to say that tuples represented as HeapTuples
1168  *      can't contain toasted fields, so instead this routine should be called
1169  *      when such a HeapTuple is being converted into a Datum.
1170  *
1171  *      While we're at it, we decompress any compressed fields too.  This is not
1172  *      necessary for correctness, but reflects an expectation that compression
1173  *      will be more effective if applied to the whole tuple not individual
1174  *      fields.  We are not so concerned about that that we want to deconstruct
1175  *      and reconstruct tuples just to get rid of compressed fields, however.
1176  *      So callers typically won't call this unless they see that the tuple has
1177  *      at least one external field.
1178  *
1179  *      On the other hand, in-line short-header varlena fields are left alone.
1180  *      If we "untoasted" them here, they'd just get changed back to short-header
1181  *      format anyway within heap_fill_tuple.
1182  * ----------
1183  */
1184 Datum
1185 toast_flatten_tuple_to_datum(HeapTupleHeader tup,
1186                                                          uint32 tup_len,
1187                                                          TupleDesc tupleDesc)
1188 {
1189         HeapTupleHeader new_data;
1190         int32           new_header_len;
1191         int32           new_data_len;
1192         int32           new_tuple_len;
1193         HeapTupleData tmptup;
1194         Form_pg_attribute *att = tupleDesc->attrs;
1195         int                     numAttrs = tupleDesc->natts;
1196         int                     i;
1197         bool            has_nulls = false;
1198         Datum           toast_values[MaxTupleAttributeNumber];
1199         bool            toast_isnull[MaxTupleAttributeNumber];
1200         bool            toast_free[MaxTupleAttributeNumber];
1201
1202         /* Build a temporary HeapTuple control structure */
1203         tmptup.t_len = tup_len;
1204         ItemPointerSetInvalid(&(tmptup.t_self));
1205         tmptup.t_tableOid = InvalidOid;
1206         tmptup.t_data = tup;
1207
1208         /*
1209          * Break down the tuple into fields.
1210          */
1211         Assert(numAttrs <= MaxTupleAttributeNumber);
1212         heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
1213
1214         memset(toast_free, 0, numAttrs * sizeof(bool));
1215
1216         for (i = 0; i < numAttrs; i++)
1217         {
1218                 /*
1219                  * Look at non-null varlena attributes
1220                  */
1221                 if (toast_isnull[i])
1222                         has_nulls = true;
1223                 else if (att[i]->attlen == -1)
1224                 {
1225                         struct varlena *new_value;
1226
1227                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1228                         if (VARATT_IS_EXTERNAL(new_value) ||
1229                                 VARATT_IS_COMPRESSED(new_value))
1230                         {
1231                                 new_value = heap_tuple_untoast_attr(new_value);
1232                                 toast_values[i] = PointerGetDatum(new_value);
1233                                 toast_free[i] = true;
1234                         }
1235                 }
1236         }
1237
1238         /*
1239          * Calculate the new size of the tuple.
1240          *
1241          * This should match the reconstruction code in toast_insert_or_update.
1242          */
1243         new_header_len = SizeofHeapTupleHeader;
1244         if (has_nulls)
1245                 new_header_len += BITMAPLEN(numAttrs);
1246         if (tup->t_infomask & HEAP_HASOID)
1247                 new_header_len += sizeof(Oid);
1248         new_header_len = MAXALIGN(new_header_len);
1249         new_data_len = heap_compute_data_size(tupleDesc,
1250                                                                                   toast_values, toast_isnull);
1251         new_tuple_len = new_header_len + new_data_len;
1252
1253         new_data = (HeapTupleHeader) palloc0(new_tuple_len);
1254
1255         /*
1256          * Copy the existing tuple header, but adjust natts and t_hoff.
1257          */
1258         memcpy(new_data, tup, SizeofHeapTupleHeader);
1259         HeapTupleHeaderSetNatts(new_data, numAttrs);
1260         new_data->t_hoff = new_header_len;
1261         if (tup->t_infomask & HEAP_HASOID)
1262                 HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(tup));
1263
1264         /* Set the composite-Datum header fields correctly */
1265         HeapTupleHeaderSetDatumLength(new_data, new_tuple_len);
1266         HeapTupleHeaderSetTypeId(new_data, tupleDesc->tdtypeid);
1267         HeapTupleHeaderSetTypMod(new_data, tupleDesc->tdtypmod);
1268
1269         /* Copy over the data, and fill the null bitmap if needed */
1270         heap_fill_tuple(tupleDesc,
1271                                         toast_values,
1272                                         toast_isnull,
1273                                         (char *) new_data + new_header_len,
1274                                         new_data_len,
1275                                         &(new_data->t_infomask),
1276                                         has_nulls ? new_data->t_bits : NULL);
1277
1278         /*
1279          * Free allocated temp values
1280          */
1281         for (i = 0; i < numAttrs; i++)
1282                 if (toast_free[i])
1283                         pfree(DatumGetPointer(toast_values[i]));
1284
1285         return PointerGetDatum(new_data);
1286 }
1287
1288
1289 /* ----------
1290  * toast_compress_datum -
1291  *
1292  *      Create a compressed version of a varlena datum
1293  *
1294  *      If we fail (ie, compressed result is actually bigger than original)
1295  *      then return NULL.  We must not use compressed data if it'd expand
1296  *      the tuple!
1297  *
1298  *      We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1299  *      copying them.  But we can't handle external or compressed datums.
1300  * ----------
1301  */
1302 Datum
1303 toast_compress_datum(Datum value)
1304 {
1305         struct varlena *tmp;
1306         int32           valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
1307         int32           len;
1308
1309         Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
1310         Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
1311
1312         /*
1313          * No point in wasting a palloc cycle if value size is out of the allowed
1314          * range for compression
1315          */
1316         if (valsize < PGLZ_strategy_default->min_input_size ||
1317                 valsize > PGLZ_strategy_default->max_input_size)
1318                 return PointerGetDatum(NULL);
1319
1320         tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) +
1321                                                                         TOAST_COMPRESS_HDRSZ);
1322
1323         /*
1324          * We recheck the actual size even if pglz_compress() reports success,
1325          * because it might be satisfied with having saved as little as one byte
1326          * in the compressed data --- which could turn into a net loss once you
1327          * consider header and alignment padding.  Worst case, the compressed
1328          * format might require three padding bytes (plus header, which is
1329          * included in VARSIZE(tmp)), whereas the uncompressed format would take
1330          * only one header byte and no padding if the value is short enough.  So
1331          * we insist on a savings of more than 2 bytes to ensure we have a gain.
1332          */
1333         len = pglz_compress(VARDATA_ANY(DatumGetPointer(value)),
1334                                                 valsize,
1335                                                 TOAST_COMPRESS_RAWDATA(tmp),
1336                                                 PGLZ_strategy_default);
1337         if (len >= 0 &&
1338                 len + TOAST_COMPRESS_HDRSZ < valsize - 2)
1339         {
1340                 TOAST_COMPRESS_SET_RAWSIZE(tmp, valsize);
1341                 SET_VARSIZE_COMPRESSED(tmp, len + TOAST_COMPRESS_HDRSZ);
1342                 /* successful compression */
1343                 return PointerGetDatum(tmp);
1344         }
1345         else
1346         {
1347                 /* incompressible data */
1348                 pfree(tmp);
1349                 return PointerGetDatum(NULL);
1350         }
1351 }
1352
1353
1354 /* ----------
1355  * toast_get_valid_index
1356  *
1357  *      Get OID of valid index associated to given toast relation. A toast
1358  *      relation can have only one valid index at the same time.
1359  */
1360 Oid
1361 toast_get_valid_index(Oid toastoid, LOCKMODE lock)
1362 {
1363         int                     num_indexes;
1364         int                     validIndex;
1365         Oid                     validIndexOid;
1366         Relation   *toastidxs;
1367         Relation        toastrel;
1368
1369         /* Open the toast relation */
1370         toastrel = heap_open(toastoid, lock);
1371
1372         /* Look for the valid index of the toast relation */
1373         validIndex = toast_open_indexes(toastrel,
1374                                                                         lock,
1375                                                                         &toastidxs,
1376                                                                         &num_indexes);
1377         validIndexOid = RelationGetRelid(toastidxs[validIndex]);
1378
1379         /* Close the toast relation and all its indexes */
1380         toast_close_indexes(toastidxs, num_indexes, lock);
1381         heap_close(toastrel, lock);
1382
1383         return validIndexOid;
1384 }
1385
1386
1387 /* ----------
1388  * toast_save_datum -
1389  *
1390  *      Save one single datum into the secondary relation and return
1391  *      a Datum reference for it.
1392  *
1393  * rel: the main relation we're working with (not the toast rel!)
1394  * value: datum to be pushed to toast storage
1395  * oldexternal: if not NULL, toast pointer previously representing the datum
1396  * options: options to be passed to heap_insert() for toast rows
1397  * ----------
1398  */
1399 static Datum
1400 toast_save_datum(Relation rel, Datum value,
1401                                  struct varlena * oldexternal, int options)
1402 {
1403         Relation        toastrel;
1404         Relation   *toastidxs;
1405         HeapTuple       toasttup;
1406         TupleDesc       toasttupDesc;
1407         Datum           t_values[3];
1408         bool            t_isnull[3];
1409         CommandId       mycid = GetCurrentCommandId(true);
1410         struct varlena *result;
1411         struct varatt_external toast_pointer;
1412         union
1413         {
1414                 struct varlena hdr;
1415                 /* this is to make the union big enough for a chunk: */
1416                 char            data[TOAST_MAX_CHUNK_SIZE + VARHDRSZ];
1417                 /* ensure union is aligned well enough: */
1418                 int32           align_it;
1419         }                       chunk_data;
1420         int32           chunk_size;
1421         int32           chunk_seq = 0;
1422         char       *data_p;
1423         int32           data_todo;
1424         Pointer         dval = DatumGetPointer(value);
1425         int                     num_indexes;
1426         int                     validIndex;
1427
1428         Assert(!VARATT_IS_EXTERNAL(value));
1429
1430         /*
1431          * Open the toast relation and its indexes.  We can use the index to check
1432          * uniqueness of the OID we assign to the toasted item, even though it has
1433          * additional columns besides OID.
1434          */
1435         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1436         toasttupDesc = toastrel->rd_att;
1437
1438         /* Open all the toast indexes and look for the valid one */
1439         validIndex = toast_open_indexes(toastrel,
1440                                                                         RowExclusiveLock,
1441                                                                         &toastidxs,
1442                                                                         &num_indexes);
1443
1444         /*
1445          * Get the data pointer and length, and compute va_rawsize and va_extsize.
1446          *
1447          * va_rawsize is the size of the equivalent fully uncompressed datum, so
1448          * we have to adjust for short headers.
1449          *
1450          * va_extsize is the actual size of the data payload in the toast records.
1451          */
1452         if (VARATT_IS_SHORT(dval))
1453         {
1454                 data_p = VARDATA_SHORT(dval);
1455                 data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
1456                 toast_pointer.va_rawsize = data_todo + VARHDRSZ;                /* as if not short */
1457                 toast_pointer.va_extsize = data_todo;
1458         }
1459         else if (VARATT_IS_COMPRESSED(dval))
1460         {
1461                 data_p = VARDATA(dval);
1462                 data_todo = VARSIZE(dval) - VARHDRSZ;
1463                 /* rawsize in a compressed datum is just the size of the payload */
1464                 toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
1465                 toast_pointer.va_extsize = data_todo;
1466                 /* Assert that the numbers look like it's compressed */
1467                 Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1468         }
1469         else
1470         {
1471                 data_p = VARDATA(dval);
1472                 data_todo = VARSIZE(dval) - VARHDRSZ;
1473                 toast_pointer.va_rawsize = VARSIZE(dval);
1474                 toast_pointer.va_extsize = data_todo;
1475         }
1476
1477         /*
1478          * Insert the correct table OID into the result TOAST pointer.
1479          *
1480          * Normally this is the actual OID of the target toast table, but during
1481          * table-rewriting operations such as CLUSTER, we have to insert the OID
1482          * of the table's real permanent toast table instead.  rd_toastoid is set
1483          * if we have to substitute such an OID.
1484          */
1485         if (OidIsValid(rel->rd_toastoid))
1486                 toast_pointer.va_toastrelid = rel->rd_toastoid;
1487         else
1488                 toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
1489
1490         /*
1491          * Choose an OID to use as the value ID for this toast value.
1492          *
1493          * Normally we just choose an unused OID within the toast table.  But
1494          * during table-rewriting operations where we are preserving an existing
1495          * toast table OID, we want to preserve toast value OIDs too.  So, if
1496          * rd_toastoid is set and we had a prior external value from that same
1497          * toast table, re-use its value ID.  If we didn't have a prior external
1498          * value (which is a corner case, but possible if the table's attstorage
1499          * options have been changed), we have to pick a value ID that doesn't
1500          * conflict with either new or existing toast value OIDs.
1501          */
1502         if (!OidIsValid(rel->rd_toastoid))
1503         {
1504                 /* normal case: just choose an unused OID */
1505                 toast_pointer.va_valueid =
1506                         GetNewOidWithIndex(toastrel,
1507                                                            RelationGetRelid(toastidxs[validIndex]),
1508                                                            (AttrNumber) 1);
1509         }
1510         else
1511         {
1512                 /* rewrite case: check to see if value was in old toast table */
1513                 toast_pointer.va_valueid = InvalidOid;
1514                 if (oldexternal != NULL)
1515                 {
1516                         struct varatt_external old_toast_pointer;
1517
1518                         Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
1519                         /* Must copy to access aligned fields */
1520                         VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
1521                         if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
1522                         {
1523                                 /* This value came from the old toast table; reuse its OID */
1524                                 toast_pointer.va_valueid = old_toast_pointer.va_valueid;
1525
1526                                 /*
1527                                  * There is a corner case here: the table rewrite might have
1528                                  * to copy both live and recently-dead versions of a row, and
1529                                  * those versions could easily reference the same toast value.
1530                                  * When we copy the second or later version of such a row,
1531                                  * reusing the OID will mean we select an OID that's already
1532                                  * in the new toast table.  Check for that, and if so, just
1533                                  * fall through without writing the data again.
1534                                  *
1535                                  * While annoying and ugly-looking, this is a good thing
1536                                  * because it ensures that we wind up with only one copy of
1537                                  * the toast value when there is only one copy in the old
1538                                  * toast table.  Before we detected this case, we'd have made
1539                                  * multiple copies, wasting space; and what's worse, the
1540                                  * copies belonging to already-deleted heap tuples would not
1541                                  * be reclaimed by VACUUM.
1542                                  */
1543                                 if (toastrel_valueid_exists(toastrel,
1544                                                                                         toast_pointer.va_valueid))
1545                                 {
1546                                         /* Match, so short-circuit the data storage loop below */
1547                                         data_todo = 0;
1548                                 }
1549                         }
1550                 }
1551                 if (toast_pointer.va_valueid == InvalidOid)
1552                 {
1553                         /*
1554                          * new value; must choose an OID that doesn't conflict in either
1555                          * old or new toast table
1556                          */
1557                         do
1558                         {
1559                                 toast_pointer.va_valueid =
1560                                         GetNewOidWithIndex(toastrel,
1561                                                                          RelationGetRelid(toastidxs[validIndex]),
1562                                                                            (AttrNumber) 1);
1563                         } while (toastid_valueid_exists(rel->rd_toastoid,
1564                                                                                         toast_pointer.va_valueid));
1565                 }
1566         }
1567
1568         /*
1569          * Initialize constant parts of the tuple data
1570          */
1571         t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1572         t_values[2] = PointerGetDatum(&chunk_data);
1573         t_isnull[0] = false;
1574         t_isnull[1] = false;
1575         t_isnull[2] = false;
1576
1577         /*
1578          * Split up the item into chunks
1579          */
1580         while (data_todo > 0)
1581         {
1582                 int                     i;
1583
1584                 CHECK_FOR_INTERRUPTS();
1585
1586                 /*
1587                  * Calculate the size of this chunk
1588                  */
1589                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1590
1591                 /*
1592                  * Build a tuple and store it
1593                  */
1594                 t_values[1] = Int32GetDatum(chunk_seq++);
1595                 SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1596                 memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1597                 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1598
1599                 heap_insert(toastrel, toasttup, mycid, options, NULL);
1600
1601                 /*
1602                  * Create the index entry.  We cheat a little here by not using
1603                  * FormIndexDatum: this relies on the knowledge that the index columns
1604                  * are the same as the initial columns of the table for all the
1605                  * indexes.
1606                  *
1607                  * Note also that there had better not be any user-created index on
1608                  * the TOAST table, since we don't bother to update anything else.
1609                  */
1610                 for (i = 0; i < num_indexes; i++)
1611                 {
1612                         /* Only index relations marked as ready can be updated */
1613                         if (IndexIsReady(toastidxs[i]->rd_index))
1614                                 index_insert(toastidxs[i], t_values, t_isnull,
1615                                                          &(toasttup->t_self),
1616                                                          toastrel,
1617                                                          toastidxs[i]->rd_index->indisunique ?
1618                                                          UNIQUE_CHECK_YES : UNIQUE_CHECK_NO);
1619                 }
1620
1621                 /*
1622                  * Free memory
1623                  */
1624                 heap_freetuple(toasttup);
1625
1626                 /*
1627                  * Move on to next chunk
1628                  */
1629                 data_todo -= chunk_size;
1630                 data_p += chunk_size;
1631         }
1632
1633         /*
1634          * Done - close toast relation and its indexes
1635          */
1636         toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1637         heap_close(toastrel, RowExclusiveLock);
1638
1639         /*
1640          * Create the TOAST pointer value that we'll return
1641          */
1642         result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1643         SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK);
1644         memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1645
1646         return PointerGetDatum(result);
1647 }
1648
1649
1650 /* ----------
1651  * toast_delete_datum -
1652  *
1653  *      Delete a single external stored value.
1654  * ----------
1655  */
1656 static void
1657 toast_delete_datum(Relation rel, Datum value)
1658 {
1659         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1660         struct varatt_external toast_pointer;
1661         Relation        toastrel;
1662         Relation   *toastidxs;
1663         ScanKeyData toastkey;
1664         SysScanDesc toastscan;
1665         HeapTuple       toasttup;
1666         int                     num_indexes;
1667         int                     validIndex;
1668
1669         if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1670                 return;
1671
1672         /* Must copy to access aligned fields */
1673         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1674
1675         /*
1676          * Open the toast relation and its indexes
1677          */
1678         toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1679
1680         /* Fetch valid relation used for process */
1681         validIndex = toast_open_indexes(toastrel,
1682                                                                         RowExclusiveLock,
1683                                                                         &toastidxs,
1684                                                                         &num_indexes);
1685
1686         /*
1687          * Setup a scan key to find chunks with matching va_valueid
1688          */
1689         ScanKeyInit(&toastkey,
1690                                 (AttrNumber) 1,
1691                                 BTEqualStrategyNumber, F_OIDEQ,
1692                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1693
1694         /*
1695          * Find all the chunks.  (We don't actually care whether we see them in
1696          * sequence or not, but since we've already locked the index we might as
1697          * well use systable_beginscan_ordered.)
1698          */
1699         toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1700                                                                                    SnapshotToast, 1, &toastkey);
1701         while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1702         {
1703                 /*
1704                  * Have a chunk, delete it
1705                  */
1706                 simple_heap_delete(toastrel, &toasttup->t_self);
1707         }
1708
1709         /*
1710          * End scan and close relations
1711          */
1712         systable_endscan_ordered(toastscan);
1713         toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1714         heap_close(toastrel, RowExclusiveLock);
1715 }
1716
1717
1718 /* ----------
1719  * toastrel_valueid_exists -
1720  *
1721  *      Test whether a toast value with the given ID exists in the toast relation
1722  * ----------
1723  */
1724 static bool
1725 toastrel_valueid_exists(Relation toastrel, Oid valueid)
1726 {
1727         bool            result = false;
1728         ScanKeyData toastkey;
1729         SysScanDesc toastscan;
1730         int                     num_indexes;
1731         int                     validIndex;
1732         Relation   *toastidxs;
1733
1734         /* Fetch a valid index relation */
1735         validIndex = toast_open_indexes(toastrel,
1736                                                                         RowExclusiveLock,
1737                                                                         &toastidxs,
1738                                                                         &num_indexes);
1739
1740         /*
1741          * Setup a scan key to find chunks with matching va_valueid
1742          */
1743         ScanKeyInit(&toastkey,
1744                                 (AttrNumber) 1,
1745                                 BTEqualStrategyNumber, F_OIDEQ,
1746                                 ObjectIdGetDatum(valueid));
1747
1748         /*
1749          * Is there any such chunk?
1750          */
1751         toastscan = systable_beginscan(toastrel,
1752                                                                    RelationGetRelid(toastidxs[validIndex]),
1753                                                                    true, SnapshotToast, 1, &toastkey);
1754
1755         if (systable_getnext(toastscan) != NULL)
1756                 result = true;
1757
1758         systable_endscan(toastscan);
1759
1760         /* Clean up */
1761         toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1762
1763         return result;
1764 }
1765
1766 /* ----------
1767  * toastid_valueid_exists -
1768  *
1769  *      As above, but work from toast rel's OID not an open relation
1770  * ----------
1771  */
1772 static bool
1773 toastid_valueid_exists(Oid toastrelid, Oid valueid)
1774 {
1775         bool            result;
1776         Relation        toastrel;
1777
1778         toastrel = heap_open(toastrelid, AccessShareLock);
1779
1780         result = toastrel_valueid_exists(toastrel, valueid);
1781
1782         heap_close(toastrel, AccessShareLock);
1783
1784         return result;
1785 }
1786
1787
1788 /* ----------
1789  * toast_fetch_datum -
1790  *
1791  *      Reconstruct an in memory Datum from the chunks saved
1792  *      in the toast relation
1793  * ----------
1794  */
1795 static struct varlena *
1796 toast_fetch_datum(struct varlena * attr)
1797 {
1798         Relation        toastrel;
1799         Relation   *toastidxs;
1800         ScanKeyData toastkey;
1801         SysScanDesc toastscan;
1802         HeapTuple       ttup;
1803         TupleDesc       toasttupDesc;
1804         struct varlena *result;
1805         struct varatt_external toast_pointer;
1806         int32           ressize;
1807         int32           residx,
1808                                 nextidx;
1809         int32           numchunks;
1810         Pointer         chunk;
1811         bool            isnull;
1812         char       *chunkdata;
1813         int32           chunksize;
1814         int                     num_indexes;
1815         int                     validIndex;
1816
1817         if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1818                 elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");
1819
1820         /* Must copy to access aligned fields */
1821         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1822
1823         ressize = toast_pointer.va_extsize;
1824         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1825
1826         result = (struct varlena *) palloc(ressize + VARHDRSZ);
1827
1828         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1829                 SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1830         else
1831                 SET_VARSIZE(result, ressize + VARHDRSZ);
1832
1833         /*
1834          * Open the toast relation and its indexes
1835          */
1836         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1837         toasttupDesc = toastrel->rd_att;
1838
1839         /* Look for the valid index of the toast relation */
1840         validIndex = toast_open_indexes(toastrel,
1841                                                                         AccessShareLock,
1842                                                                         &toastidxs,
1843                                                                         &num_indexes);
1844
1845         /*
1846          * Setup a scan key to fetch from the index by va_valueid
1847          */
1848         ScanKeyInit(&toastkey,
1849                                 (AttrNumber) 1,
1850                                 BTEqualStrategyNumber, F_OIDEQ,
1851                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1852
1853         /*
1854          * Read the chunks by index
1855          *
1856          * Note that because the index is actually on (valueid, chunkidx) we will
1857          * see the chunks in chunkidx order, even though we didn't explicitly ask
1858          * for it.
1859          */
1860         nextidx = 0;
1861
1862         toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1863                                                                                    SnapshotToast, 1, &toastkey);
1864         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1865         {
1866                 /*
1867                  * Have a chunk, extract the sequence number and the data
1868                  */
1869                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1870                 Assert(!isnull);
1871                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1872                 Assert(!isnull);
1873                 if (!VARATT_IS_EXTENDED(chunk))
1874                 {
1875                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1876                         chunkdata = VARDATA(chunk);
1877                 }
1878                 else if (VARATT_IS_SHORT(chunk))
1879                 {
1880                         /* could happen due to heap_form_tuple doing its thing */
1881                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1882                         chunkdata = VARDATA_SHORT(chunk);
1883                 }
1884                 else
1885                 {
1886                         /* should never happen */
1887                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1888                                  toast_pointer.va_valueid,
1889                                  RelationGetRelationName(toastrel));
1890                         chunksize = 0;          /* keep compiler quiet */
1891                         chunkdata = NULL;
1892                 }
1893
1894                 /*
1895                  * Some checks on the data we've found
1896                  */
1897                 if (residx != nextidx)
1898                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1899                                  residx, nextidx,
1900                                  toast_pointer.va_valueid,
1901                                  RelationGetRelationName(toastrel));
1902                 if (residx < numchunks - 1)
1903                 {
1904                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1905                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
1906                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1907                                          residx, numchunks,
1908                                          toast_pointer.va_valueid,
1909                                          RelationGetRelationName(toastrel));
1910                 }
1911                 else if (residx == numchunks - 1)
1912                 {
1913                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1914                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
1915                                          chunksize,
1916                                          (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
1917                                          residx,
1918                                          toast_pointer.va_valueid,
1919                                          RelationGetRelationName(toastrel));
1920                 }
1921                 else
1922                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
1923                                  residx,
1924                                  0, numchunks - 1,
1925                                  toast_pointer.va_valueid,
1926                                  RelationGetRelationName(toastrel));
1927
1928                 /*
1929                  * Copy the data into proper place in our result
1930                  */
1931                 memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
1932                            chunkdata,
1933                            chunksize);
1934
1935                 nextidx++;
1936         }
1937
1938         /*
1939          * Final checks that we successfully fetched the datum
1940          */
1941         if (nextidx != numchunks)
1942                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
1943                          nextidx,
1944                          toast_pointer.va_valueid,
1945                          RelationGetRelationName(toastrel));
1946
1947         /*
1948          * End scan and close relations
1949          */
1950         systable_endscan_ordered(toastscan);
1951         toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
1952         heap_close(toastrel, AccessShareLock);
1953
1954         return result;
1955 }
1956
1957 /* ----------
1958  * toast_fetch_datum_slice -
1959  *
1960  *      Reconstruct a segment of a Datum from the chunks saved
1961  *      in the toast relation
1962  * ----------
1963  */
1964 static struct varlena *
1965 toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
1966 {
1967         Relation        toastrel;
1968         Relation   *toastidxs;
1969         ScanKeyData toastkey[3];
1970         int                     nscankeys;
1971         SysScanDesc toastscan;
1972         HeapTuple       ttup;
1973         TupleDesc       toasttupDesc;
1974         struct varlena *result;
1975         struct varatt_external toast_pointer;
1976         int32           attrsize;
1977         int32           residx;
1978         int32           nextidx;
1979         int                     numchunks;
1980         int                     startchunk;
1981         int                     endchunk;
1982         int32           startoffset;
1983         int32           endoffset;
1984         int                     totalchunks;
1985         Pointer         chunk;
1986         bool            isnull;
1987         char       *chunkdata;
1988         int32           chunksize;
1989         int32           chcpystrt;
1990         int32           chcpyend;
1991         int                     num_indexes;
1992         int                     validIndex;
1993
1994         if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1995                 elog(ERROR, "toast_fetch_datum_slice shouldn't be called for non-ondisk datums");
1996
1997         /* Must copy to access aligned fields */
1998         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1999
2000         /*
2001          * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
2002          * we can't return a compressed datum which is meaningful to toast later
2003          */
2004         Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
2005
2006         attrsize = toast_pointer.va_extsize;
2007         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
2008
2009         if (sliceoffset >= attrsize)
2010         {
2011                 sliceoffset = 0;
2012                 length = 0;
2013         }
2014
2015         if (((sliceoffset + length) > attrsize) || length < 0)
2016                 length = attrsize - sliceoffset;
2017
2018         result = (struct varlena *) palloc(length + VARHDRSZ);
2019
2020         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2021                 SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
2022         else
2023                 SET_VARSIZE(result, length + VARHDRSZ);
2024
2025         if (length == 0)
2026                 return result;                  /* Can save a lot of work at this point! */
2027
2028         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
2029         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
2030         numchunks = (endchunk - startchunk) + 1;
2031
2032         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
2033         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
2034
2035         /*
2036          * Open the toast relation and its indexes
2037          */
2038         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
2039         toasttupDesc = toastrel->rd_att;
2040
2041         /* Look for the valid index of toast relation */
2042         validIndex = toast_open_indexes(toastrel,
2043                                                                         AccessShareLock,
2044                                                                         &toastidxs,
2045                                                                         &num_indexes);
2046
2047         /*
2048          * Setup a scan key to fetch from the index. This is either two keys or
2049          * three depending on the number of chunks.
2050          */
2051         ScanKeyInit(&toastkey[0],
2052                                 (AttrNumber) 1,
2053                                 BTEqualStrategyNumber, F_OIDEQ,
2054                                 ObjectIdGetDatum(toast_pointer.va_valueid));
2055
2056         /*
2057          * Use equality condition for one chunk, a range condition otherwise:
2058          */
2059         if (numchunks == 1)
2060         {
2061                 ScanKeyInit(&toastkey[1],
2062                                         (AttrNumber) 2,
2063                                         BTEqualStrategyNumber, F_INT4EQ,
2064                                         Int32GetDatum(startchunk));
2065                 nscankeys = 2;
2066         }
2067         else
2068         {
2069                 ScanKeyInit(&toastkey[1],
2070                                         (AttrNumber) 2,
2071                                         BTGreaterEqualStrategyNumber, F_INT4GE,
2072                                         Int32GetDatum(startchunk));
2073                 ScanKeyInit(&toastkey[2],
2074                                         (AttrNumber) 2,
2075                                         BTLessEqualStrategyNumber, F_INT4LE,
2076                                         Int32GetDatum(endchunk));
2077                 nscankeys = 3;
2078         }
2079
2080         /*
2081          * Read the chunks by index
2082          *
2083          * The index is on (valueid, chunkidx) so they will come in order
2084          */
2085         nextidx = startchunk;
2086         toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
2087                                                                                  SnapshotToast, nscankeys, toastkey);
2088         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
2089         {
2090                 /*
2091                  * Have a chunk, extract the sequence number and the data
2092                  */
2093                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
2094                 Assert(!isnull);
2095                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
2096                 Assert(!isnull);
2097                 if (!VARATT_IS_EXTENDED(chunk))
2098                 {
2099                         chunksize = VARSIZE(chunk) - VARHDRSZ;
2100                         chunkdata = VARDATA(chunk);
2101                 }
2102                 else if (VARATT_IS_SHORT(chunk))
2103                 {
2104                         /* could happen due to heap_form_tuple doing its thing */
2105                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2106                         chunkdata = VARDATA_SHORT(chunk);
2107                 }
2108                 else
2109                 {
2110                         /* should never happen */
2111                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
2112                                  toast_pointer.va_valueid,
2113                                  RelationGetRelationName(toastrel));
2114                         chunksize = 0;          /* keep compiler quiet */
2115                         chunkdata = NULL;
2116                 }
2117
2118                 /*
2119                  * Some checks on the data we've found
2120                  */
2121                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
2122                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
2123                                  residx, nextidx,
2124                                  toast_pointer.va_valueid,
2125                                  RelationGetRelationName(toastrel));
2126                 if (residx < totalchunks - 1)
2127                 {
2128                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
2129                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
2130                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
2131                                          residx, totalchunks,
2132                                          toast_pointer.va_valueid,
2133                                          RelationGetRelationName(toastrel));
2134                 }
2135                 else if (residx == totalchunks - 1)
2136                 {
2137                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
2138                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
2139                                          chunksize,
2140                                          (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
2141                                          residx,
2142                                          toast_pointer.va_valueid,
2143                                          RelationGetRelationName(toastrel));
2144                 }
2145                 else
2146                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
2147                                  residx,
2148                                  0, totalchunks - 1,
2149                                  toast_pointer.va_valueid,
2150                                  RelationGetRelationName(toastrel));
2151
2152                 /*
2153                  * Copy the data into proper place in our result
2154                  */
2155                 chcpystrt = 0;
2156                 chcpyend = chunksize - 1;
2157                 if (residx == startchunk)
2158                         chcpystrt = startoffset;
2159                 if (residx == endchunk)
2160                         chcpyend = endoffset;
2161
2162                 memcpy(VARDATA(result) +
2163                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
2164                            chunkdata + chcpystrt,
2165                            (chcpyend - chcpystrt) + 1);
2166
2167                 nextidx++;
2168         }
2169
2170         /*
2171          * Final checks that we successfully fetched the datum
2172          */
2173         if (nextidx != (endchunk + 1))
2174                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
2175                          nextidx,
2176                          toast_pointer.va_valueid,
2177                          RelationGetRelationName(toastrel));
2178
2179         /*
2180          * End scan and close relations
2181          */
2182         systable_endscan_ordered(toastscan);
2183         toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
2184         heap_close(toastrel, AccessShareLock);
2185
2186         return result;
2187 }
2188
2189 /* ----------
2190  * toast_decompress_datum -
2191  *
2192  * Decompress a compressed version of a varlena datum
2193  */
2194 static struct varlena *
2195 toast_decompress_datum(struct varlena * attr)
2196 {
2197         struct varlena *result;
2198
2199         Assert(VARATT_IS_COMPRESSED(attr));
2200
2201         result = (struct varlena *)
2202                 palloc(TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2203         SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2204
2205         if (pglz_decompress(TOAST_COMPRESS_RAWDATA(attr),
2206                                                 VARSIZE(attr) - TOAST_COMPRESS_HDRSZ,
2207                                                 VARDATA(result),
2208                                                 TOAST_COMPRESS_RAWSIZE(attr)) < 0)
2209                 elog(ERROR, "compressed data is corrupted");
2210
2211         return result;
2212 }
2213
2214
2215 /* ----------
2216  * toast_open_indexes
2217  *
2218  *      Get an array of the indexes associated to the given toast relation
2219  *      and return as well the position of the valid index used by the toast
2220  *      relation in this array. It is the responsibility of the caller of this
2221  *      function to close the indexes as well as free them.
2222  */
2223 static int
2224 toast_open_indexes(Relation toastrel,
2225                                    LOCKMODE lock,
2226                                    Relation **toastidxs,
2227                                    int *num_indexes)
2228 {
2229         int                     i = 0;
2230         int                     res = 0;
2231         bool            found = false;
2232         List       *indexlist;
2233         ListCell   *lc;
2234
2235         /* Get index list of the toast relation */
2236         indexlist = RelationGetIndexList(toastrel);
2237         Assert(indexlist != NIL);
2238
2239         *num_indexes = list_length(indexlist);
2240
2241         /* Open all the index relations */
2242         *toastidxs = (Relation *) palloc(*num_indexes * sizeof(Relation));
2243         foreach(lc, indexlist)
2244                 (*toastidxs)[i++] = index_open(lfirst_oid(lc), lock);
2245
2246         /* Fetch the first valid index in list */
2247         for (i = 0; i < *num_indexes; i++)
2248         {
2249                 Relation        toastidx = (*toastidxs)[i];
2250
2251                 if (toastidx->rd_index->indisvalid)
2252                 {
2253                         res = i;
2254                         found = true;
2255                         break;
2256                 }
2257         }
2258
2259         /*
2260          * Free index list, not necessary anymore as relations are opened and a
2261          * valid index has been found.
2262          */
2263         list_free(indexlist);
2264
2265         /*
2266          * The toast relation should have one valid index, so something is going
2267          * wrong if there is nothing.
2268          */
2269         if (!found)
2270                 elog(ERROR, "no valid index found for toast relation with Oid %u",
2271                          RelationGetRelid(toastrel));
2272
2273         return res;
2274 }
2275
2276 /* ----------
2277  * toast_close_indexes
2278  *
2279  *      Close an array of indexes for a toast relation and free it. This should
2280  *      be called for a set of indexes opened previously with toast_open_indexes.
2281  */
2282 static void
2283 toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock)
2284 {
2285         int                     i;
2286
2287         /* Close relations and clean up things */
2288         for (i = 0; i < num_indexes; i++)
2289                 index_close(toastidxs[i], lock);
2290         pfree(toastidxs);
2291 }