]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Allow index AMs to cache data across aminsert calls within a SQL command.
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2017, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/heap/tuptoaster.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *              toast_insert_or_update -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              toast_delete -
20  *                      Reclaim toast storage when a tuple is deleted
21  *
22  *              heap_tuple_untoast_attr -
23  *                      Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27
28 #include "postgres.h"
29
30 #include <unistd.h>
31 #include <fcntl.h>
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "common/pg_lzcompress.h"
39 #include "miscadmin.h"
40 #include "utils/expandeddatum.h"
41 #include "utils/fmgroids.h"
42 #include "utils/rel.h"
43 #include "utils/snapmgr.h"
44 #include "utils/typcache.h"
45 #include "utils/tqual.h"
46
47
48 #undef TOAST_DEBUG
49
50 /*
51  *      The information at the start of the compressed toast data.
52  */
53 typedef struct toast_compress_header
54 {
55         int32           vl_len_;                /* varlena header (do not touch directly!) */
56         int32           rawsize;
57 } toast_compress_header;
58
/*
 * Accessors for the header of a compressed toast entry.  In every macro,
 * "ptr" is a pointer to the start of the datum, i.e. to its
 * toast_compress_header.
 */

/* Number of bytes occupied by the header, as an int32. */
#define TOAST_COMPRESS_HDRSZ \
	((int32) sizeof(toast_compress_header))

/* Read the rawsize field stored in the header. */
#define TOAST_COMPRESS_RAWSIZE(ptr) \
	(((toast_compress_header *) (ptr))->rawsize)

/* Address of the first byte following the header. */
#define TOAST_COMPRESS_RAWDATA(ptr) \
	(((char *) (ptr)) + TOAST_COMPRESS_HDRSZ)

/* Store "len" into the rawsize field of the header. */
#define TOAST_COMPRESS_SET_RAWSIZE(ptr, len) \
	(((toast_compress_header *) (ptr))->rawsize = (len))
