]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Fix race condition with toast table access from a stale syscache entry.
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2011, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/heap/tuptoaster.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *              toast_insert_or_update -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              toast_delete -
20  *                      Reclaim toast storage when a tuple is deleted
21  *
22  *              heap_tuple_untoast_attr -
23  *                      Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27
28 #include "postgres.h"
29
30 #include <unistd.h>
31 #include <fcntl.h>
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "utils/fmgroids.h"
39 #include "utils/pg_lzcompress.h"
40 #include "utils/rel.h"
41 #include "utils/typcache.h"
42 #include "utils/tqual.h"
43
44
45 #undef TOAST_DEBUG
46
/*
 * Size of an EXTERNAL datum that contains a standard TOAST pointer:
 * the external varlena header plus one struct varatt_external payload.
 */
#define TOAST_POINTER_SIZE (VARHDRSZ_EXTERNAL + sizeof(struct varatt_external))

/*
 * Testing whether an externally-stored value is compressed now requires
 * comparing extsize (the actual length of the external data) to rawsize
 * (the original uncompressed datum's size).  The latter includes VARHDRSZ
 * overhead, the former doesn't.  We never use compression unless it actually
 * saves space, so we expect either equality or less-than.
 *
 * The argument must be a struct varatt_external object (not a pointer to
 * one), since its fields are accessed with "."; it is evaluated twice, so
 * it should be free of side effects.
 */
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) \
        ((toast_pointer).va_extsize < (toast_pointer).va_rawsize - VARHDRSZ)

/*
 * Macro to fetch the possibly-unaligned contents of an EXTERNAL datum
 * into a local "struct varatt_external" toast pointer.  This should be
 * just a memcpy, but some versions of gcc seem to produce broken code
 * that assumes the datum contents are aligned.  Introducing an explicit
 * intermediate "varattrib_1b_e *" variable seems to fix it.
 *
 * toast_pointer must be an lvalue; the second Assert checks that its size
 * plus VARHDRSZ_EXTERNAL equals the datum's external size.  The macro
 * declares a local named "attre" inside its do/while scope, so callers
 * should not pass an expression that itself refers to a variable of that
 * name.
 */
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr) \
do { \
        varattrib_1b_e *attre = (varattrib_1b_e *) (attr); \
        Assert(VARATT_IS_EXTERNAL(attre)); \
        Assert(VARSIZE_EXTERNAL(attre) == sizeof(toast_pointer) + VARHDRSZ_EXTERNAL); \
        memcpy(&(toast_pointer), VARDATA_EXTERNAL(attre), sizeof(toast_pointer)); \
} while (0)
74
75
76 static void toast_delete_datum(Relation rel, Datum value);
77 static Datum toast_save_datum(Relation rel, Datum value,
78                                  struct varlena *oldexternal, int options);
79 static bool toast_valueid_exists(Oid toastrelid, Oid valueid);
80 static struct varlena *toast_fetch_datum(struct varlena * attr);
81 static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
82                                                 int32 sliceoffset, int32 length);
83
84
/* ----------
 * heap_tuple_fetch_attr -
 *
 *      Public entry point to retrieve a toasted value from external
 *      storage (possibly still in compressed format).
 *
 * An externally stored datum is fetched with toast_fetch_datum(); any
 * other datum is handed back unchanged (the very same pointer).  The
 * result therefore holds all of its data internally, although it can still
 * be compressed or have a short header.
 * ----------
 */