69
70 static void toast_delete_datum(Relation rel, Datum value, bool is_speculative);
71 static Datum toast_save_datum(Relation rel, Datum value,
72                                  struct varlena * oldexternal, int options);
73 static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
74 static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
75 static struct varlena *toast_fetch_datum(struct varlena * attr);
76 static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
77                                                 int32 sliceoffset, int32 length);
78 static struct varlena *toast_decompress_datum(struct varlena * attr);
79 static int toast_open_indexes(Relation toastrel,
80                                    LOCKMODE lock,
81                                    Relation **toastidxs,
82                                    int *num_indexes);
83 static void toast_close_indexes(Relation *toastidxs, int num_indexes,
84                                         LOCKMODE lock);
85 static void init_toast_snapshot(Snapshot toast_snapshot);
86
87
88 /* ----------
89  * heap_tuple_fetch_attr -
90  *
91  *      Public entry point to get back a toasted value from
92  *      external source (possibly still in compressed format).
93  *
94  * This will return a datum that contains all the data internally, ie, not
95  * relying on external storage or memory, but it can still be compressed or
96  * have a short header.  Note some callers assume that if the input is an
97  * EXTERNAL datum, the result will be a pfree'able chunk.
98  * ----------
99  */
100 struct varlena *
101 heap_tuple_fetch_attr(struct varlena * attr)
102 {
103         struct varlena *result;
104
105         if (VARATT_IS_EXTERNAL_ONDISK(attr))
106         {
107                 /*
108                  * This is an external stored plain value
109                  */
110                 result = toast_fetch_datum(attr);
111         }
112         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
113         {
114                 /*
115                  * This is an indirect pointer --- dereference it
116                  */
117                 struct varatt_indirect redirect;
118
119                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
120                 attr = (struct varlena *) redirect.pointer;
121
122                 /* nested indirect Datums aren't allowed */
123                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
124
125                 /* recurse if value is still external in some other way */
126                 if (VARATT_IS_EXTERNAL(attr))
127                         return heap_tuple_fetch_attr(attr);
128
129                 /*
130                  * Copy into the caller's memory context, in case caller tries to
131                  * pfree the result.
132                  */
133                 result = (struct varlena *) palloc(VARSIZE_ANY(attr));
134                 memcpy(result, attr, VARSIZE_ANY(attr));
135         }
136         else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
137         {
138                 /*
139                  * This is an expanded-object pointer --- get flat format
140                  */
141                 ExpandedObjectHeader *eoh;
142                 Size            resultsize;
143
144                 eoh = DatumGetEOHP(PointerGetDatum(attr));
145                 resultsize = EOH_get_flat_size(eoh);
146                 result = (struct varlena *) palloc(resultsize);
147                 EOH_flatten_into(eoh, (void *) result, resultsize);
148         }
149         else
150         {
151                 /*
152                  * This is a plain value inside of the main tuple - why am I called?
153                  */
154                 result = attr;
155         }
156
157         return result;
158 }
159
160
161 /* ----------
162  * heap_tuple_untoast_attr -
163  *
164  *      Public entry point to get back a toasted value from compression
165  *      or external storage.  The result is always non-extended varlena form.
166  *
167  * Note some callers assume that if the input is an EXTERNAL or COMPRESSED
168  * datum, the result will be a pfree'able chunk.
169  * ----------
170  */
171 struct varlena *
172 heap_tuple_untoast_attr(struct varlena * attr)
173 {
174         if (VARATT_IS_EXTERNAL_ONDISK(attr))
175         {
176                 /*
177                  * This is an externally stored datum --- fetch it back from there
178                  */
179                 attr = toast_fetch_datum(attr);
180                 /* If it's compressed, decompress it */
181                 if (VARATT_IS_COMPRESSED(attr))
182                 {
183                         struct varlena *tmp = attr;
184
185                         attr = toast_decompress_datum(tmp);
186                         pfree(tmp);
187                 }
188         }
189         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
190         {
191                 /*
192                  * This is an indirect pointer --- dereference it
193                  */
194                 struct varatt_indirect redirect;
195
196                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
197                 attr = (struct varlena *) redirect.pointer;
198
199                 /* nested indirect Datums aren't allowed */
200                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
201
202                 /* recurse in case value is still extended in some other way */
203                 attr = heap_tuple_untoast_attr(attr);
204
205                 /* if it isn't, we'd better copy it */
206                 if (attr == (struct varlena *) redirect.pointer)
207                 {
208                         struct varlena *result;
209
210                         result = (struct varlena *) palloc(VARSIZE_ANY(attr));
211                         memcpy(result, attr, VARSIZE_ANY(attr));
212                         attr = result;
213                 }
214         }
215         else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
216         {
217                 /*
218                  * This is an expanded-object pointer --- get flat format
219                  */
220                 attr = heap_tuple_fetch_attr(attr);
221                 /* flatteners are not allowed to produce compressed/short output */
222                 Assert(!VARATT_IS_EXTENDED(attr));
223         }
224         else if (VARATT_IS_COMPRESSED(attr))
225         {
226                 /*
227                  * This is a compressed value inside of the main tuple
228                  */
229                 attr = toast_decompress_datum(attr);
230         }
231         else if (VARATT_IS_SHORT(attr))
232         {
233                 /*
234                  * This is a short-header varlena --- convert to 4-byte header format
235                  */
236                 Size            data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
237                 Size            new_size = data_size + VARHDRSZ;
238                 struct varlena *new_attr;
239
240                 new_attr = (struct varlena *) palloc(new_size);
241                 SET_VARSIZE(new_attr, new_size);
242                 memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
243                 attr = new_attr;
244         }
245
246         return attr;
247 }
248
249
250 /* ----------
251  * heap_tuple_untoast_attr_slice -
252  *
253  *              Public entry point to get back part of a toasted value
254  *              from compression or external storage.
255  * ----------
256  */
257 struct varlena *
258 heap_tuple_untoast_attr_slice(struct varlena * attr,
259                                                           int32 sliceoffset, int32 slicelength)
260 {
261         struct varlena *preslice;
262         struct varlena *result;
263         char       *attrdata;
264         int32           attrsize;
265
266         if (VARATT_IS_EXTERNAL_ONDISK(attr))
267         {
268                 struct varatt_external toast_pointer;
269
270                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
271
272                 /* fast path for non-compressed external datums */
273                 if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
274                         return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
275
276                 /* fetch it back (compressed marker will get set automatically) */
277                 preslice = toast_fetch_datum(attr);
278         }
279         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
280         {
281                 struct varatt_indirect redirect;
282
283                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
284
285                 /* nested indirect Datums aren't allowed */
286                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(redirect.pointer));
287
288                 return heap_tuple_untoast_attr_slice(redirect.pointer,
289                                                                                          sliceoffset, slicelength);
290         }
291         else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
292         {
293                 /* pass it off to heap_tuple_fetch_attr to flatten */
294                 preslice = heap_tuple_fetch_attr(attr);
295         }
296         else
297                 preslice = attr;
298
299         Assert(!VARATT_IS_EXTERNAL(preslice));
300
301         if (VARATT_IS_COMPRESSED(preslice))
302         {
303                 struct varlena *tmp = preslice;
304
305                 preslice = toast_decompress_datum(tmp);
306
307                 if (tmp != attr)
308                         pfree(tmp);
309         }
310
311         if (VARATT_IS_SHORT(preslice))
312         {
313                 attrdata = VARDATA_SHORT(preslice);
314                 attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
315         }
316         else
317         {
318                 attrdata = VARDATA(preslice);
319                 attrsize = VARSIZE(preslice) - VARHDRSZ;
320         }
321
322         /* slicing of datum for compressed cases and plain value */
323
324         if (sliceoffset >= attrsize)
325         {
326                 sliceoffset = 0;
327                 slicelength = 0;
328         }
329
330         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
331                 slicelength = attrsize - sliceoffset;
332
333         result = (struct varlena *) palloc(slicelength + VARHDRSZ);
334         SET_VARSIZE(result, slicelength + VARHDRSZ);
335
336         memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
337
338         if (preslice != attr)
339                 pfree(preslice);
340
341         return result;
342 }
343
344
345 /* ----------
346  * toast_raw_datum_size -
347  *
348  *      Return the raw (detoasted) size of a varlena datum
349  *      (including the VARHDRSZ header)
350  * ----------
351  */
352 Size
353 toast_raw_datum_size(Datum value)
354 {
355         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
356         Size            result;
357
358         if (VARATT_IS_EXTERNAL_ONDISK(attr))
359         {
360                 /* va_rawsize is the size of the original datum -- including header */
361                 struct varatt_external toast_pointer;
362
363                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
364                 result = toast_pointer.va_rawsize;
365         }
366         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
367         {
368                 struct varatt_indirect toast_pointer;
369
370                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
371
372                 /* nested indirect Datums aren't allowed */
373                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer));
374
375                 return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer));
376         }
377         else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
378         {
379                 result = EOH_get_flat_size(DatumGetEOHP(value));
380         }
381         else if (VARATT_IS_COMPRESSED(attr))
382         {
383                 /* here, va_rawsize is just the payload size */
384                 result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
385         }
386         else if (VARATT_IS_SHORT(attr))
387         {
388                 /*
389                  * we have to normalize the header length to VARHDRSZ or else the
390                  * callers of this function will be confused.
391                  */
392                 result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
393         }
394         else
395         {
396                 /* plain untoasted datum */
397                 result = VARSIZE(attr);
398         }
399         return result;
400 }
401
402 /* ----------
403  * toast_datum_size
404  *
405  *      Return the physical storage size (possibly compressed) of a varlena datum
406  * ----------
407  */
408 Size
409 toast_datum_size(Datum value)
410 {
411         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
412         Size            result;
413
414         if (VARATT_IS_EXTERNAL_ONDISK(attr))
415         {
416                 /*
417                  * Attribute is stored externally - return the extsize whether
418                  * compressed or not.  We do not count the size of the toast pointer
419                  * ... should we?
420                  */
421                 struct varatt_external toast_pointer;
422
423                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
424                 result = toast_pointer.va_extsize;
425         }
426         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
427         {
428                 struct varatt_indirect toast_pointer;
429
430                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
431
432                 /* nested indirect Datums aren't allowed */
433                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
434
435                 return toast_datum_size(PointerGetDatum(toast_pointer.pointer));
436         }
437         else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
438         {
439                 result = EOH_get_flat_size(DatumGetEOHP(value));
440         }
441         else if (VARATT_IS_SHORT(attr))
442         {
443                 result = VARSIZE_SHORT(attr);
444         }
445         else
446         {
447                 /*
448                  * Attribute is stored inline either compressed or not, just calculate
449                  * the size of the datum in either case.
450                  */
451                 result = VARSIZE(attr);
452         }
453         return result;
454 }
455
456
457 /* ----------
458  * toast_delete -
459  *
460  *      Cascaded delete toast-entries on DELETE
461  * ----------
462  */
463 void
464 toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative)
465 {
466         TupleDesc       tupleDesc;
467         Form_pg_attribute *att;
468         int                     numAttrs;
469         int                     i;
470         Datum           toast_values[MaxHeapAttributeNumber];
471         bool            toast_isnull[MaxHeapAttributeNumber];
472
473         /*
474          * We should only ever be called for tuples of plain relations or
475          * materialized views --- recursing on a toast rel is bad news.
476          */
477         Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
478                    rel->rd_rel->relkind == RELKIND_MATVIEW);
479
480         /*
481          * Get the tuple descriptor and break down the tuple into fields.
482          *
483          * NOTE: it's debatable whether to use heap_deform_tuple() here or just
484          * heap_getattr() only the varlena columns.  The latter could win if there
485          * are few varlena columns and many non-varlena ones. However,
486          * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
487          * O(N^2) if there are many varlena columns, so it seems better to err on
488          * the side of linear cost.  (We won't even be here unless there's at
489          * least one varlena column, by the way.)
490          */
491         tupleDesc = rel->rd_att;
492         att = tupleDesc->attrs;
493         numAttrs = tupleDesc->natts;
494
495         Assert(numAttrs <= MaxHeapAttributeNumber);
496         heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
497
498         /*
499          * Check for external stored attributes and delete them from the secondary
500          * relation.
501          */
502         for (i = 0; i < numAttrs; i++)
503         {
504                 if (att[i]->attlen == -1)
505                 {
506                         Datum           value = toast_values[i];
507
508                         if (toast_isnull[i])
509                                 continue;
510                         else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
511                                 toast_delete_datum(rel, value, is_speculative);
512                 }
513         }
514 }
515
516
517 /* ----------
518  * toast_insert_or_update -
519  *
520  *      Delete no-longer-used toast-entries and create new ones to
521  *      make the new tuple fit on INSERT or UPDATE
522  *
523  * Inputs:
524  *      newtup: the candidate new tuple to be inserted
525  *      oldtup: the old row version for UPDATE, or NULL for INSERT
526  *      options: options to be passed to heap_insert() for toast rows
527  * Result:
528  *      either newtup if no toasting is needed, or a palloc'd modified tuple
529  *      that is what should actually get stored
530  *
531  * NOTE: neither newtup nor oldtup will be modified.  This is a change
532  * from the pre-8.1 API of this routine.
533  * ----------
534  */
535 HeapTuple
536 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
537                                            int options)
538 {
539         HeapTuple       result_tuple;
540         TupleDesc       tupleDesc;
541         Form_pg_attribute *att;
542         int                     numAttrs;
543         int                     i;
544
545         bool            need_change = false;
546         bool            need_free = false;
547         bool            need_delold = false;
548         bool            has_nulls = false;
549
550         Size            maxDataLen;
551         Size            hoff;
552
553         char            toast_action[MaxHeapAttributeNumber];
554         bool            toast_isnull[MaxHeapAttributeNumber];
555         bool            toast_oldisnull[MaxHeapAttributeNumber];
556         Datum           toast_values[MaxHeapAttributeNumber];
557         Datum           toast_oldvalues[MaxHeapAttributeNumber];
558         struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
559         int32           toast_sizes[MaxHeapAttributeNumber];
560         bool            toast_free[MaxHeapAttributeNumber];
561         bool            toast_delold[MaxHeapAttributeNumber];
562
563         /*
564          * Ignore the INSERT_SPECULATIVE option. Speculative insertions/super
565          * deletions just normally insert/delete the toast values. It seems
566          * easiest to deal with that here, instead on, potentially, multiple
567          * callers.
568          */
569         options &= ~HEAP_INSERT_SPECULATIVE;
570
571         /*
572          * We should only ever be called for tuples of plain relations or
573          * materialized views --- recursing on a toast rel is bad news.
574          */
575         Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
576                    rel->rd_rel->relkind == RELKIND_MATVIEW);
577
578         /*
579          * Get the tuple descriptor and break down the tuple(s) into fields.
580          */
581         tupleDesc = rel->rd_att;
582         att = tupleDesc->attrs;
583         numAttrs = tupleDesc->natts;
584
585         Assert(numAttrs <= MaxHeapAttributeNumber);
586         heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
587         if (oldtup != NULL)
588                 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
589
590         /* ----------
591          * Then collect information about the values given
592          *
593          * NOTE: toast_action[i] can have these values:
594          *              ' '             default handling
595          *              'p'             already processed --- don't touch it
596          *              'x'             incompressible, but OK to move off
597          *
598          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
599          *              toast_action[i] different from 'p'.
600          * ----------
601          */
602         memset(toast_action, ' ', numAttrs * sizeof(char));
603         memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
604         memset(toast_free, 0, numAttrs * sizeof(bool));
605         memset(toast_delold, 0, numAttrs * sizeof(bool));
606
607         for (i = 0; i < numAttrs; i++)
608         {
609                 struct varlena *old_value;
610                 struct varlena *new_value;
611
612                 if (oldtup != NULL)
613                 {
614                         /*
615                          * For UPDATE get the old and new values of this attribute
616                          */
617                         old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
618                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
619
620                         /*
621                          * If the old value is stored on disk, check if it has changed so
622                          * we have to delete it later.
623                          */
624                         if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
625                                 VARATT_IS_EXTERNAL_ONDISK(old_value))
626                         {
627                                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
628                                         memcmp((char *) old_value, (char *) new_value,
629                                                    VARSIZE_EXTERNAL(old_value)) != 0)
630                                 {
631                                         /*
632                                          * The old external stored value isn't needed any more
633                                          * after the update
634                                          */
635                                         toast_delold[i] = true;
636                                         need_delold = true;
637                                 }
638                                 else
639                                 {
640                                         /*
641                                          * This attribute isn't changed by this update so we reuse
642                                          * the original reference to the old value in the new
643                                          * tuple.
644                                          */
645                                         toast_action[i] = 'p';
646                                         continue;
647                                 }
648                         }
649                 }
650                 else
651                 {
652                         /*
653                          * For INSERT simply get the new value
654                          */
655                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
656                 }
657
658                 /*
659                  * Handle NULL attributes
660                  */
661                 if (toast_isnull[i])
662                 {
663                         toast_action[i] = 'p';
664                         has_nulls = true;
665                         continue;
666                 }
667
668                 /*
669                  * Now look at varlena attributes
670                  */
671                 if (att[i]->attlen == -1)
672                 {
673                         /*
674                          * If the table's attribute says PLAIN always, force it so.
675                          */
676                         if (att[i]->attstorage == 'p')
677                                 toast_action[i] = 'p';
678
679                         /*
680                          * We took care of UPDATE above, so any external value we find
681                          * still in the tuple must be someone else's that we cannot reuse
682                          * (this includes the case of an out-of-line in-memory datum).
683                          * Fetch it back (without decompression, unless we are forcing
684                          * PLAIN storage).  If necessary, we'll push it out as a new
685                          * external value below.
686                          */
687                         if (VARATT_IS_EXTERNAL(new_value))
688                         {
689                                 toast_oldexternal[i] = new_value;
690                                 if (att[i]->attstorage == 'p')
691                                         new_value = heap_tuple_untoast_attr(new_value);
692                                 else
693                                         new_value = heap_tuple_fetch_attr(new_value);
694                                 toast_values[i] = PointerGetDatum(new_value);
695                                 toast_free[i] = true;
696                                 need_change = true;
697                                 need_free = true;
698                         }
699
700                         /*
701                          * Remember the size of this attribute
702                          */
703                         toast_sizes[i] = VARSIZE_ANY(new_value);
704                 }
705                 else
706                 {
707                         /*
708                          * Not a varlena attribute, plain storage always
709                          */
710                         toast_action[i] = 'p';
711                 }
712         }
713
714         /* ----------
715          * Compress and/or save external until data fits into target length
716          *
717          *      1: Inline compress attributes with attstorage 'x', and store very
718          *         large attributes with attstorage 'x' or 'e' external immediately
719          *      2: Store attributes with attstorage 'x' or 'e' external
720          *      3: Inline compress attributes with attstorage 'm'
721          *      4: Store attributes with attstorage 'm' external
722          * ----------
723          */
724
725         /* compute header overhead --- this should match heap_form_tuple() */
726         hoff = SizeofHeapTupleHeader;
727         if (has_nulls)
728                 hoff += BITMAPLEN(numAttrs);
729         if (newtup->t_data->t_infomask & HEAP_HASOID)
730                 hoff += sizeof(Oid);
731         hoff = MAXALIGN(hoff);
732         /* now convert to a limit on the tuple data size */
733         maxDataLen = TOAST_TUPLE_TARGET - hoff;
734
735         /*
736          * Look for attributes with attstorage 'x' to compress.  Also find large
737          * attributes with attstorage 'x' or 'e', and store them external.
738          */
739         while (heap_compute_data_size(tupleDesc,
740                                                                   toast_values, toast_isnull) > maxDataLen)
741         {
742                 int                     biggest_attno = -1;
743                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
744                 Datum           old_value;
745                 Datum           new_value;
746
747                 /*
748                  * Search for the biggest yet unprocessed internal attribute
749                  */
750                 for (i = 0; i < numAttrs; i++)
751                 {
752                         if (toast_action[i] != ' ')
753                                 continue;
754                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
755                                 continue;               /* can't happen, toast_action would be 'p' */
756                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
757                                 continue;
758                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
759                                 continue;
760                         if (toast_sizes[i] > biggest_size)
761                         {
762                                 biggest_attno = i;
763                                 biggest_size = toast_sizes[i];
764                         }
765                 }
766
767                 if (biggest_attno < 0)
768                         break;
769
770                 /*
771                  * Attempt to compress it inline, if it has attstorage 'x'
772                  */
773                 i = biggest_attno;
774                 if (att[i]->attstorage == 'x')
775                 {
776                         old_value = toast_values[i];
777                         new_value = toast_compress_datum(old_value);
778
779                         if (DatumGetPointer(new_value) != NULL)
780                         {
781                                 /* successful compression */
782                                 if (toast_free[i])
783                                         pfree(DatumGetPointer(old_value));
784                                 toast_values[i] = new_value;
785                                 toast_free[i] = true;
786                                 toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
787                                 need_change = true;
788                                 need_free = true;
789                         }
790                         else
791                         {
792                                 /* incompressible, ignore on subsequent compression passes */
793                                 toast_action[i] = 'x';
794                         }
795                 }
796                 else
797                 {
798                         /* has attstorage 'e', ignore on subsequent compression passes */
799                         toast_action[i] = 'x';
800                 }
801
802                 /*
803                  * If this value is by itself more than maxDataLen (after compression
804                  * if any), push it out to the toast table immediately, if possible.
805                  * This avoids uselessly compressing other fields in the common case
806                  * where we have one long field and several short ones.
807                  *
808                  * XXX maybe the threshold should be less than maxDataLen?
809                  */
810                 if (toast_sizes[i] > maxDataLen &&
811                         rel->rd_rel->reltoastrelid != InvalidOid)
812                 {
813                         old_value = toast_values[i];
814                         toast_action[i] = 'p';
815                         toast_values[i] = toast_save_datum(rel, toast_values[i],
816                                                                                            toast_oldexternal[i], options);
817                         if (toast_free[i])
818                                 pfree(DatumGetPointer(old_value));
819                         toast_free[i] = true;
820                         need_change = true;
821                         need_free = true;
822                 }
823         }
824
825         /*
826          * Second we look for attributes of attstorage 'x' or 'e' that are still
827          * inline.  But skip this if there's no toast table to push them to.
828          */
829         while (heap_compute_data_size(tupleDesc,
830                                                                   toast_values, toast_isnull) > maxDataLen &&
831                    rel->rd_rel->reltoastrelid != InvalidOid)
832         {
833                 int                     biggest_attno = -1;
834                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
835                 Datum           old_value;
836
837                 /*------
838                  * Search for the biggest yet inlined attribute with
839                  * attstorage equals 'x' or 'e'
840                  *------
841                  */
842                 for (i = 0; i < numAttrs; i++)
843                 {
844                         if (toast_action[i] == 'p')
845                                 continue;
846                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
847                                 continue;               /* can't happen, toast_action would be 'p' */
848                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
849                                 continue;
850                         if (toast_sizes[i] > biggest_size)
851                         {
852                                 biggest_attno = i;
853                                 biggest_size = toast_sizes[i];
854                         }
855                 }
856
857                 if (biggest_attno < 0)
858                         break;
859
860                 /*
861                  * Store this external
862                  */
863                 i = biggest_attno;
864                 old_value = toast_values[i];
865                 toast_action[i] = 'p';
866                 toast_values[i] = toast_save_datum(rel, toast_values[i],
867                                                                                    toast_oldexternal[i], options);
868                 if (toast_free[i])
869                         pfree(DatumGetPointer(old_value));
870                 toast_free[i] = true;
871
872                 need_change = true;
873                 need_free = true;
874         }
875
876         /*
877          * Round 3 - this time we take attributes with storage 'm' into
878          * compression
879          */
880         while (heap_compute_data_size(tupleDesc,
881                                                                   toast_values, toast_isnull) > maxDataLen)
882         {
883                 int                     biggest_attno = -1;
884                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
885                 Datum           old_value;
886                 Datum           new_value;
887
888                 /*
889                  * Search for the biggest yet uncompressed internal attribute
890                  */
891                 for (i = 0; i < numAttrs; i++)
892                 {
893                         if (toast_action[i] != ' ')
894                                 continue;
895                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
896                                 continue;               /* can't happen, toast_action would be 'p' */
897                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
898                                 continue;
899                         if (att[i]->attstorage != 'm')
900                                 continue;
901                         if (toast_sizes[i] > biggest_size)
902                         {
903                                 biggest_attno = i;
904                                 biggest_size = toast_sizes[i];
905                         }
906                 }
907
908                 if (biggest_attno < 0)
909                         break;
910
911                 /*
912                  * Attempt to compress it inline
913                  */
914                 i = biggest_attno;
915                 old_value = toast_values[i];
916                 new_value = toast_compress_datum(old_value);
917
918                 if (DatumGetPointer(new_value) != NULL)
919                 {
920                         /* successful compression */
921                         if (toast_free[i])
922                                 pfree(DatumGetPointer(old_value));
923                         toast_values[i] = new_value;
924                         toast_free[i] = true;
925                         toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
926                         need_change = true;
927                         need_free = true;
928                 }
929                 else
930                 {
931                         /* incompressible, ignore on subsequent compression passes */
932                         toast_action[i] = 'x';
933                 }
934         }
935
936         /*
937          * Finally we store attributes of type 'm' externally.  At this point we
938          * increase the target tuple size, so that 'm' attributes aren't stored
939          * externally unless really necessary.
940          */
941         maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
942
943         while (heap_compute_data_size(tupleDesc,
944                                                                   toast_values, toast_isnull) > maxDataLen &&
945                    rel->rd_rel->reltoastrelid != InvalidOid)
946         {
947                 int                     biggest_attno = -1;
948                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
949                 Datum           old_value;
950
951                 /*--------
952                  * Search for the biggest yet inlined attribute with
953                  * attstorage = 'm'
954                  *--------
955                  */
956                 for (i = 0; i < numAttrs; i++)
957                 {
958                         if (toast_action[i] == 'p')
959                                 continue;
960                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
961                                 continue;               /* can't happen, toast_action would be 'p' */
962                         if (att[i]->attstorage != 'm')
963                                 continue;
964                         if (toast_sizes[i] > biggest_size)
965                         {
966                                 biggest_attno = i;
967                                 biggest_size = toast_sizes[i];
968                         }
969                 }
970
971                 if (biggest_attno < 0)
972                         break;
973
974                 /*
975                  * Store this external
976                  */
977                 i = biggest_attno;
978                 old_value = toast_values[i];
979                 toast_action[i] = 'p';
980                 toast_values[i] = toast_save_datum(rel, toast_values[i],
981                                                                                    toast_oldexternal[i], options);
982                 if (toast_free[i])
983                         pfree(DatumGetPointer(old_value));
984                 toast_free[i] = true;
985
986                 need_change = true;
987                 need_free = true;
988         }
989
990         /*
991          * In the case we toasted any values, we need to build a new heap tuple
992          * with the changed values.
993          */
994         if (need_change)
995         {
996                 HeapTupleHeader olddata = newtup->t_data;
997                 HeapTupleHeader new_data;
998                 int32           new_header_len;
999                 int32           new_data_len;
1000                 int32           new_tuple_len;
1001
1002                 /*
1003                  * Calculate the new size of the tuple.
1004                  *
1005                  * Note: we used to assume here that the old tuple's t_hoff must equal
1006                  * the new_header_len value, but that was incorrect.  The old tuple
1007                  * might have a smaller-than-current natts, if there's been an ALTER
1008                  * TABLE ADD COLUMN since it was stored; and that would lead to a
1009                  * different conclusion about the size of the null bitmap, or even
1010                  * whether there needs to be one at all.
1011                  */
1012                 new_header_len = SizeofHeapTupleHeader;
1013                 if (has_nulls)
1014                         new_header_len += BITMAPLEN(numAttrs);
1015                 if (olddata->t_infomask & HEAP_HASOID)
1016                         new_header_len += sizeof(Oid);
1017                 new_header_len = MAXALIGN(new_header_len);
1018                 new_data_len = heap_compute_data_size(tupleDesc,
1019                                                                                           toast_values, toast_isnull);
1020                 new_tuple_len = new_header_len + new_data_len;
1021
1022                 /*
1023                  * Allocate and zero the space needed, and fill HeapTupleData fields.
1024                  */
1025                 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_tuple_len);
1026                 result_tuple->t_len = new_tuple_len;
1027                 result_tuple->t_self = newtup->t_self;
1028                 result_tuple->t_tableOid = newtup->t_tableOid;
1029                 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
1030                 result_tuple->t_data = new_data;
1031
1032                 /*
1033                  * Copy the existing tuple header, but adjust natts and t_hoff.
1034                  */
1035                 memcpy(new_data, olddata, SizeofHeapTupleHeader);
1036                 HeapTupleHeaderSetNatts(new_data, numAttrs);
1037                 new_data->t_hoff = new_header_len;
1038                 if (olddata->t_infomask & HEAP_HASOID)
1039                         HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(olddata));
1040
1041                 /* Copy over the data, and fill the null bitmap if needed */
1042                 heap_fill_tuple(tupleDesc,
1043                                                 toast_values,
1044                                                 toast_isnull,
1045                                                 (char *) new_data + new_header_len,
1046                                                 new_data_len,
1047                                                 &(new_data->t_infomask),
1048                                                 has_nulls ? new_data->t_bits : NULL);
1049         }
1050         else
1051                 result_tuple = newtup;
1052
1053         /*
1054          * Free allocated temp values
1055          */
1056         if (need_free)
1057                 for (i = 0; i < numAttrs; i++)
1058                         if (toast_free[i])
1059                                 pfree(DatumGetPointer(toast_values[i]));
1060
1061         /*
1062          * Delete external values from the old tuple
1063          */
1064         if (need_delold)
1065                 for (i = 0; i < numAttrs; i++)
1066                         if (toast_delold[i])
1067                                 toast_delete_datum(rel, toast_oldvalues[i], false);
1068
1069         return result_tuple;
1070 }
1071
1072
1073 /* ----------
1074  * toast_flatten_tuple -
1075  *
1076  *      "Flatten" a tuple to contain no out-of-line toasted fields.
1077  *      (This does not eliminate compressed or short-header datums.)
1078  *
1079  *      Note: we expect the caller already checked HeapTupleHasExternal(tup),
1080  *      so there is no need for a short-circuit path.
1081  * ----------
1082  */
1083 HeapTuple
1084 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
1085 {
1086         HeapTuple       new_tuple;
1087         Form_pg_attribute *att = tupleDesc->attrs;
1088         int                     numAttrs = tupleDesc->natts;
1089         int                     i;
1090         Datum           toast_values[MaxTupleAttributeNumber];
1091         bool            toast_isnull[MaxTupleAttributeNumber];
1092         bool            toast_free[MaxTupleAttributeNumber];
1093
1094         /*
1095          * Break down the tuple into fields.
1096          */
1097         Assert(numAttrs <= MaxTupleAttributeNumber);
1098         heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
1099
1100         memset(toast_free, 0, numAttrs * sizeof(bool));
1101
1102         for (i = 0; i < numAttrs; i++)
1103         {
1104                 /*
1105                  * Look at non-null varlena attributes
1106                  */
1107                 if (!toast_isnull[i] && att[i]->attlen == -1)
1108                 {
1109                         struct varlena *new_value;
1110
1111                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1112                         if (VARATT_IS_EXTERNAL(new_value))
1113                         {
1114                                 new_value = heap_tuple_fetch_attr(new_value);
1115                                 toast_values[i] = PointerGetDatum(new_value);
1116                                 toast_free[i] = true;
1117                         }
1118                 }
1119         }
1120
1121         /*
1122          * Form the reconfigured tuple.
1123          */
1124         new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
1125
1126         /*
1127          * Be sure to copy the tuple's OID and identity fields.  We also make a
1128          * point of copying visibility info, just in case anybody looks at those
1129          * fields in a syscache entry.
1130          */
1131         if (tupleDesc->tdhasoid)
1132                 HeapTupleSetOid(new_tuple, HeapTupleGetOid(tup));
1133
1134         new_tuple->t_self = tup->t_self;
1135         new_tuple->t_tableOid = tup->t_tableOid;
1136
1137         new_tuple->t_data->t_choice = tup->t_data->t_choice;
1138         new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
1139         new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
1140         new_tuple->t_data->t_infomask |=
1141                 tup->t_data->t_infomask & HEAP_XACT_MASK;
1142         new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
1143         new_tuple->t_data->t_infomask2 |=
1144                 tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
1145
1146         /*
1147          * Free allocated temp values
1148          */
1149         for (i = 0; i < numAttrs; i++)
1150                 if (toast_free[i])
1151                         pfree(DatumGetPointer(toast_values[i]));
1152
1153         return new_tuple;
1154 }
1155
1156
1157 /* ----------
1158  * toast_flatten_tuple_to_datum -
1159  *
1160  *      "Flatten" a tuple containing out-of-line toasted fields into a Datum.
1161  *      The result is always palloc'd in the current memory context.
1162  *
1163  *      We have a general rule that Datums of container types (rows, arrays,
1164  *      ranges, etc) must not contain any external TOAST pointers.  Without
1165  *      this rule, we'd have to look inside each Datum when preparing a tuple
1166  *      for storage, which would be expensive and would fail to extend cleanly
1167  *      to new sorts of container types.
1168  *
1169  *      However, we don't want to say that tuples represented as HeapTuples
1170  *      can't contain toasted fields, so instead this routine should be called
1171  *      when such a HeapTuple is being converted into a Datum.
1172  *
1173  *      While we're at it, we decompress any compressed fields too.  This is not
1174  *      necessary for correctness, but reflects an expectation that compression
1175  *      will be more effective if applied to the whole tuple not individual
1176  *      fields.  We are not so concerned about that that we want to deconstruct
1177  *      and reconstruct tuples just to get rid of compressed fields, however.
1178  *      So callers typically won't call this unless they see that the tuple has
1179  *      at least one external field.
1180  *
1181  *      On the other hand, in-line short-header varlena fields are left alone.
1182  *      If we "untoasted" them here, they'd just get changed back to short-header
1183  *      format anyway within heap_fill_tuple.
1184  * ----------
1185  */
1186 Datum
1187 toast_flatten_tuple_to_datum(HeapTupleHeader tup,
1188                                                          uint32 tup_len,
1189                                                          TupleDesc tupleDesc)
1190 {
1191         HeapTupleHeader new_data;
1192         int32           new_header_len;
1193         int32           new_data_len;
1194         int32           new_tuple_len;
1195         HeapTupleData tmptup;
1196         Form_pg_attribute *att = tupleDesc->attrs;
1197         int                     numAttrs = tupleDesc->natts;
1198         int                     i;
1199         bool            has_nulls = false;
1200         Datum           toast_values[MaxTupleAttributeNumber];
1201         bool            toast_isnull[MaxTupleAttributeNumber];
1202         bool            toast_free[MaxTupleAttributeNumber];
1203
1204         /* Build a temporary HeapTuple control structure */
1205         tmptup.t_len = tup_len;
1206         ItemPointerSetInvalid(&(tmptup.t_self));
1207         tmptup.t_tableOid = InvalidOid;
1208         tmptup.t_data = tup;
1209
1210         /*
1211          * Break down the tuple into fields.
1212          */
1213         Assert(numAttrs <= MaxTupleAttributeNumber);
1214         heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
1215
1216         memset(toast_free, 0, numAttrs * sizeof(bool));
1217
1218         for (i = 0; i < numAttrs; i++)
1219         {
1220                 /*
1221                  * Look at non-null varlena attributes
1222                  */
1223                 if (toast_isnull[i])
1224                         has_nulls = true;
1225                 else if (att[i]->attlen == -1)
1226                 {
1227                         struct varlena *new_value;
1228
1229                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1230                         if (VARATT_IS_EXTERNAL(new_value) ||
1231                                 VARATT_IS_COMPRESSED(new_value))
1232                         {
1233                                 new_value = heap_tuple_untoast_attr(new_value);
1234                                 toast_values[i] = PointerGetDatum(new_value);
1235                                 toast_free[i] = true;
1236                         }
1237                 }
1238         }
1239
1240         /*
1241          * Calculate the new size of the tuple.
1242          *
1243          * This should match the reconstruction code in toast_insert_or_update.
1244          */
1245         new_header_len = SizeofHeapTupleHeader;
1246         if (has_nulls)
1247                 new_header_len += BITMAPLEN(numAttrs);
1248         if (tup->t_infomask & HEAP_HASOID)
1249                 new_header_len += sizeof(Oid);
1250         new_header_len = MAXALIGN(new_header_len);
1251         new_data_len = heap_compute_data_size(tupleDesc,
1252                                                                                   toast_values, toast_isnull);
1253         new_tuple_len = new_header_len + new_data_len;
1254
1255         new_data = (HeapTupleHeader) palloc0(new_tuple_len);
1256
1257         /*
1258          * Copy the existing tuple header, but adjust natts and t_hoff.
1259          */
1260         memcpy(new_data, tup, SizeofHeapTupleHeader);
1261         HeapTupleHeaderSetNatts(new_data, numAttrs);
1262         new_data->t_hoff = new_header_len;
1263         if (tup->t_infomask & HEAP_HASOID)
1264                 HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(tup));
1265
1266         /* Set the composite-Datum header fields correctly */
1267         HeapTupleHeaderSetDatumLength(new_data, new_tuple_len);
1268         HeapTupleHeaderSetTypeId(new_data, tupleDesc->tdtypeid);
1269         HeapTupleHeaderSetTypMod(new_data, tupleDesc->tdtypmod);
1270
1271         /* Copy over the data, and fill the null bitmap if needed */
1272         heap_fill_tuple(tupleDesc,
1273                                         toast_values,
1274                                         toast_isnull,
1275                                         (char *) new_data + new_header_len,
1276                                         new_data_len,
1277                                         &(new_data->t_infomask),
1278                                         has_nulls ? new_data->t_bits : NULL);
1279
1280         /*
1281          * Free allocated temp values
1282          */
1283         for (i = 0; i < numAttrs; i++)
1284                 if (toast_free[i])
1285                         pfree(DatumGetPointer(toast_values[i]));
1286
1287         return PointerGetDatum(new_data);
1288 }
1289
1290
1291 /* ----------
1292  * toast_compress_datum -
1293  *
1294  *      Create a compressed version of a varlena datum
1295  *
1296  *      If we fail (ie, compressed result is actually bigger than original)
1297  *      then return NULL.  We must not use compressed data if it'd expand
1298  *      the tuple!
1299  *
1300  *      We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1301  *      copying them.  But we can't handle external or compressed datums.
1302  * ----------
1303  */
1304 Datum
1305 toast_compress_datum(Datum value)
1306 {
1307         struct varlena *tmp;
1308         int32           valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
1309         int32           len;
1310
1311         Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
1312         Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
1313
1314         /*
1315          * No point in wasting a palloc cycle if value size is out of the allowed
1316          * range for compression
1317          */
1318         if (valsize < PGLZ_strategy_default->min_input_size ||
1319                 valsize > PGLZ_strategy_default->max_input_size)
1320                 return PointerGetDatum(NULL);
1321
1322         tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) +
1323                                                                         TOAST_COMPRESS_HDRSZ);
1324
1325         /*
1326          * We recheck the actual size even if pglz_compress() reports success,
1327          * because it might be satisfied with having saved as little as one byte
1328          * in the compressed data --- which could turn into a net loss once you
1329          * consider header and alignment padding.  Worst case, the compressed
1330          * format might require three padding bytes (plus header, which is
1331          * included in VARSIZE(tmp)), whereas the uncompressed format would take
1332          * only one header byte and no padding if the value is short enough.  So
1333          * we insist on a savings of more than 2 bytes to ensure we have a gain.
1334          */
1335         len = pglz_compress(VARDATA_ANY(DatumGetPointer(value)),
1336                                                 valsize,
1337                                                 TOAST_COMPRESS_RAWDATA(tmp),
1338                                                 PGLZ_strategy_default);
1339         if (len >= 0 &&
1340                 len + TOAST_COMPRESS_HDRSZ < valsize - 2)
1341         {
1342                 TOAST_COMPRESS_SET_RAWSIZE(tmp, valsize);
1343                 SET_VARSIZE_COMPRESSED(tmp, len + TOAST_COMPRESS_HDRSZ);
1344                 /* successful compression */
1345                 return PointerGetDatum(tmp);
1346         }
1347         else
1348         {
1349                 /* incompressible data */
1350                 pfree(tmp);
1351                 return PointerGetDatum(NULL);
1352         }
1353 }
1354
1355
1356 /* ----------
1357  * toast_get_valid_index
1358  *
1359  *      Get OID of valid index associated to given toast relation. A toast
1360  *      relation can have only one valid index at the same time.
1361  */
1362 Oid
1363 toast_get_valid_index(Oid toastoid, LOCKMODE lock)
1364 {
1365         int                     num_indexes;
1366         int                     validIndex;
1367         Oid                     validIndexOid;
1368         Relation   *toastidxs;
1369         Relation        toastrel;
1370
1371         /* Open the toast relation */
1372         toastrel = heap_open(toastoid, lock);
1373
1374         /* Look for the valid index of the toast relation */
1375         validIndex = toast_open_indexes(toastrel,
1376                                                                         lock,
1377                                                                         &toastidxs,
1378                                                                         &num_indexes);
1379         validIndexOid = RelationGetRelid(toastidxs[validIndex]);
1380
1381         /* Close the toast relation and all its indexes */
1382         toast_close_indexes(toastidxs, num_indexes, lock);
1383         heap_close(toastrel, lock);
1384
1385         return validIndexOid;
1386 }
1387
1388
1389 /* ----------
1390  * toast_save_datum -
1391  *
1392  *      Save one single datum into the secondary relation and return
1393  *      a Datum reference for it.
1394  *
1395  * rel: the main relation we're working with (not the toast rel!)
1396  * value: datum to be pushed to toast storage
1397  * oldexternal: if not NULL, toast pointer previously representing the datum
1398  * options: options to be passed to heap_insert() for toast rows
1399  * ----------
1400  */
1401 static Datum
1402 toast_save_datum(Relation rel, Datum value,
1403                                  struct varlena * oldexternal, int options)
1404 {
1405         Relation        toastrel;
1406         Relation   *toastidxs;
1407         HeapTuple       toasttup;
1408         TupleDesc       toasttupDesc;
1409         Datum           t_values[3];
1410         bool            t_isnull[3];
1411         CommandId       mycid = GetCurrentCommandId(true);
1412         struct varlena *result;
1413         struct varatt_external toast_pointer;
1414         union
1415         {
1416                 struct varlena hdr;
1417                 /* this is to make the union big enough for a chunk: */
1418                 char            data[TOAST_MAX_CHUNK_SIZE + VARHDRSZ];
1419                 /* ensure union is aligned well enough: */
1420                 int32           align_it;
1421         }                       chunk_data;
1422         int32           chunk_size;
1423         int32           chunk_seq = 0;
1424         char       *data_p;
1425         int32           data_todo;
1426         Pointer         dval = DatumGetPointer(value);
1427         int                     num_indexes;
1428         int                     validIndex;
1429
1430         Assert(!VARATT_IS_EXTERNAL(value));
1431
1432         /*
1433          * Open the toast relation and its indexes.  We can use the index to check
1434          * uniqueness of the OID we assign to the toasted item, even though it has
1435          * additional columns besides OID.
1436          */
1437         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1438         toasttupDesc = toastrel->rd_att;
1439
1440         /* Open all the toast indexes and look for the valid one */
1441         validIndex = toast_open_indexes(toastrel,
1442                                                                         RowExclusiveLock,
1443                                                                         &toastidxs,
1444                                                                         &num_indexes);
1445
1446         /*
1447          * Get the data pointer and length, and compute va_rawsize and va_extsize.
1448          *
1449          * va_rawsize is the size of the equivalent fully uncompressed datum, so
1450          * we have to adjust for short headers.
1451          *
1452          * va_extsize is the actual size of the data payload in the toast records.
1453          */
1454         if (VARATT_IS_SHORT(dval))
1455         {
1456                 data_p = VARDATA_SHORT(dval);
1457                 data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
1458                 toast_pointer.va_rawsize = data_todo + VARHDRSZ;                /* as if not short */
1459                 toast_pointer.va_extsize = data_todo;
1460         }
1461         else if (VARATT_IS_COMPRESSED(dval))
1462         {
1463                 data_p = VARDATA(dval);
1464                 data_todo = VARSIZE(dval) - VARHDRSZ;
1465                 /* rawsize in a compressed datum is just the size of the payload */
1466                 toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
1467                 toast_pointer.va_extsize = data_todo;
1468                 /* Assert that the numbers look like it's compressed */
1469                 Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1470         }
1471         else
1472         {
1473                 data_p = VARDATA(dval);
1474                 data_todo = VARSIZE(dval) - VARHDRSZ;
1475                 toast_pointer.va_rawsize = VARSIZE(dval);
1476                 toast_pointer.va_extsize = data_todo;
1477         }
1478
1479         /*
1480          * Insert the correct table OID into the result TOAST pointer.
1481          *
1482          * Normally this is the actual OID of the target toast table, but during
1483          * table-rewriting operations such as CLUSTER, we have to insert the OID
1484          * of the table's real permanent toast table instead.  rd_toastoid is set
1485          * if we have to substitute such an OID.
1486          */
1487         if (OidIsValid(rel->rd_toastoid))
1488                 toast_pointer.va_toastrelid = rel->rd_toastoid;
1489         else
1490                 toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
1491
1492         /*
1493          * Choose an OID to use as the value ID for this toast value.
1494          *
1495          * Normally we just choose an unused OID within the toast table.  But
1496          * during table-rewriting operations where we are preserving an existing
1497          * toast table OID, we want to preserve toast value OIDs too.  So, if
1498          * rd_toastoid is set and we had a prior external value from that same
1499          * toast table, re-use its value ID.  If we didn't have a prior external
1500          * value (which is a corner case, but possible if the table's attstorage
1501          * options have been changed), we have to pick a value ID that doesn't
1502          * conflict with either new or existing toast value OIDs.
1503          */
1504         if (!OidIsValid(rel->rd_toastoid))
1505         {
1506                 /* normal case: just choose an unused OID */
1507                 toast_pointer.va_valueid =
1508                         GetNewOidWithIndex(toastrel,
1509                                                            RelationGetRelid(toastidxs[validIndex]),
1510                                                            (AttrNumber) 1);
1511         }
1512         else
1513         {
1514                 /* rewrite case: check to see if value was in old toast table */
1515                 toast_pointer.va_valueid = InvalidOid;
1516                 if (oldexternal != NULL)
1517                 {
1518                         struct varatt_external old_toast_pointer;
1519
1520                         Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
1521                         /* Must copy to access aligned fields */
1522                         VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
1523                         if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
1524                         {
1525                                 /* This value came from the old toast table; reuse its OID */
1526                                 toast_pointer.va_valueid = old_toast_pointer.va_valueid;
1527
1528                                 /*
1529                                  * There is a corner case here: the table rewrite might have
1530                                  * to copy both live and recently-dead versions of a row, and
1531                                  * those versions could easily reference the same toast value.
1532                                  * When we copy the second or later version of such a row,
1533                                  * reusing the OID will mean we select an OID that's already
1534                                  * in the new toast table.  Check for that, and if so, just
1535                                  * fall through without writing the data again.
1536                                  *
1537                                  * While annoying and ugly-looking, this is a good thing
1538                                  * because it ensures that we wind up with only one copy of
1539                                  * the toast value when there is only one copy in the old
1540                                  * toast table.  Before we detected this case, we'd have made
1541                                  * multiple copies, wasting space; and what's worse, the
1542                                  * copies belonging to already-deleted heap tuples would not
1543                                  * be reclaimed by VACUUM.
1544                                  */
1545                                 if (toastrel_valueid_exists(toastrel,
1546                                                                                         toast_pointer.va_valueid))
1547                                 {
1548                                         /* Match, so short-circuit the data storage loop below */
1549                                         data_todo = 0;
1550                                 }
1551                         }
1552                 }
1553                 if (toast_pointer.va_valueid == InvalidOid)
1554                 {
1555                         /*
1556                          * new value; must choose an OID that doesn't conflict in either
1557                          * old or new toast table
1558                          */
1559                         do
1560                         {
1561                                 toast_pointer.va_valueid =
1562                                         GetNewOidWithIndex(toastrel,
1563                                                                          RelationGetRelid(toastidxs[validIndex]),
1564                                                                            (AttrNumber) 1);
1565                         } while (toastid_valueid_exists(rel->rd_toastoid,
1566                                                                                         toast_pointer.va_valueid));
1567                 }
1568         }
1569
1570         /*
1571          * Initialize constant parts of the tuple data
1572          */
1573         t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1574         t_values[2] = PointerGetDatum(&chunk_data);
1575         t_isnull[0] = false;
1576         t_isnull[1] = false;
1577         t_isnull[2] = false;
1578
1579         /*
1580          * Split up the item into chunks
1581          */
1582         while (data_todo > 0)
1583         {
1584                 int                     i;
1585
1586                 CHECK_FOR_INTERRUPTS();
1587
1588                 /*
1589                  * Calculate the size of this chunk
1590                  */
1591                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1592
1593                 /*
1594                  * Build a tuple and store it
1595                  */
1596                 t_values[1] = Int32GetDatum(chunk_seq++);
1597                 SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1598                 memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1599                 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1600
1601                 heap_insert(toastrel, toasttup, mycid, options, NULL);
1602
1603                 /*
1604                  * Create the index entry.  We cheat a little here by not using
1605                  * FormIndexDatum: this relies on the knowledge that the index columns
1606                  * are the same as the initial columns of the table for all the
1607                  * indexes.  We also cheat by not providing an IndexInfo: this is okay
1608                  * for now because btree doesn't need one, but we might have to be
1609                  * more honest someday.
1610                  *
1611                  * Note also that there had better not be any user-created index on
1612                  * the TOAST table, since we don't bother to update anything else.
1613                  */
1614                 for (i = 0; i < num_indexes; i++)
1615                 {
1616                         /* Only index relations marked as ready can be updated */
1617                         if (IndexIsReady(toastidxs[i]->rd_index))
1618                                 index_insert(toastidxs[i], t_values, t_isnull,
1619                                                          &(toasttup->t_self),
1620                                                          toastrel,
1621                                                          toastidxs[i]->rd_index->indisunique ?
1622                                                          UNIQUE_CHECK_YES : UNIQUE_CHECK_NO,
1623                                                          NULL);
1624                 }
1625
1626                 /*
1627                  * Free memory
1628                  */
1629                 heap_freetuple(toasttup);
1630
1631                 /*
1632                  * Move on to next chunk
1633                  */
1634                 data_todo -= chunk_size;
1635                 data_p += chunk_size;
1636         }
1637
1638         /*
1639          * Done - close toast relation and its indexes
1640          */
1641         toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1642         heap_close(toastrel, RowExclusiveLock);
1643
1644         /*
1645          * Create the TOAST pointer value that we'll return
1646          */
1647         result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1648         SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK);
1649         memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1650
1651         return PointerGetDatum(result);
1652 }
1653
1654
1655 /* ----------
1656  * toast_delete_datum -
1657  *
1658  *      Delete a single external stored value.
1659  * ----------
1660  */
1661 static void
1662 toast_delete_datum(Relation rel, Datum value, bool is_speculative)
1663 {
1664         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1665         struct varatt_external toast_pointer;
1666         Relation        toastrel;
1667         Relation   *toastidxs;
1668         ScanKeyData toastkey;
1669         SysScanDesc toastscan;
1670         HeapTuple       toasttup;
1671         int                     num_indexes;
1672         int                     validIndex;
1673         SnapshotData SnapshotToast;
1674
1675         if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1676                 return;
1677
1678         /* Must copy to access aligned fields */
1679         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1680
1681         /*
1682          * Open the toast relation and its indexes
1683          */
1684         toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1685
1686         /* Fetch valid relation used for process */
1687         validIndex = toast_open_indexes(toastrel,
1688                                                                         RowExclusiveLock,
1689                                                                         &toastidxs,
1690                                                                         &num_indexes);
1691
1692         /*
1693          * Setup a scan key to find chunks with matching va_valueid
1694          */
1695         ScanKeyInit(&toastkey,
1696                                 (AttrNumber) 1,
1697                                 BTEqualStrategyNumber, F_OIDEQ,
1698                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1699
1700         /*
1701          * Find all the chunks.  (We don't actually care whether we see them in
1702          * sequence or not, but since we've already locked the index we might as
1703          * well use systable_beginscan_ordered.)
1704          */
1705         init_toast_snapshot(&SnapshotToast);
1706         toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1707                                                                                    &SnapshotToast, 1, &toastkey);
1708         while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1709         {
1710                 /*
1711                  * Have a chunk, delete it
1712                  */
1713                 if (is_speculative)
1714                         heap_abort_speculative(toastrel, toasttup);
1715                 else
1716                         simple_heap_delete(toastrel, &toasttup->t_self);
1717         }
1718
1719         /*
1720          * End scan and close relations
1721          */
1722         systable_endscan_ordered(toastscan);
1723         toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1724         heap_close(toastrel, RowExclusiveLock);
1725 }
1726
1727
1728 /* ----------
1729  * toastrel_valueid_exists -
1730  *
1731  *      Test whether a toast value with the given ID exists in the toast relation
1732  * ----------
1733  */
1734 static bool
1735 toastrel_valueid_exists(Relation toastrel, Oid valueid)
1736 {
1737         bool            result = false;
1738         ScanKeyData toastkey;
1739         SysScanDesc toastscan;
1740         int                     num_indexes;
1741         int                     validIndex;
1742         Relation   *toastidxs;
1743         SnapshotData SnapshotToast;
1744
1745         /* Fetch a valid index relation */
1746         validIndex = toast_open_indexes(toastrel,
1747                                                                         RowExclusiveLock,
1748                                                                         &toastidxs,
1749                                                                         &num_indexes);
1750
1751         /*
1752          * Setup a scan key to find chunks with matching va_valueid
1753          */
1754         ScanKeyInit(&toastkey,
1755                                 (AttrNumber) 1,
1756                                 BTEqualStrategyNumber, F_OIDEQ,
1757                                 ObjectIdGetDatum(valueid));
1758
1759         /*
1760          * Is there any such chunk?
1761          */
1762         init_toast_snapshot(&SnapshotToast);
1763         toastscan = systable_beginscan(toastrel,
1764                                                                    RelationGetRelid(toastidxs[validIndex]),
1765                                                                    true, &SnapshotToast, 1, &toastkey);
1766
1767         if (systable_getnext(toastscan) != NULL)
1768                 result = true;
1769
1770         systable_endscan(toastscan);
1771
1772         /* Clean up */
1773         toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1774
1775         return result;
1776 }
1777
1778 /* ----------
1779  * toastid_valueid_exists -
1780  *
1781  *      As above, but work from toast rel's OID not an open relation
1782  * ----------
1783  */
1784 static bool
1785 toastid_valueid_exists(Oid toastrelid, Oid valueid)
1786 {
1787         bool            result;
1788         Relation        toastrel;
1789
1790         toastrel = heap_open(toastrelid, AccessShareLock);
1791
1792         result = toastrel_valueid_exists(toastrel, valueid);
1793
1794         heap_close(toastrel, AccessShareLock);
1795
1796         return result;
1797 }
1798
1799
1800 /* ----------
1801  * toast_fetch_datum -
1802  *
1803  *      Reconstruct an in memory Datum from the chunks saved
1804  *      in the toast relation
1805  * ----------
1806  */
1807 static struct varlena *
1808 toast_fetch_datum(struct varlena * attr)
1809 {
1810         Relation        toastrel;
1811         Relation   *toastidxs;
1812         ScanKeyData toastkey;
1813         SysScanDesc toastscan;
1814         HeapTuple       ttup;
1815         TupleDesc       toasttupDesc;
1816         struct varlena *result;
1817         struct varatt_external toast_pointer;
1818         int32           ressize;
1819         int32           residx,
1820                                 nextidx;
1821         int32           numchunks;
1822         Pointer         chunk;
1823         bool            isnull;
1824         char       *chunkdata;
1825         int32           chunksize;
1826         int                     num_indexes;
1827         int                     validIndex;
1828         SnapshotData SnapshotToast;
1829
1830         if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1831                 elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");
1832
1833         /* Must copy to access aligned fields */
1834         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1835
1836         ressize = toast_pointer.va_extsize;
1837         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1838
1839         result = (struct varlena *) palloc(ressize + VARHDRSZ);
1840
1841         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1842                 SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1843         else
1844                 SET_VARSIZE(result, ressize + VARHDRSZ);
1845
1846         /*
1847          * Open the toast relation and its indexes
1848          */
1849         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1850         toasttupDesc = toastrel->rd_att;
1851
1852         /* Look for the valid index of the toast relation */
1853         validIndex = toast_open_indexes(toastrel,
1854                                                                         AccessShareLock,
1855                                                                         &toastidxs,
1856                                                                         &num_indexes);
1857
1858         /*
1859          * Setup a scan key to fetch from the index by va_valueid
1860          */
1861         ScanKeyInit(&toastkey,
1862                                 (AttrNumber) 1,
1863                                 BTEqualStrategyNumber, F_OIDEQ,
1864                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1865
1866         /*
1867          * Read the chunks by index
1868          *
1869          * Note that because the index is actually on (valueid, chunkidx) we will
1870          * see the chunks in chunkidx order, even though we didn't explicitly ask
1871          * for it.
1872          */
1873         nextidx = 0;
1874
1875         init_toast_snapshot(&SnapshotToast);
1876         toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1877                                                                                    &SnapshotToast, 1, &toastkey);
1878         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1879         {
1880                 /*
1881                  * Have a chunk, extract the sequence number and the data
1882                  */
1883                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1884                 Assert(!isnull);
1885                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1886                 Assert(!isnull);
1887                 if (!VARATT_IS_EXTENDED(chunk))
1888                 {
1889                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1890                         chunkdata = VARDATA(chunk);
1891                 }
1892                 else if (VARATT_IS_SHORT(chunk))
1893                 {
1894                         /* could happen due to heap_form_tuple doing its thing */
1895                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1896                         chunkdata = VARDATA_SHORT(chunk);
1897                 }
1898                 else
1899                 {
1900                         /* should never happen */
1901                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1902                                  toast_pointer.va_valueid,
1903                                  RelationGetRelationName(toastrel));
1904                         chunksize = 0;          /* keep compiler quiet */
1905                         chunkdata = NULL;
1906                 }
1907
1908                 /*
1909                  * Some checks on the data we've found
1910                  */
1911                 if (residx != nextidx)
1912                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1913                                  residx, nextidx,
1914                                  toast_pointer.va_valueid,
1915                                  RelationGetRelationName(toastrel));
1916                 if (residx < numchunks - 1)
1917                 {
1918                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1919                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
1920                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1921                                          residx, numchunks,
1922                                          toast_pointer.va_valueid,
1923                                          RelationGetRelationName(toastrel));
1924                 }
1925                 else if (residx == numchunks - 1)
1926                 {
1927                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1928                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
1929                                          chunksize,
1930                                          (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
1931                                          residx,
1932                                          toast_pointer.va_valueid,
1933                                          RelationGetRelationName(toastrel));
1934                 }
1935                 else
1936                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
1937                                  residx,
1938                                  0, numchunks - 1,
1939                                  toast_pointer.va_valueid,
1940                                  RelationGetRelationName(toastrel));
1941
1942                 /*
1943                  * Copy the data into proper place in our result
1944                  */
1945                 memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
1946                            chunkdata,
1947                            chunksize);
1948
1949                 nextidx++;
1950         }
1951
1952         /*
1953          * Final checks that we successfully fetched the datum
1954          */
1955         if (nextidx != numchunks)
1956                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
1957                          nextidx,
1958                          toast_pointer.va_valueid,
1959                          RelationGetRelationName(toastrel));
1960
1961         /*
1962          * End scan and close relations
1963          */
1964         systable_endscan_ordered(toastscan);
1965         toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
1966         heap_close(toastrel, AccessShareLock);
1967
1968         return result;
1969 }
1970
1971 /* ----------
1972  * toast_fetch_datum_slice -
1973  *
1974  *      Reconstruct a segment of a Datum from the chunks saved
1975  *      in the toast relation
1976  * ----------
1977  */
1978 static struct varlena *
1979 toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
1980 {
1981         Relation        toastrel;
1982         Relation   *toastidxs;
1983         ScanKeyData toastkey[3];
1984         int                     nscankeys;
1985         SysScanDesc toastscan;
1986         HeapTuple       ttup;
1987         TupleDesc       toasttupDesc;
1988         struct varlena *result;
1989         struct varatt_external toast_pointer;
1990         int32           attrsize;
1991         int32           residx;
1992         int32           nextidx;
1993         int                     numchunks;
1994         int                     startchunk;
1995         int                     endchunk;
1996         int32           startoffset;
1997         int32           endoffset;
1998         int                     totalchunks;
1999         Pointer         chunk;
2000         bool            isnull;
2001         char       *chunkdata;
2002         int32           chunksize;
2003         int32           chcpystrt;
2004         int32           chcpyend;
2005         int                     num_indexes;
2006         int                     validIndex;
2007         SnapshotData SnapshotToast;
2008
2009         if (!VARATT_IS_EXTERNAL_ONDISK(attr))
2010                 elog(ERROR, "toast_fetch_datum_slice shouldn't be called for non-ondisk datums");
2011
2012         /* Must copy to access aligned fields */
2013         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
2014
2015         /*
2016          * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
2017          * we can't return a compressed datum which is meaningful to toast later
2018          */
2019         Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
2020
2021         attrsize = toast_pointer.va_extsize;
2022         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
2023
2024         if (sliceoffset >= attrsize)
2025         {
2026                 sliceoffset = 0;
2027                 length = 0;
2028         }
2029
2030         if (((sliceoffset + length) > attrsize) || length < 0)
2031                 length = attrsize - sliceoffset;
2032
2033         result = (struct varlena *) palloc(length + VARHDRSZ);
2034
2035         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2036                 SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
2037         else
2038                 SET_VARSIZE(result, length + VARHDRSZ);
2039
2040         if (length == 0)
2041                 return result;                  /* Can save a lot of work at this point! */
2042
2043         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
2044         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
2045         numchunks = (endchunk - startchunk) + 1;
2046
2047         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
2048         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
2049
2050         /*
2051          * Open the toast relation and its indexes
2052          */
2053         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
2054         toasttupDesc = toastrel->rd_att;
2055
2056         /* Look for the valid index of toast relation */
2057         validIndex = toast_open_indexes(toastrel,
2058                                                                         AccessShareLock,
2059                                                                         &toastidxs,
2060                                                                         &num_indexes);
2061
2062         /*
2063          * Setup a scan key to fetch from the index. This is either two keys or
2064          * three depending on the number of chunks.
2065          */
2066         ScanKeyInit(&toastkey[0],
2067                                 (AttrNumber) 1,
2068                                 BTEqualStrategyNumber, F_OIDEQ,
2069                                 ObjectIdGetDatum(toast_pointer.va_valueid));
2070
2071         /*
2072          * Use equality condition for one chunk, a range condition otherwise:
2073          */
2074         if (numchunks == 1)
2075         {
2076                 ScanKeyInit(&toastkey[1],
2077                                         (AttrNumber) 2,
2078                                         BTEqualStrategyNumber, F_INT4EQ,
2079                                         Int32GetDatum(startchunk));
2080                 nscankeys = 2;
2081         }
2082         else
2083         {
2084                 ScanKeyInit(&toastkey[1],
2085                                         (AttrNumber) 2,
2086                                         BTGreaterEqualStrategyNumber, F_INT4GE,
2087                                         Int32GetDatum(startchunk));
2088                 ScanKeyInit(&toastkey[2],
2089                                         (AttrNumber) 2,
2090                                         BTLessEqualStrategyNumber, F_INT4LE,
2091                                         Int32GetDatum(endchunk));
2092                 nscankeys = 3;
2093         }
2094
2095         /*
2096          * Read the chunks by index
2097          *
2098          * The index is on (valueid, chunkidx) so they will come in order
2099          */
2100         init_toast_snapshot(&SnapshotToast);
2101         nextidx = startchunk;
2102         toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
2103                                                                                 &SnapshotToast, nscankeys, toastkey);
2104         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
2105         {
2106                 /*
2107                  * Have a chunk, extract the sequence number and the data
2108                  */
2109                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
2110                 Assert(!isnull);
2111                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
2112                 Assert(!isnull);
2113                 if (!VARATT_IS_EXTENDED(chunk))
2114                 {
2115                         chunksize = VARSIZE(chunk) - VARHDRSZ;
2116                         chunkdata = VARDATA(chunk);
2117                 }
2118                 else if (VARATT_IS_SHORT(chunk))
2119                 {
2120                         /* could happen due to heap_form_tuple doing its thing */
2121                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2122                         chunkdata = VARDATA_SHORT(chunk);
2123                 }
2124                 else
2125                 {
2126                         /* should never happen */
2127                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
2128                                  toast_pointer.va_valueid,
2129                                  RelationGetRelationName(toastrel));
2130                         chunksize = 0;          /* keep compiler quiet */
2131                         chunkdata = NULL;
2132                 }
2133
2134                 /*
2135                  * Some checks on the data we've found
2136                  */
2137                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
2138                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
2139                                  residx, nextidx,
2140                                  toast_pointer.va_valueid,
2141                                  RelationGetRelationName(toastrel));
2142                 if (residx < totalchunks - 1)
2143                 {
2144                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
2145                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
2146                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
2147                                          residx, totalchunks,
2148                                          toast_pointer.va_valueid,
2149                                          RelationGetRelationName(toastrel));
2150                 }
2151                 else if (residx == totalchunks - 1)
2152                 {
2153                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
2154                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
2155                                          chunksize,
2156                                          (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
2157                                          residx,
2158                                          toast_pointer.va_valueid,
2159                                          RelationGetRelationName(toastrel));
2160                 }
2161                 else
2162                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
2163                                  residx,
2164                                  0, totalchunks - 1,
2165                                  toast_pointer.va_valueid,
2166                                  RelationGetRelationName(toastrel));
2167
2168                 /*
2169                  * Copy the data into proper place in our result
2170                  */
2171                 chcpystrt = 0;
2172                 chcpyend = chunksize - 1;
2173                 if (residx == startchunk)
2174                         chcpystrt = startoffset;
2175                 if (residx == endchunk)
2176                         chcpyend = endoffset;
2177
2178                 memcpy(VARDATA(result) +
2179                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
2180                            chunkdata + chcpystrt,
2181                            (chcpyend - chcpystrt) + 1);
2182
2183                 nextidx++;
2184         }
2185
2186         /*
2187          * Final checks that we successfully fetched the datum
2188          */
2189         if (nextidx != (endchunk + 1))
2190                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
2191                          nextidx,
2192                          toast_pointer.va_valueid,
2193                          RelationGetRelationName(toastrel));
2194
2195         /*
2196          * End scan and close relations
2197          */
2198         systable_endscan_ordered(toastscan);
2199         toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
2200         heap_close(toastrel, AccessShareLock);
2201
2202         return result;
2203 }
2204
2205 /* ----------
2206  * toast_decompress_datum -
2207  *
2208  * Decompress a compressed version of a varlena datum
2209  */
2210 static struct varlena *
2211 toast_decompress_datum(struct varlena * attr)
2212 {
2213         struct varlena *result;
2214
2215         Assert(VARATT_IS_COMPRESSED(attr));
2216
2217         result = (struct varlena *)
2218                 palloc(TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2219         SET_VARSIZE(result, TOAST_COMPRESS_RAWSIZE(attr) + VARHDRSZ);
2220
2221         if (pglz_decompress(TOAST_COMPRESS_RAWDATA(attr),
2222                                                 VARSIZE(attr) - TOAST_COMPRESS_HDRSZ,
2223                                                 VARDATA(result),
2224                                                 TOAST_COMPRESS_RAWSIZE(attr)) < 0)
2225                 elog(ERROR, "compressed data is corrupted");
2226
2227         return result;
2228 }
2229
2230
2231 /* ----------
2232  * toast_open_indexes
2233  *
2234  *      Get an array of the indexes associated to the given toast relation
2235  *      and return as well the position of the valid index used by the toast
2236  *      relation in this array. It is the responsibility of the caller of this
2237  *      function to close the indexes as well as free them.
2238  */
2239 static int
2240 toast_open_indexes(Relation toastrel,
2241                                    LOCKMODE lock,
2242                                    Relation **toastidxs,
2243                                    int *num_indexes)
2244 {
2245         int                     i = 0;
2246         int                     res = 0;
2247         bool            found = false;
2248         List       *indexlist;
2249         ListCell   *lc;
2250
2251         /* Get index list of the toast relation */
2252         indexlist = RelationGetIndexList(toastrel);
2253         Assert(indexlist != NIL);
2254
2255         *num_indexes = list_length(indexlist);
2256
2257         /* Open all the index relations */
2258         *toastidxs = (Relation *) palloc(*num_indexes * sizeof(Relation));
2259         foreach(lc, indexlist)
2260                 (*toastidxs)[i++] = index_open(lfirst_oid(lc), lock);
2261
2262         /* Fetch the first valid index in list */
2263         for (i = 0; i < *num_indexes; i++)
2264         {
2265                 Relation        toastidx = (*toastidxs)[i];
2266
2267                 if (toastidx->rd_index->indisvalid)
2268                 {
2269                         res = i;
2270                         found = true;
2271                         break;
2272                 }
2273         }
2274
2275         /*
2276          * Free index list, not necessary anymore as relations are opened and a
2277          * valid index has been found.
2278          */
2279         list_free(indexlist);
2280
2281         /*
2282          * The toast relation should have one valid index, so something is going
2283          * wrong if there is nothing.
2284          */
2285         if (!found)
2286                 elog(ERROR, "no valid index found for toast relation with Oid %u",
2287                          RelationGetRelid(toastrel));
2288
2289         return res;
2290 }
2291
2292 /* ----------
2293  * toast_close_indexes
2294  *
2295  *      Close an array of indexes for a toast relation and free it. This should
2296  *      be called for a set of indexes opened previously with toast_open_indexes.
2297  */
2298 static void
2299 toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock)
2300 {
2301         int                     i;
2302
2303         /* Close relations and clean up things */
2304         for (i = 0; i < num_indexes; i++)
2305                 index_close(toastidxs[i], lock);
2306         pfree(toastidxs);
2307 }
2308
2309 /* ----------
2310  * init_toast_snapshot
2311  *
2312  *      Initialize an appropriate TOAST snapshot.  We must use an MVCC snapshot
2313  *      to initialize the TOAST snapshot; since we don't know which one to use,
2314  *      just use the oldest one.  This is safe: at worst, we will get a "snapshot
2315  *      too old" error that might have been avoided otherwise.
2316  */
2317 static void
2318 init_toast_snapshot(Snapshot toast_snapshot)
2319 {
2320         Snapshot        snapshot = GetOldestSnapshot();
2321
2322         if (snapshot == NULL)
2323                 elog(ERROR, "no known snapshots");
2324
2325         InitToastSnapshot(*toast_snapshot, snapshot->lsn, snapshot->whenTaken);
2326 }