struct varlena *
heap_tuple_fetch_attr(struct varlena * attr)
{
        /* Value already lives inside the main tuple: nothing to fetch */
        if (!VARATT_IS_EXTERNAL(attr))
                return attr;

        /* Externally stored value: pull the stored bytes back in */
        return toast_fetch_datum(attr);
}
118
119
120 /* ----------
121  * heap_tuple_untoast_attr -
122  *
123  *      Public entry point to get back a toasted value from compression
124  *      or external storage.
125  * ----------
126  */
127 struct varlena *
128 heap_tuple_untoast_attr(struct varlena * attr)
129 {
130         if (VARATT_IS_EXTERNAL(attr))
131         {
132                 /*
133                  * This is an externally stored datum --- fetch it back from there
134                  */
135                 attr = toast_fetch_datum(attr);
136                 /* If it's compressed, decompress it */
137                 if (VARATT_IS_COMPRESSED(attr))
138                 {
139                         PGLZ_Header *tmp = (PGLZ_Header *) attr;
140
141                         attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
142                         SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
143                         pglz_decompress(tmp, VARDATA(attr));
144                         pfree(tmp);
145                 }
146         }
147         else if (VARATT_IS_COMPRESSED(attr))
148         {
149                 /*
150                  * This is a compressed value inside of the main tuple
151                  */
152                 PGLZ_Header *tmp = (PGLZ_Header *) attr;
153
154                 attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
155                 SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
156                 pglz_decompress(tmp, VARDATA(attr));
157         }
158         else if (VARATT_IS_SHORT(attr))
159         {
160                 /*
161                  * This is a short-header varlena --- convert to 4-byte header format
162                  */
163                 Size            data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
164                 Size            new_size = data_size + VARHDRSZ;
165                 struct varlena *new_attr;
166
167                 new_attr = (struct varlena *) palloc(new_size);
168                 SET_VARSIZE(new_attr, new_size);
169                 memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
170                 attr = new_attr;
171         }
172
173         return attr;
174 }
175
176
177 /* ----------
178  * heap_tuple_untoast_attr_slice -
179  *
180  *              Public entry point to get back part of a toasted value
181  *              from compression or external storage.
182  * ----------
183  */
184 struct varlena *
185 heap_tuple_untoast_attr_slice(struct varlena * attr,
186                                                           int32 sliceoffset, int32 slicelength)
187 {
188         struct varlena *preslice;
189         struct varlena *result;
190         char       *attrdata;
191         int32           attrsize;
192
193         if (VARATT_IS_EXTERNAL(attr))
194         {
195                 struct varatt_external toast_pointer;
196
197                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
198
199                 /* fast path for non-compressed external datums */
200                 if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
201                         return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
202
203                 /* fetch it back (compressed marker will get set automatically) */
204                 preslice = toast_fetch_datum(attr);
205         }
206         else
207                 preslice = attr;
208
209         if (VARATT_IS_COMPRESSED(preslice))
210         {
211                 PGLZ_Header *tmp = (PGLZ_Header *) preslice;
212                 Size            size = PGLZ_RAW_SIZE(tmp) + VARHDRSZ;
213
214                 preslice = (struct varlena *) palloc(size);
215                 SET_VARSIZE(preslice, size);
216                 pglz_decompress(tmp, VARDATA(preslice));
217
218                 if (tmp != (PGLZ_Header *) attr)
219                         pfree(tmp);
220         }
221
222         if (VARATT_IS_SHORT(preslice))
223         {
224                 attrdata = VARDATA_SHORT(preslice);
225                 attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
226         }
227         else
228         {
229                 attrdata = VARDATA(preslice);
230                 attrsize = VARSIZE(preslice) - VARHDRSZ;
231         }
232
233         /* slicing of datum for compressed cases and plain value */
234
235         if (sliceoffset >= attrsize)
236         {
237                 sliceoffset = 0;
238                 slicelength = 0;
239         }
240
241         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
242                 slicelength = attrsize - sliceoffset;
243
244         result = (struct varlena *) palloc(slicelength + VARHDRSZ);
245         SET_VARSIZE(result, slicelength + VARHDRSZ);
246
247         memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
248
249         if (preslice != attr)
250                 pfree(preslice);
251
252         return result;
253 }
254
255
256 /* ----------
257  * toast_raw_datum_size -
258  *
259  *      Return the raw (detoasted) size of a varlena datum
260  *      (including the VARHDRSZ header)
261  * ----------
262  */
263 Size
264 toast_raw_datum_size(Datum value)
265 {
266         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
267         Size            result;
268
269         if (VARATT_IS_EXTERNAL(attr))
270         {
271                 /* va_rawsize is the size of the original datum -- including header */
272                 struct varatt_external toast_pointer;
273
274                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
275                 result = toast_pointer.va_rawsize;
276         }
277         else if (VARATT_IS_COMPRESSED(attr))
278         {
279                 /* here, va_rawsize is just the payload size */
280                 result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
281         }
282         else if (VARATT_IS_SHORT(attr))
283         {
284                 /*
285                  * we have to normalize the header length to VARHDRSZ or else the
286                  * callers of this function will be confused.
287                  */
288                 result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
289         }
290         else
291         {
292                 /* plain untoasted datum */
293                 result = VARSIZE(attr);
294         }
295         return result;
296 }
297
298 /* ----------
299  * toast_datum_size
300  *
301  *      Return the physical storage size (possibly compressed) of a varlena datum
302  * ----------
303  */
304 Size
305 toast_datum_size(Datum value)
306 {
307         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
308         Size            result;
309
310         if (VARATT_IS_EXTERNAL(attr))
311         {
312                 /*
313                  * Attribute is stored externally - return the extsize whether
314                  * compressed or not.  We do not count the size of the toast pointer
315                  * ... should we?
316                  */
317                 struct varatt_external toast_pointer;
318
319                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
320                 result = toast_pointer.va_extsize;
321         }
322         else if (VARATT_IS_SHORT(attr))
323         {
324                 result = VARSIZE_SHORT(attr);
325         }
326         else
327         {
328                 /*
329                  * Attribute is stored inline either compressed or not, just calculate
330                  * the size of the datum in either case.
331                  */
332                 result = VARSIZE(attr);
333         }
334         return result;
335 }
336
337
338 /* ----------
339  * toast_delete -
340  *
341  *      Cascaded delete toast-entries on DELETE
342  * ----------
343  */
344 void
345 toast_delete(Relation rel, HeapTuple oldtup)
346 {
347         TupleDesc       tupleDesc;
348         Form_pg_attribute *att;
349         int                     numAttrs;
350         int                     i;
351         Datum           toast_values[MaxHeapAttributeNumber];
352         bool            toast_isnull[MaxHeapAttributeNumber];
353
354         /*
355          * We should only ever be called for tuples of plain relations ---
356          * recursing on a toast rel is bad news.
357          */
358         Assert(rel->rd_rel->relkind == RELKIND_RELATION);
359
360         /*
361          * Get the tuple descriptor and break down the tuple into fields.
362          *
363          * NOTE: it's debatable whether to use heap_deform_tuple() here or just
364          * heap_getattr() only the varlena columns.  The latter could win if there
365          * are few varlena columns and many non-varlena ones. However,
366          * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
367          * O(N^2) if there are many varlena columns, so it seems better to err on
368          * the side of linear cost.  (We won't even be here unless there's at
369          * least one varlena column, by the way.)
370          */
371         tupleDesc = rel->rd_att;
372         att = tupleDesc->attrs;
373         numAttrs = tupleDesc->natts;
374
375         Assert(numAttrs <= MaxHeapAttributeNumber);
376         heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
377
378         /*
379          * Check for external stored attributes and delete them from the secondary
380          * relation.
381          */
382         for (i = 0; i < numAttrs; i++)
383         {
384                 if (att[i]->attlen == -1)
385                 {
386                         Datum           value = toast_values[i];
387
388                         if (!toast_isnull[i] && VARATT_IS_EXTERNAL(PointerGetDatum(value)))
389                                 toast_delete_datum(rel, value);
390                 }
391         }
392 }
393
394
395 /* ----------
396  * toast_insert_or_update -
397  *
398  *      Delete no-longer-used toast-entries and create new ones to
399  *      make the new tuple fit on INSERT or UPDATE
400  *
401  * Inputs:
402  *      newtup: the candidate new tuple to be inserted
403  *      oldtup: the old row version for UPDATE, or NULL for INSERT
404  *      options: options to be passed to heap_insert() for toast rows
405  * Result:
406  *      either newtup if no toasting is needed, or a palloc'd modified tuple
407  *      that is what should actually get stored
408  *
409  * NOTE: neither newtup nor oldtup will be modified.  This is a change
410  * from the pre-8.1 API of this routine.
411  * ----------
412  */
413 HeapTuple
414 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
415                                            int options)
416 {
417         HeapTuple       result_tuple;
418         TupleDesc       tupleDesc;
419         Form_pg_attribute *att;
420         int                     numAttrs;
421         int                     i;
422
423         bool            need_change = false;
424         bool            need_free = false;
425         bool            need_delold = false;
426         bool            has_nulls = false;
427
428         Size            maxDataLen;
429         Size            hoff;
430
431         char            toast_action[MaxHeapAttributeNumber];
432         bool            toast_isnull[MaxHeapAttributeNumber];
433         bool            toast_oldisnull[MaxHeapAttributeNumber];
434         Datum           toast_values[MaxHeapAttributeNumber];
435         Datum           toast_oldvalues[MaxHeapAttributeNumber];
436         struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
437         int32           toast_sizes[MaxHeapAttributeNumber];
438         bool            toast_free[MaxHeapAttributeNumber];
439         bool            toast_delold[MaxHeapAttributeNumber];
440
441         /*
442          * We should only ever be called for tuples of plain relations ---
443          * recursing on a toast rel is bad news.
444          */
445         Assert(rel->rd_rel->relkind == RELKIND_RELATION);
446
447         /*
448          * Get the tuple descriptor and break down the tuple(s) into fields.
449          */
450         tupleDesc = rel->rd_att;
451         att = tupleDesc->attrs;
452         numAttrs = tupleDesc->natts;
453
454         Assert(numAttrs <= MaxHeapAttributeNumber);
455         heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
456         if (oldtup != NULL)
457                 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
458
459         /* ----------
460          * Then collect information about the values given
461          *
462          * NOTE: toast_action[i] can have these values:
463          *              ' '             default handling
464          *              'p'             already processed --- don't touch it
465          *              'x'             incompressible, but OK to move off
466          *
467          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
468          *              toast_action[i] different from 'p'.
469          * ----------
470          */
471         memset(toast_action, ' ', numAttrs * sizeof(char));
472         memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
473         memset(toast_free, 0, numAttrs * sizeof(bool));
474         memset(toast_delold, 0, numAttrs * sizeof(bool));
475
476         for (i = 0; i < numAttrs; i++)
477         {
478                 struct varlena *old_value;
479                 struct varlena *new_value;
480
481                 if (oldtup != NULL)
482                 {
483                         /*
484                          * For UPDATE get the old and new values of this attribute
485                          */
486                         old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
487                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
488
489                         /*
490                          * If the old value is an external stored one, check if it has
491                          * changed so we have to delete it later.
492                          */
493                         if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
494                                 VARATT_IS_EXTERNAL(old_value))
495                         {
496                                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
497                                         memcmp((char *) old_value, (char *) new_value,
498                                                    VARSIZE_EXTERNAL(old_value)) != 0)
499                                 {
500                                         /*
501                                          * The old external stored value isn't needed any more
502                                          * after the update
503                                          */
504                                         toast_delold[i] = true;
505                                         need_delold = true;
506                                 }
507                                 else
508                                 {
509                                         /*
510                                          * This attribute isn't changed by this update so we reuse
511                                          * the original reference to the old value in the new
512                                          * tuple.
513                                          */
514                                         toast_action[i] = 'p';
515                                         continue;
516                                 }
517                         }
518                 }
519                 else
520                 {
521                         /*
522                          * For INSERT simply get the new value
523                          */
524                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
525                 }
526
527                 /*
528                  * Handle NULL attributes
529                  */
530                 if (toast_isnull[i])
531                 {
532                         toast_action[i] = 'p';
533                         has_nulls = true;
534                         continue;
535                 }
536
537                 /*
538                  * Now look at varlena attributes
539                  */
540                 if (att[i]->attlen == -1)
541                 {
542                         /*
543                          * If the table's attribute says PLAIN always, force it so.
544                          */
545                         if (att[i]->attstorage == 'p')
546                                 toast_action[i] = 'p';
547
548                         /*
549                          * We took care of UPDATE above, so any external value we find
550                          * still in the tuple must be someone else's we cannot reuse.
551                          * Fetch it back (without decompression, unless we are forcing
552                          * PLAIN storage).      If necessary, we'll push it out as a new
553                          * external value below.
554                          */
555                         if (VARATT_IS_EXTERNAL(new_value))
556                         {
557                                 toast_oldexternal[i] = new_value;
558                                 if (att[i]->attstorage == 'p')
559                                         new_value = heap_tuple_untoast_attr(new_value);
560                                 else
561                                         new_value = heap_tuple_fetch_attr(new_value);
562                                 toast_values[i] = PointerGetDatum(new_value);
563                                 toast_free[i] = true;
564                                 need_change = true;
565                                 need_free = true;
566                         }
567
568                         /*
569                          * Remember the size of this attribute
570                          */
571                         toast_sizes[i] = VARSIZE_ANY(new_value);
572                 }
573                 else
574                 {
575                         /*
576                          * Not a varlena attribute, plain storage always
577                          */
578                         toast_action[i] = 'p';
579                 }
580         }
581
582         /* ----------
583          * Compress and/or save external until data fits into target length
584          *
585          *      1: Inline compress attributes with attstorage 'x', and store very
586          *         large attributes with attstorage 'x' or 'e' external immediately
587          *      2: Store attributes with attstorage 'x' or 'e' external
588          *      3: Inline compress attributes with attstorage 'm'
589          *      4: Store attributes with attstorage 'm' external
590          * ----------
591          */
592
593         /* compute header overhead --- this should match heap_form_tuple() */
594         hoff = offsetof(HeapTupleHeaderData, t_bits);
595         if (has_nulls)
596                 hoff += BITMAPLEN(numAttrs);
597         if (newtup->t_data->t_infomask & HEAP_HASOID)
598                 hoff += sizeof(Oid);
599         hoff = MAXALIGN(hoff);
600         Assert(hoff == newtup->t_data->t_hoff);
601         /* now convert to a limit on the tuple data size */
602         maxDataLen = TOAST_TUPLE_TARGET - hoff;
603
604         /*
605          * Look for attributes with attstorage 'x' to compress.  Also find large
606          * attributes with attstorage 'x' or 'e', and store them external.
607          */
608         while (heap_compute_data_size(tupleDesc,
609                                                                   toast_values, toast_isnull) > maxDataLen)
610         {
611                 int                     biggest_attno = -1;
612                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
613                 Datum           old_value;
614                 Datum           new_value;
615
616                 /*
617                  * Search for the biggest yet unprocessed internal attribute
618                  */
619                 for (i = 0; i < numAttrs; i++)
620                 {
621                         if (toast_action[i] != ' ')
622                                 continue;
623                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
624                                 continue;               /* can't happen, toast_action would be 'p' */
625                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
626                                 continue;
627                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
628                                 continue;
629                         if (toast_sizes[i] > biggest_size)
630                         {
631                                 biggest_attno = i;
632                                 biggest_size = toast_sizes[i];
633                         }
634                 }
635
636                 if (biggest_attno < 0)
637                         break;
638
639                 /*
640                  * Attempt to compress it inline, if it has attstorage 'x'
641                  */
642                 i = biggest_attno;
643                 if (att[i]->attstorage == 'x')
644                 {
645                         old_value = toast_values[i];
646                         new_value = toast_compress_datum(old_value);
647
648                         if (DatumGetPointer(new_value) != NULL)
649                         {
650                                 /* successful compression */
651                                 if (toast_free[i])
652                                         pfree(DatumGetPointer(old_value));
653                                 toast_values[i] = new_value;
654                                 toast_free[i] = true;
655                                 toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
656                                 need_change = true;
657                                 need_free = true;
658                         }
659                         else
660                         {
661                                 /* incompressible, ignore on subsequent compression passes */
662                                 toast_action[i] = 'x';
663                         }
664                 }
665                 else
666                 {
667                         /* has attstorage 'e', ignore on subsequent compression passes */
668                         toast_action[i] = 'x';
669                 }
670
671                 /*
672                  * If this value is by itself more than maxDataLen (after compression
673                  * if any), push it out to the toast table immediately, if possible.
674                  * This avoids uselessly compressing other fields in the common case
675                  * where we have one long field and several short ones.
676                  *
677                  * XXX maybe the threshold should be less than maxDataLen?
678                  */
679                 if (toast_sizes[i] > maxDataLen &&
680                         rel->rd_rel->reltoastrelid != InvalidOid)
681                 {
682                         old_value = toast_values[i];
683                         toast_action[i] = 'p';
684                         toast_values[i] = toast_save_datum(rel, toast_values[i],
685                                                                                            toast_oldexternal[i], options);
686                         if (toast_free[i])
687                                 pfree(DatumGetPointer(old_value));
688                         toast_free[i] = true;
689                         need_change = true;
690                         need_free = true;
691                 }
692         }
693
694         /*
695          * Second we look for attributes of attstorage 'x' or 'e' that are still
696          * inline.      But skip this if there's no toast table to push them to.
697          */
698         while (heap_compute_data_size(tupleDesc,
699                                                                   toast_values, toast_isnull) > maxDataLen &&
700                    rel->rd_rel->reltoastrelid != InvalidOid)
701         {
702                 int                     biggest_attno = -1;
703                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
704                 Datum           old_value;
705
706                 /*------
707                  * Search for the biggest yet inlined attribute with
708                  * attstorage equals 'x' or 'e'
709                  *------
710                  */
711                 for (i = 0; i < numAttrs; i++)
712                 {
713                         if (toast_action[i] == 'p')
714                                 continue;
715                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
716                                 continue;               /* can't happen, toast_action would be 'p' */
717                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
718                                 continue;
719                         if (toast_sizes[i] > biggest_size)
720                         {
721                                 biggest_attno = i;
722                                 biggest_size = toast_sizes[i];
723                         }
724                 }
725
726                 if (biggest_attno < 0)
727                         break;
728
729                 /*
730                  * Store this external
731                  */
732                 i = biggest_attno;
733                 old_value = toast_values[i];
734                 toast_action[i] = 'p';
735                 toast_values[i] = toast_save_datum(rel, toast_values[i],
736                                                                                    toast_oldexternal[i], options);
737                 if (toast_free[i])
738                         pfree(DatumGetPointer(old_value));
739                 toast_free[i] = true;
740
741                 need_change = true;
742                 need_free = true;
743         }
744
745         /*
746          * Round 3 - this time we take attributes with storage 'm' into
747          * compression
748          */
749         while (heap_compute_data_size(tupleDesc,
750                                                                   toast_values, toast_isnull) > maxDataLen)
751         {
752                 int                     biggest_attno = -1;
753                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
754                 Datum           old_value;
755                 Datum           new_value;
756
757                 /*
758                  * Search for the biggest yet uncompressed internal attribute
759                  */
760                 for (i = 0; i < numAttrs; i++)
761                 {
762                         if (toast_action[i] != ' ')
763                                 continue;
764                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
765                                 continue;               /* can't happen, toast_action would be 'p' */
766                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
767                                 continue;
768                         if (att[i]->attstorage != 'm')
769                                 continue;
770                         if (toast_sizes[i] > biggest_size)
771                         {
772                                 biggest_attno = i;
773                                 biggest_size = toast_sizes[i];
774                         }
775                 }
776
777                 if (biggest_attno < 0)
778                         break;
779
780                 /*
781                  * Attempt to compress it inline
782                  */
783                 i = biggest_attno;
784                 old_value = toast_values[i];
785                 new_value = toast_compress_datum(old_value);
786
787                 if (DatumGetPointer(new_value) != NULL)
788                 {
789                         /* successful compression */
790                         if (toast_free[i])
791                                 pfree(DatumGetPointer(old_value));
792                         toast_values[i] = new_value;
793                         toast_free[i] = true;
794                         toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
795                         need_change = true;
796                         need_free = true;
797                 }
798                 else
799                 {
800                         /* incompressible, ignore on subsequent compression passes */
801                         toast_action[i] = 'x';
802                 }
803         }
804
805         /*
806          * Finally we store attributes of type 'm' externally.  At this point we
807          * increase the target tuple size, so that 'm' attributes aren't stored
808          * externally unless really necessary.
809          */
810         maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
811
812         while (heap_compute_data_size(tupleDesc,
813                                                                   toast_values, toast_isnull) > maxDataLen &&
814                    rel->rd_rel->reltoastrelid != InvalidOid)
815         {
816                 int                     biggest_attno = -1;
817                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
818                 Datum           old_value;
819
820                 /*--------
821                  * Search for the biggest yet inlined attribute with
822                  * attstorage = 'm'
823                  *--------
824                  */
825                 for (i = 0; i < numAttrs; i++)
826                 {
827                         if (toast_action[i] == 'p')
828                                 continue;
829                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
830                                 continue;               /* can't happen, toast_action would be 'p' */
831                         if (att[i]->attstorage != 'm')
832                                 continue;
833                         if (toast_sizes[i] > biggest_size)
834                         {
835                                 biggest_attno = i;
836                                 biggest_size = toast_sizes[i];
837                         }
838                 }
839
840                 if (biggest_attno < 0)
841                         break;
842
843                 /*
844                  * Store this external
845                  */
846                 i = biggest_attno;
847                 old_value = toast_values[i];
848                 toast_action[i] = 'p';
849                 toast_values[i] = toast_save_datum(rel, toast_values[i],
850                                                                                    toast_oldexternal[i], options);
851                 if (toast_free[i])
852                         pfree(DatumGetPointer(old_value));
853                 toast_free[i] = true;
854
855                 need_change = true;
856                 need_free = true;
857         }
858
859         /*
860          * In the case we toasted any values, we need to build a new heap tuple
861          * with the changed values.
862          */
863         if (need_change)
864         {
865                 HeapTupleHeader olddata = newtup->t_data;
866                 HeapTupleHeader new_data;
867                 int32           new_len;
868                 int32           new_data_len;
869
870                 /*
871                  * Calculate the new size of the tuple.  Header size should not
872                  * change, but data size might.
873                  */
874                 new_len = offsetof(HeapTupleHeaderData, t_bits);
875                 if (has_nulls)
876                         new_len += BITMAPLEN(numAttrs);
877                 if (olddata->t_infomask & HEAP_HASOID)
878                         new_len += sizeof(Oid);
879                 new_len = MAXALIGN(new_len);
880                 Assert(new_len == olddata->t_hoff);
881                 new_data_len = heap_compute_data_size(tupleDesc,
882                                                                                           toast_values, toast_isnull);
883                 new_len += new_data_len;
884
885                 /*
886                  * Allocate and zero the space needed, and fill HeapTupleData fields.
887                  */
888                 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_len);
889                 result_tuple->t_len = new_len;
890                 result_tuple->t_self = newtup->t_self;
891                 result_tuple->t_tableOid = newtup->t_tableOid;
892                 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
893                 result_tuple->t_data = new_data;
894
895                 /*
896                  * Put the existing tuple header and the changed values into place
897                  */
898                 memcpy(new_data, olddata, olddata->t_hoff);
899
900                 heap_fill_tuple(tupleDesc,
901                                                 toast_values,
902                                                 toast_isnull,
903                                                 (char *) new_data + olddata->t_hoff,
904                                                 new_data_len,
905                                                 &(new_data->t_infomask),
906                                                 has_nulls ? new_data->t_bits : NULL);
907         }
908         else
909                 result_tuple = newtup;
910
911         /*
912          * Free allocated temp values
913          */
914         if (need_free)
915                 for (i = 0; i < numAttrs; i++)
916                         if (toast_free[i])
917                                 pfree(DatumGetPointer(toast_values[i]));
918
919         /*
920          * Delete external values from the old tuple
921          */
922         if (need_delold)
923                 for (i = 0; i < numAttrs; i++)
924                         if (toast_delold[i])
925                                 toast_delete_datum(rel, toast_oldvalues[i]);
926
927         return result_tuple;
928 }
929
930
931 /* ----------
932  * toast_flatten_tuple -
933  *
934  *      "Flatten" a tuple to contain no out-of-line toasted fields.
935  *      (This does not eliminate compressed or short-header datums.)
936  * ----------
937  */
938 HeapTuple
939 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
940 {
941         HeapTuple       new_tuple;
942         Form_pg_attribute *att = tupleDesc->attrs;
943         int                     numAttrs = tupleDesc->natts;
944         int                     i;
945         Datum           toast_values[MaxTupleAttributeNumber];
946         bool            toast_isnull[MaxTupleAttributeNumber];
947         bool            toast_free[MaxTupleAttributeNumber];
948
949         /*
950          * Break down the tuple into fields.
951          */
952         Assert(numAttrs <= MaxTupleAttributeNumber);
953         heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
954
955         memset(toast_free, 0, numAttrs * sizeof(bool));
956
957         for (i = 0; i < numAttrs; i++)
958         {
959                 /*
960                  * Look at non-null varlena attributes
961                  */
962                 if (!toast_isnull[i] && att[i]->attlen == -1)
963                 {
964                         struct varlena *new_value;
965
966                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
967                         if (VARATT_IS_EXTERNAL(new_value))
968                         {
969                                 new_value = toast_fetch_datum(new_value);
970                                 toast_values[i] = PointerGetDatum(new_value);
971                                 toast_free[i] = true;
972                         }
973                 }
974         }
975
976         /*
977          * Form the reconfigured tuple.
978          */
979         new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
980
981         /*
982          * Be sure to copy the tuple's OID and identity fields.  We also make a
983          * point of copying visibility info, just in case anybody looks at those
984          * fields in a syscache entry.
985          */
986         if (tupleDesc->tdhasoid)
987                 HeapTupleSetOid(new_tuple, HeapTupleGetOid(tup));
988
989         new_tuple->t_self = tup->t_self;
990         new_tuple->t_tableOid = tup->t_tableOid;
991
992         new_tuple->t_data->t_choice = tup->t_data->t_choice;
993         new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
994         new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
995         new_tuple->t_data->t_infomask |=
996                 tup->t_data->t_infomask & HEAP_XACT_MASK;
997         new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
998         new_tuple->t_data->t_infomask2 |=
999                 tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
1000
1001         /*
1002          * Free allocated temp values
1003          */
1004         for (i = 0; i < numAttrs; i++)
1005                 if (toast_free[i])
1006                         pfree(DatumGetPointer(toast_values[i]));
1007
1008         return new_tuple;
1009 }
1010
1011
1012 /* ----------
1013  * toast_flatten_tuple_attribute -
1014  *
1015  *      If a Datum is of composite type, "flatten" it to contain no toasted fields.
1016  *      This must be invoked on any potentially-composite field that is to be
1017  *      inserted into a tuple.  Doing this preserves the invariant that toasting
1018  *      goes only one level deep in a tuple.
1019  *
1020  *      Note that flattening does not mean expansion of short-header varlenas,
1021  *      so in one sense toasting is allowed within composite datums.
1022  * ----------
1023  */
1024 Datum
1025 toast_flatten_tuple_attribute(Datum value,
1026                                                           Oid typeId, int32 typeMod)
1027 {
1028         TupleDesc       tupleDesc;
1029         HeapTupleHeader olddata;
1030         HeapTupleHeader new_data;
1031         int32           new_len;
1032         int32           new_data_len;
1033         HeapTupleData tmptup;
1034         Form_pg_attribute *att;
1035         int                     numAttrs;
1036         int                     i;
1037         bool            need_change = false;
1038         bool            has_nulls = false;
1039         Datum           toast_values[MaxTupleAttributeNumber];
1040         bool            toast_isnull[MaxTupleAttributeNumber];
1041         bool            toast_free[MaxTupleAttributeNumber];
1042
1043         /*
1044          * See if it's a composite type, and get the tupdesc if so.
1045          */
1046         tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
1047         if (tupleDesc == NULL)
1048                 return value;                   /* not a composite type */
1049
1050         att = tupleDesc->attrs;
1051         numAttrs = tupleDesc->natts;
1052
1053         /*
1054          * Break down the tuple into fields.
1055          */
1056         olddata = DatumGetHeapTupleHeader(value);
1057         Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
1058         Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
1059         /* Build a temporary HeapTuple control structure */
1060         tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
1061         ItemPointerSetInvalid(&(tmptup.t_self));
1062         tmptup.t_tableOid = InvalidOid;
1063         tmptup.t_data = olddata;
1064
1065         Assert(numAttrs <= MaxTupleAttributeNumber);
1066         heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
1067
1068         memset(toast_free, 0, numAttrs * sizeof(bool));
1069
1070         for (i = 0; i < numAttrs; i++)
1071         {
1072                 /*
1073                  * Look at non-null varlena attributes
1074                  */
1075                 if (toast_isnull[i])
1076                         has_nulls = true;
1077                 else if (att[i]->attlen == -1)
1078                 {
1079                         struct varlena *new_value;
1080
1081                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1082                         if (VARATT_IS_EXTERNAL(new_value) ||
1083                                 VARATT_IS_COMPRESSED(new_value))
1084                         {
1085                                 new_value = heap_tuple_untoast_attr(new_value);
1086                                 toast_values[i] = PointerGetDatum(new_value);
1087                                 toast_free[i] = true;
1088                                 need_change = true;
1089                         }
1090                 }
1091         }
1092
1093         /*
1094          * If nothing to untoast, just return the original tuple.
1095          */
1096         if (!need_change)
1097         {
1098                 ReleaseTupleDesc(tupleDesc);
1099                 return value;
1100         }
1101
1102         /*
1103          * Calculate the new size of the tuple.  Header size should not change,
1104          * but data size might.
1105          */
1106         new_len = offsetof(HeapTupleHeaderData, t_bits);
1107         if (has_nulls)
1108                 new_len += BITMAPLEN(numAttrs);
1109         if (olddata->t_infomask & HEAP_HASOID)
1110                 new_len += sizeof(Oid);
1111         new_len = MAXALIGN(new_len);
1112         Assert(new_len == olddata->t_hoff);
1113         new_data_len = heap_compute_data_size(tupleDesc,
1114                                                                                   toast_values, toast_isnull);
1115         new_len += new_data_len;
1116
1117         new_data = (HeapTupleHeader) palloc0(new_len);
1118
1119         /*
1120          * Put the tuple header and the changed values into place
1121          */
1122         memcpy(new_data, olddata, olddata->t_hoff);
1123
1124         HeapTupleHeaderSetDatumLength(new_data, new_len);
1125
1126         heap_fill_tuple(tupleDesc,
1127                                         toast_values,
1128                                         toast_isnull,
1129                                         (char *) new_data + olddata->t_hoff,
1130                                         new_data_len,
1131                                         &(new_data->t_infomask),
1132                                         has_nulls ? new_data->t_bits : NULL);
1133
1134         /*
1135          * Free allocated temp values
1136          */
1137         for (i = 0; i < numAttrs; i++)
1138                 if (toast_free[i])
1139                         pfree(DatumGetPointer(toast_values[i]));
1140         ReleaseTupleDesc(tupleDesc);
1141
1142         return PointerGetDatum(new_data);
1143 }
1144
1145
1146 /* ----------
1147  * toast_compress_datum -
1148  *
1149  *      Create a compressed version of a varlena datum
1150  *
1151  *      If we fail (ie, compressed result is actually bigger than original)
1152  *      then return NULL.  We must not use compressed data if it'd expand
1153  *      the tuple!
1154  *
1155  *      We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1156  *      copying them.  But we can't handle external or compressed datums.
1157  * ----------
1158  */
1159 Datum
1160 toast_compress_datum(Datum value)
1161 {
1162         struct varlena *tmp;
1163         int32           valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
1164
1165         Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
1166         Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
1167
1168         /*
1169          * No point in wasting a palloc cycle if value size is out of the allowed
1170          * range for compression
1171          */
1172         if (valsize < PGLZ_strategy_default->min_input_size ||
1173                 valsize > PGLZ_strategy_default->max_input_size)
1174                 return PointerGetDatum(NULL);
1175
1176         tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
1177
1178         /*
1179          * We recheck the actual size even if pglz_compress() reports success,
1180          * because it might be satisfied with having saved as little as one byte
1181          * in the compressed data --- which could turn into a net loss once you
1182          * consider header and alignment padding.  Worst case, the compressed
1183          * format might require three padding bytes (plus header, which is
1184          * included in VARSIZE(tmp)), whereas the uncompressed format would take
1185          * only one header byte and no padding if the value is short enough.  So
1186          * we insist on a savings of more than 2 bytes to ensure we have a gain.
1187          */
1188         if (pglz_compress(VARDATA_ANY(DatumGetPointer(value)), valsize,
1189                                           (PGLZ_Header *) tmp, PGLZ_strategy_default) &&
1190                 VARSIZE(tmp) < valsize - 2)
1191         {
1192                 /* successful compression */
1193                 return PointerGetDatum(tmp);
1194         }
1195         else
1196         {
1197                 /* incompressible data */
1198                 pfree(tmp);
1199                 return PointerGetDatum(NULL);
1200         }
1201 }
1202
1203
1204 /* ----------
1205  * toast_save_datum -
1206  *
1207  *      Save one single datum into the secondary relation and return
1208  *      a Datum reference for it.
1209  *
1210  * rel: the main relation we're working with (not the toast rel!)
1211  * value: datum to be pushed to toast storage
1212  * oldexternal: if not NULL, toast pointer previously representing the datum
1213  * options: options to be passed to heap_insert() for toast rows
1214  * ----------
1215  */
1216 static Datum
1217 toast_save_datum(Relation rel, Datum value,
1218                                  struct varlena *oldexternal, int options)
1219 {
1220         Relation        toastrel;
1221         Relation        toastidx;
1222         HeapTuple       toasttup;
1223         TupleDesc       toasttupDesc;
1224         Datum           t_values[3];
1225         bool            t_isnull[3];
1226         CommandId       mycid = GetCurrentCommandId(true);
1227         struct varlena *result;
1228         struct varatt_external toast_pointer;
1229         struct
1230         {
1231                 struct varlena hdr;
1232                 char            data[TOAST_MAX_CHUNK_SIZE]; /* make struct big enough */
1233                 int32           align_it;       /* ensure struct is aligned well enough */
1234         }                       chunk_data;
1235         int32           chunk_size;
1236         int32           chunk_seq = 0;
1237         char       *data_p;
1238         int32           data_todo;
1239         Pointer         dval = DatumGetPointer(value);
1240
1241         /*
1242          * Open the toast relation and its index.  We can use the index to check
1243          * uniqueness of the OID we assign to the toasted item, even though it has
1244          * additional columns besides OID.
1245          */
1246         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1247         toasttupDesc = toastrel->rd_att;
1248         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1249
1250         /*
1251          * Get the data pointer and length, and compute va_rawsize and va_extsize.
1252          *
1253          * va_rawsize is the size of the equivalent fully uncompressed datum, so
1254          * we have to adjust for short headers.
1255          *
1256          * va_extsize is the actual size of the data payload in the toast records.
1257          */
1258         if (VARATT_IS_SHORT(dval))
1259         {
1260                 data_p = VARDATA_SHORT(dval);
1261                 data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
1262                 toast_pointer.va_rawsize = data_todo + VARHDRSZ;                /* as if not short */
1263                 toast_pointer.va_extsize = data_todo;
1264         }
1265         else if (VARATT_IS_COMPRESSED(dval))
1266         {
1267                 data_p = VARDATA(dval);
1268                 data_todo = VARSIZE(dval) - VARHDRSZ;
1269                 /* rawsize in a compressed datum is just the size of the payload */
1270                 toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
1271                 toast_pointer.va_extsize = data_todo;
1272                 /* Assert that the numbers look like it's compressed */
1273                 Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1274         }
1275         else
1276         {
1277                 data_p = VARDATA(dval);
1278                 data_todo = VARSIZE(dval) - VARHDRSZ;
1279                 toast_pointer.va_rawsize = VARSIZE(dval);
1280                 toast_pointer.va_extsize = data_todo;
1281         }
1282
1283         /*
1284          * Insert the correct table OID into the result TOAST pointer.
1285          *
1286          * Normally this is the actual OID of the target toast table, but during
1287          * table-rewriting operations such as CLUSTER, we have to insert the OID
1288          * of the table's real permanent toast table instead.  rd_toastoid is set
1289          * if we have to substitute such an OID.
1290          */
1291         if (OidIsValid(rel->rd_toastoid))
1292                 toast_pointer.va_toastrelid = rel->rd_toastoid;
1293         else
1294                 toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
1295
1296         /*
1297          * Choose an OID to use as the value ID for this toast value.
1298          *
1299          * Normally we just choose an unused OID within the toast table.  But
1300          * during table-rewriting operations where we are preserving an existing
1301          * toast table OID, we want to preserve toast value OIDs too.  So, if
1302          * rd_toastoid is set and we had a prior external value from that same
1303          * toast table, re-use its value ID.  If we didn't have a prior external
1304          * value (which is a corner case, but possible if the table's attstorage
1305          * options have been changed), we have to pick a value ID that doesn't
1306          * conflict with either new or existing toast value OIDs.
1307          */
1308         if (!OidIsValid(rel->rd_toastoid))
1309         {
1310                 /* normal case: just choose an unused OID */
1311                 toast_pointer.va_valueid =
1312                         GetNewOidWithIndex(toastrel,
1313                                                            RelationGetRelid(toastidx),
1314                                                            (AttrNumber) 1);
1315         }
1316         else
1317         {
1318                 /* rewrite case: check to see if value was in old toast table */
1319                 toast_pointer.va_valueid = InvalidOid;
1320                 if (oldexternal != NULL)
1321                 {
1322                         struct varatt_external old_toast_pointer;
1323
1324                         Assert(VARATT_IS_EXTERNAL(oldexternal));
1325                         /* Must copy to access aligned fields */
1326                         VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
1327                         if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
1328                                 toast_pointer.va_valueid = old_toast_pointer.va_valueid;
1329                 }
1330                 if (toast_pointer.va_valueid == InvalidOid)
1331                 {
1332                         /*
1333                          * new value; must choose an OID that doesn't conflict in either
1334                          * old or new toast table
1335                          */
1336                         do
1337                         {
1338                                 toast_pointer.va_valueid =
1339                                         GetNewOidWithIndex(toastrel,
1340                                                                            RelationGetRelid(toastidx),
1341                                                                            (AttrNumber) 1);
1342                         } while (toast_valueid_exists(rel->rd_toastoid,
1343                                                                                   toast_pointer.va_valueid));
1344                 }
1345         }
1346
1347         /*
1348          * Initialize constant parts of the tuple data
1349          */
1350         t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1351         t_values[2] = PointerGetDatum(&chunk_data);
1352         t_isnull[0] = false;
1353         t_isnull[1] = false;
1354         t_isnull[2] = false;
1355
1356         /*
1357          * Split up the item into chunks
1358          */
1359         while (data_todo > 0)
1360         {
1361                 /*
1362                  * Calculate the size of this chunk
1363                  */
1364                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1365
1366                 /*
1367                  * Build a tuple and store it
1368                  */
1369                 t_values[1] = Int32GetDatum(chunk_seq++);
1370                 SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1371                 memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1372                 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1373
1374                 heap_insert(toastrel, toasttup, mycid, options, NULL);
1375
1376                 /*
1377                  * Create the index entry.      We cheat a little here by not using
1378                  * FormIndexDatum: this relies on the knowledge that the index columns
1379                  * are the same as the initial columns of the table.
1380                  *
1381                  * Note also that there had better not be any user-created index on
1382                  * the TOAST table, since we don't bother to update anything else.
1383                  */
1384                 index_insert(toastidx, t_values, t_isnull,
1385                                          &(toasttup->t_self),
1386                                          toastrel,
1387                                          toastidx->rd_index->indisunique ?
1388                                          UNIQUE_CHECK_YES : UNIQUE_CHECK_NO);
1389
1390                 /*
1391                  * Free memory
1392                  */
1393                 heap_freetuple(toasttup);
1394
1395                 /*
1396                  * Move on to next chunk
1397                  */
1398                 data_todo -= chunk_size;
1399                 data_p += chunk_size;
1400         }
1401
1402         /*
1403          * Done - close toast relation
1404          */
1405         index_close(toastidx, RowExclusiveLock);
1406         heap_close(toastrel, RowExclusiveLock);
1407
1408         /*
1409          * Create the TOAST pointer value that we'll return
1410          */
1411         result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1412         SET_VARSIZE_EXTERNAL(result, TOAST_POINTER_SIZE);
1413         memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1414
1415         return PointerGetDatum(result);
1416 }
1417
1418
1419 /* ----------
1420  * toast_delete_datum -
1421  *
1422  *      Delete a single external stored value.
1423  * ----------
1424  */
1425 static void
1426 toast_delete_datum(Relation rel, Datum value)
1427 {
1428         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1429         struct varatt_external toast_pointer;
1430         Relation        toastrel;
1431         Relation        toastidx;
1432         ScanKeyData toastkey;
1433         SysScanDesc toastscan;
1434         HeapTuple       toasttup;
1435
1436         if (!VARATT_IS_EXTERNAL(attr))
1437                 return;
1438
1439         /* Must copy to access aligned fields */
1440         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1441
1442         /*
1443          * Open the toast relation and its index
1444          */
1445         toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1446         toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1447
1448         /*
1449          * Setup a scan key to find chunks with matching va_valueid
1450          */
1451         ScanKeyInit(&toastkey,
1452                                 (AttrNumber) 1,
1453                                 BTEqualStrategyNumber, F_OIDEQ,
1454                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1455
1456         /*
1457          * Find all the chunks.  (We don't actually care whether we see them in
1458          * sequence or not, but since we've already locked the index we might as
1459          * well use systable_beginscan_ordered.)
1460          */
1461         toastscan = systable_beginscan_ordered(toastrel, toastidx,
1462                                                                                    SnapshotToast, 1, &toastkey);
1463         while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1464         {
1465                 /*
1466                  * Have a chunk, delete it
1467                  */
1468                 simple_heap_delete(toastrel, &toasttup->t_self);
1469         }
1470
1471         /*
1472          * End scan and close relations
1473          */
1474         systable_endscan_ordered(toastscan);
1475         index_close(toastidx, RowExclusiveLock);
1476         heap_close(toastrel, RowExclusiveLock);
1477 }
1478
1479
1480 /* ----------
1481  * toast_valueid_exists -
1482  *
1483  *      Test whether a toast value with the given ID exists in the toast relation
1484  * ----------
1485  */
1486 static bool
1487 toast_valueid_exists(Oid toastrelid, Oid valueid)
1488 {
1489         bool            result = false;
1490         Relation        toastrel;
1491         ScanKeyData toastkey;
1492         SysScanDesc toastscan;
1493
1494         /*
1495          * Open the toast relation
1496          */
1497         toastrel = heap_open(toastrelid, AccessShareLock);
1498
1499         /*
1500          * Setup a scan key to find chunks with matching va_valueid
1501          */
1502         ScanKeyInit(&toastkey,
1503                                 (AttrNumber) 1,
1504                                 BTEqualStrategyNumber, F_OIDEQ,
1505                                 ObjectIdGetDatum(valueid));
1506
1507         /*
1508          * Is there any such chunk?
1509          */
1510         toastscan = systable_beginscan(toastrel, toastrel->rd_rel->reltoastidxid,
1511                                                                    true, SnapshotToast, 1, &toastkey);
1512
1513         if (systable_getnext(toastscan) != NULL)
1514                 result = true;
1515
1516         /*
1517          * End scan and close relations
1518          */
1519         systable_endscan(toastscan);
1520         heap_close(toastrel, AccessShareLock);
1521
1522         return result;
1523 }
1524
1525
1526 /* ----------
1527  * toast_fetch_datum -
1528  *
1529  *      Reconstruct an in memory Datum from the chunks saved
1530  *      in the toast relation
1531  * ----------
1532  */
1533 static struct varlena *
1534 toast_fetch_datum(struct varlena * attr)
1535 {
1536         Relation        toastrel;
1537         Relation        toastidx;
1538         ScanKeyData toastkey;
1539         SysScanDesc toastscan;
1540         HeapTuple       ttup;
1541         TupleDesc       toasttupDesc;
1542         struct varlena *result;
1543         struct varatt_external toast_pointer;
1544         int32           ressize;
1545         int32           residx,
1546                                 nextidx;
1547         int32           numchunks;
1548         Pointer         chunk;
1549         bool            isnull;
1550         char       *chunkdata;
1551         int32           chunksize;
1552
1553         /* Must copy to access aligned fields */
1554         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1555
1556         ressize = toast_pointer.va_extsize;
1557         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1558
1559         result = (struct varlena *) palloc(ressize + VARHDRSZ);
1560
1561         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1562                 SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1563         else
1564                 SET_VARSIZE(result, ressize + VARHDRSZ);
1565
1566         /*
1567          * Open the toast relation and its index
1568          */
1569         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1570         toasttupDesc = toastrel->rd_att;
1571         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1572
1573         /*
1574          * Setup a scan key to fetch from the index by va_valueid
1575          */
1576         ScanKeyInit(&toastkey,
1577                                 (AttrNumber) 1,
1578                                 BTEqualStrategyNumber, F_OIDEQ,
1579                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1580
1581         /*
1582          * Read the chunks by index
1583          *
1584          * Note that because the index is actually on (valueid, chunkidx) we will
1585          * see the chunks in chunkidx order, even though we didn't explicitly ask
1586          * for it.
1587          */
1588         nextidx = 0;
1589
1590         toastscan = systable_beginscan_ordered(toastrel, toastidx,
1591                                                                                    SnapshotToast, 1, &toastkey);
1592         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1593         {
1594                 /*
1595                  * Have a chunk, extract the sequence number and the data
1596                  */
1597                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1598                 Assert(!isnull);
1599                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1600                 Assert(!isnull);
1601                 if (!VARATT_IS_EXTENDED(chunk))
1602                 {
1603                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1604                         chunkdata = VARDATA(chunk);
1605                 }
1606                 else if (VARATT_IS_SHORT(chunk))
1607                 {
1608                         /* could happen due to heap_form_tuple doing its thing */
1609                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1610                         chunkdata = VARDATA_SHORT(chunk);
1611                 }
1612                 else
1613                 {
1614                         /* should never happen */
1615                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1616                                  toast_pointer.va_valueid,
1617                                  RelationGetRelationName(toastrel));
1618                         chunksize = 0;          /* keep compiler quiet */
1619                         chunkdata = NULL;
1620                 }
1621
1622                 /*
1623                  * Some checks on the data we've found
1624                  */
1625                 if (residx != nextidx)
1626                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1627                                  residx, nextidx,
1628                                  toast_pointer.va_valueid,
1629                                  RelationGetRelationName(toastrel));
1630                 if (residx < numchunks - 1)
1631                 {
1632                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1633                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
1634                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1635                                          residx, numchunks,
1636                                          toast_pointer.va_valueid,
1637                                          RelationGetRelationName(toastrel));
1638                 }
1639                 else if (residx == numchunks - 1)
1640                 {
1641                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1642                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
1643                                          chunksize,
1644                                          (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
1645                                          residx,
1646                                          toast_pointer.va_valueid,
1647                                          RelationGetRelationName(toastrel));
1648                 }
1649                 else
1650                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
1651                                  residx,
1652                                  0, numchunks - 1,
1653                                  toast_pointer.va_valueid,
1654                                  RelationGetRelationName(toastrel));
1655
1656                 /*
1657                  * Copy the data into proper place in our result
1658                  */
1659                 memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
1660                            chunkdata,
1661                            chunksize);
1662
1663                 nextidx++;
1664         }
1665
1666         /*
1667          * Final checks that we successfully fetched the datum
1668          */
1669         if (nextidx != numchunks)
1670                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
1671                          nextidx,
1672                          toast_pointer.va_valueid,
1673                          RelationGetRelationName(toastrel));
1674
1675         /*
1676          * End scan and close relations
1677          */
1678         systable_endscan_ordered(toastscan);
1679         index_close(toastidx, AccessShareLock);
1680         heap_close(toastrel, AccessShareLock);
1681
1682         return result;
1683 }
1684
1685 /* ----------
1686  * toast_fetch_datum_slice -
1687  *
1688  *      Reconstruct a segment of a Datum from the chunks saved
1689  *      in the toast relation
1690  * ----------
1691  */
1692 static struct varlena *
1693 toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
1694 {
1695         Relation        toastrel;
1696         Relation        toastidx;
1697         ScanKeyData toastkey[3];
1698         int                     nscankeys;
1699         SysScanDesc toastscan;
1700         HeapTuple       ttup;
1701         TupleDesc       toasttupDesc;
1702         struct varlena *result;
1703         struct varatt_external toast_pointer;
1704         int32           attrsize;
1705         int32           residx;
1706         int32           nextidx;
1707         int                     numchunks;
1708         int                     startchunk;
1709         int                     endchunk;
1710         int32           startoffset;
1711         int32           endoffset;
1712         int                     totalchunks;
1713         Pointer         chunk;
1714         bool            isnull;
1715         char       *chunkdata;
1716         int32           chunksize;
1717         int32           chcpystrt;
1718         int32           chcpyend;
1719
1720         Assert(VARATT_IS_EXTERNAL(attr));
1721
1722         /* Must copy to access aligned fields */
1723         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1724
1725         /*
1726          * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
1727          * we can't return a compressed datum which is meaningful to toast later
1728          */
1729         Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1730
1731         attrsize = toast_pointer.va_extsize;
1732         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1733
1734         if (sliceoffset >= attrsize)
1735         {
1736                 sliceoffset = 0;
1737                 length = 0;
1738         }
1739
1740         if (((sliceoffset + length) > attrsize) || length < 0)
1741                 length = attrsize - sliceoffset;
1742
1743         result = (struct varlena *) palloc(length + VARHDRSZ);
1744
1745         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1746                 SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
1747         else
1748                 SET_VARSIZE(result, length + VARHDRSZ);
1749
1750         if (length == 0)
1751                 return result;                  /* Can save a lot of work at this point! */
1752
1753         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1754         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1755         numchunks = (endchunk - startchunk) + 1;
1756
1757         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1758         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1759
1760         /*
1761          * Open the toast relation and its index
1762          */
1763         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1764         toasttupDesc = toastrel->rd_att;
1765         toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1766
1767         /*
1768          * Setup a scan key to fetch from the index. This is either two keys or
1769          * three depending on the number of chunks.
1770          */
1771         ScanKeyInit(&toastkey[0],
1772                                 (AttrNumber) 1,
1773                                 BTEqualStrategyNumber, F_OIDEQ,
1774                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1775
1776         /*
1777          * Use equality condition for one chunk, a range condition otherwise:
1778          */
1779         if (numchunks == 1)
1780         {
1781                 ScanKeyInit(&toastkey[1],
1782                                         (AttrNumber) 2,
1783                                         BTEqualStrategyNumber, F_INT4EQ,
1784                                         Int32GetDatum(startchunk));
1785                 nscankeys = 2;
1786         }
1787         else
1788         {
1789                 ScanKeyInit(&toastkey[1],
1790                                         (AttrNumber) 2,
1791                                         BTGreaterEqualStrategyNumber, F_INT4GE,
1792                                         Int32GetDatum(startchunk));
1793                 ScanKeyInit(&toastkey[2],
1794                                         (AttrNumber) 2,
1795                                         BTLessEqualStrategyNumber, F_INT4LE,
1796                                         Int32GetDatum(endchunk));
1797                 nscankeys = 3;
1798         }
1799
1800         /*
1801          * Read the chunks by index
1802          *
1803          * The index is on (valueid, chunkidx) so they will come in order
1804          */
1805         nextidx = startchunk;
1806         toastscan = systable_beginscan_ordered(toastrel, toastidx,
1807                                                                                  SnapshotToast, nscankeys, toastkey);
1808         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1809         {
1810                 /*
1811                  * Have a chunk, extract the sequence number and the data
1812                  */
1813                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1814                 Assert(!isnull);
1815                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1816                 Assert(!isnull);
1817                 if (!VARATT_IS_EXTENDED(chunk))
1818                 {
1819                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1820                         chunkdata = VARDATA(chunk);
1821                 }
1822                 else if (VARATT_IS_SHORT(chunk))
1823                 {
1824                         /* could happen due to heap_form_tuple doing its thing */
1825                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1826                         chunkdata = VARDATA_SHORT(chunk);
1827                 }
1828                 else
1829                 {
1830                         /* should never happen */
1831                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1832                                  toast_pointer.va_valueid,
1833                                  RelationGetRelationName(toastrel));
1834                         chunksize = 0;          /* keep compiler quiet */
1835                         chunkdata = NULL;
1836                 }
1837
1838                 /*
1839                  * Some checks on the data we've found
1840                  */
1841                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1842                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1843                                  residx, nextidx,
1844                                  toast_pointer.va_valueid,
1845                                  RelationGetRelationName(toastrel));
1846                 if (residx < totalchunks - 1)
1847                 {
1848                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1849                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
1850                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1851                                          residx, totalchunks,
1852                                          toast_pointer.va_valueid,
1853                                          RelationGetRelationName(toastrel));
1854                 }
1855                 else if (residx == totalchunks - 1)
1856                 {
1857                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1858                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
1859                                          chunksize,
1860                                          (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
1861                                          residx,
1862                                          toast_pointer.va_valueid,
1863                                          RelationGetRelationName(toastrel));
1864                 }
1865                 else
1866                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
1867                                  residx,
1868                                  0, totalchunks - 1,
1869                                  toast_pointer.va_valueid,
1870                                  RelationGetRelationName(toastrel));
1871
1872                 /*
1873                  * Copy the data into proper place in our result
1874                  */
1875                 chcpystrt = 0;
1876                 chcpyend = chunksize - 1;
1877                 if (residx == startchunk)
1878                         chcpystrt = startoffset;
1879                 if (residx == endchunk)
1880                         chcpyend = endoffset;
1881
1882                 memcpy(VARDATA(result) +
1883                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1884                            chunkdata + chcpystrt,
1885                            (chcpyend - chcpystrt) + 1);
1886
1887                 nextidx++;
1888         }
1889
1890         /*
1891          * Final checks that we successfully fetched the datum
1892          */
1893         if (nextidx != (endchunk + 1))
1894                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
1895                          nextidx,
1896                          toast_pointer.va_valueid,
1897                          RelationGetRelationName(toastrel));
1898
1899         /*
1900          * End scan and close relations
1901          */
1902         systable_endscan_ordered(toastscan);
1903         index_close(toastidx, AccessShareLock);
1904         heap_close(toastrel, AccessShareLock);
1905
1906         return result;
1907 }