]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Update copyrights that were missed.
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2005, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/access/heap/tuptoaster.c,v 1.47 2005/01/01 05:43:06 momjian Exp $
12  *
13  *
14  * INTERFACE ROUTINES
15  *              heap_tuple_toast_attrs -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              heap_tuple_untoast_attr -
20  *                      Fetch back a given value from the "secondary" relation
21  *
22  *-------------------------------------------------------------------------
23  */
24
25 #include "postgres.h"
26
27 #include <unistd.h>
28 #include <fcntl.h>
29
30 #include "access/heapam.h"
31 #include "access/genam.h"
32 #include "access/tuptoaster.h"
33 #include "catalog/catalog.h"
34 #include "utils/rel.h"
35 #include "utils/builtins.h"
36 #include "utils/fmgroids.h"
37 #include "utils/pg_lzcompress.h"
38 #include "utils/typcache.h"
39
40
41 #undef TOAST_DEBUG
42
43 static void toast_delete(Relation rel, HeapTuple oldtup);
44 static void toast_delete_datum(Relation rel, Datum value);
45 static void toast_insert_or_update(Relation rel, HeapTuple newtup,
46                                            HeapTuple oldtup);
47 static Datum toast_save_datum(Relation rel, Datum value);
48 static varattrib *toast_fetch_datum(varattrib *attr);
49 static varattrib *toast_fetch_datum_slice(varattrib *attr,
50                                                 int32 sliceoffset, int32 length);
51
52
53 /* ----------
54  * heap_tuple_toast_attrs -
55  *
56  *      This is the central public entry point for toasting from heapam.
57  *
58  *      Calls the appropriate event specific action.
59  * ----------
60  */
61 void
62 heap_tuple_toast_attrs(Relation rel, HeapTuple newtup, HeapTuple oldtup)
63 {
64         if (newtup == NULL)
65                 toast_delete(rel, oldtup);
66         else
67                 toast_insert_or_update(rel, newtup, oldtup);
68 }
69
70
71 /* ----------
72  * heap_tuple_fetch_attr -
73  *
74  *      Public entry point to get back a toasted value
75  *      external storage (possibly still in compressed format).
76  * ----------
77  */
78 varattrib *
79 heap_tuple_fetch_attr(varattrib *attr)
80 {
81         varattrib  *result;
82
83         if (VARATT_IS_EXTERNAL(attr))
84         {
85                 /*
86                  * This is an external stored plain value
87                  */
88                 result = toast_fetch_datum(attr);
89         }
90         else
91         {
92                 /*
93                  * This is a plain value inside of the main tuple - why am I
94                  * called?
95                  */
96                 result = attr;
97         }
98
99         return result;
100 }
101
102
103 /* ----------
104  * heap_tuple_untoast_attr -
105  *
106  *      Public entry point to get back a toasted value from compression
107  *      or external storage.
108  * ----------
109  */
110 varattrib *
111 heap_tuple_untoast_attr(varattrib *attr)
112 {
113         varattrib  *result;
114
115         if (VARATT_IS_EXTERNAL(attr))
116         {
117                 if (VARATT_IS_COMPRESSED(attr))
118                 {
119                         /* ----------
120                          * This is an external stored compressed value
121                          * Fetch it from the toast heap and decompress.
122                          * ----------
123                          */
124                         varattrib  *tmp;
125
126                         tmp = toast_fetch_datum(attr);
127                         result = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
128                                                                                   + VARHDRSZ);
129                         VARATT_SIZEP(result) = attr->va_content.va_external.va_rawsize
130                                 + VARHDRSZ;
131                         pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(result));
132
133                         pfree(tmp);
134                 }
135                 else
136                 {
137                         /*
138                          * This is an external stored plain value
139                          */
140                         result = toast_fetch_datum(attr);
141                 }
142         }
143         else if (VARATT_IS_COMPRESSED(attr))
144         {
145                 /*
146                  * This is a compressed value inside of the main tuple
147                  */
148                 result = (varattrib *) palloc(attr->va_content.va_compressed.va_rawsize
149                                                                           + VARHDRSZ);
150                 VARATT_SIZEP(result) = attr->va_content.va_compressed.va_rawsize
151                         + VARHDRSZ;
152                 pglz_decompress((PGLZ_Header *) attr, VARATT_DATA(result));
153         }
154         else
155
156                 /*
157                  * This is a plain value inside of the main tuple - why am I
158                  * called?
159                  */
160                 return attr;
161
162         return result;
163 }
164
165
166 /* ----------
167  * heap_tuple_untoast_attr_slice -
168  *
169  *              Public entry point to get back part of a toasted value
170  *              from compression or external storage.
171  * ----------
172  */
173 varattrib *
174 heap_tuple_untoast_attr_slice(varattrib *attr, int32 sliceoffset, int32 slicelength)
175 {
176         varattrib  *preslice;
177         varattrib  *result;
178         int32           attrsize;
179
180         if (VARATT_IS_COMPRESSED(attr))
181         {
182                 varattrib  *tmp;
183
184                 if (VARATT_IS_EXTERNAL(attr))
185                         tmp = toast_fetch_datum(attr);
186                 else
187                 {
188                         tmp = attr;                     /* compressed in main tuple */
189                 }
190
191                 preslice = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
192                                                                                 + VARHDRSZ);
193                 VARATT_SIZEP(preslice) = attr->va_content.va_external.va_rawsize + VARHDRSZ;
194                 pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(preslice));
195
196                 if (tmp != attr)
197                         pfree(tmp);
198         }
199         else
200         {
201                 /* Plain value */
202                 if (VARATT_IS_EXTERNAL(attr))
203                 {
204                         /* fast path */
205                         return (toast_fetch_datum_slice(attr, sliceoffset, slicelength));
206                 }
207                 else
208                         preslice = attr;
209         }
210
211         /* slicing of datum for compressed cases and plain value */
212
213         attrsize = VARSIZE(preslice) - VARHDRSZ;
214         if (sliceoffset >= attrsize)
215         {
216                 sliceoffset = 0;
217                 slicelength = 0;
218         }
219
220         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
221                 slicelength = attrsize - sliceoffset;
222
223         result = (varattrib *) palloc(slicelength + VARHDRSZ);
224         VARATT_SIZEP(result) = slicelength + VARHDRSZ;
225
226         memcpy(VARDATA(result), VARDATA(preslice) + sliceoffset, slicelength);
227
228         if (preslice != attr)
229                 pfree(preslice);
230
231         return result;
232 }
233
234
235 /* ----------
236  * toast_raw_datum_size -
237  *
238  *      Return the raw (detoasted) size of a varlena datum
239  * ----------
240  */
241 Size
242 toast_raw_datum_size(Datum value)
243 {
244         varattrib  *attr = (varattrib *) DatumGetPointer(value);
245         Size            result;
246
247         if (VARATT_IS_COMPRESSED(attr))
248         {
249                 /*
250                  * va_rawsize shows the original data size, whether the datum is
251                  * external or not.
252                  */
253                 result = attr->va_content.va_compressed.va_rawsize + VARHDRSZ;
254         }
255         else if (VARATT_IS_EXTERNAL(attr))
256         {
257                 /*
258                  * an uncompressed external attribute has rawsize including the
259                  * header (not too consistent!)
260                  */
261                 result = attr->va_content.va_external.va_rawsize;
262         }
263         else
264         {
265                 /* plain untoasted datum */
266                 result = VARSIZE(attr);
267         }
268         return result;
269 }
270
271
272 /* ----------
273  * toast_delete -
274  *
275  *      Cascaded delete toast-entries on DELETE
276  * ----------
277  */
278 static void
279 toast_delete(Relation rel, HeapTuple oldtup)
280 {
281         TupleDesc       tupleDesc;
282         Form_pg_attribute *att;
283         int                     numAttrs;
284         int                     i;
285         Datum           toast_values[MaxHeapAttributeNumber];
286         char            toast_nulls[MaxHeapAttributeNumber];
287
288         /*
289          * Get the tuple descriptor and break down the tuple into fields.
290          *
291          * NOTE: it's debatable whether to use heap_deformtuple() here or just
292          * heap_getattr() only the varlena columns.  The latter could win if
293          * there are few varlena columns and many non-varlena ones. However,
294          * heap_deformtuple costs only O(N) while the heap_getattr way would
295          * cost O(N^2) if there are many varlena columns, so it seems better
296          * to err on the side of linear cost.  (We won't even be here unless
297          * there's at least one varlena column, by the way.)
298          */
299         tupleDesc = rel->rd_att;
300         att = tupleDesc->attrs;
301         numAttrs = tupleDesc->natts;
302
303         Assert(numAttrs <= MaxHeapAttributeNumber);
304         heap_deformtuple(oldtup, tupleDesc, toast_values, toast_nulls);
305
306         /*
307          * Check for external stored attributes and delete them from the
308          * secondary relation.
309          */
310         for (i = 0; i < numAttrs; i++)
311         {
312                 if (att[i]->attlen == -1)
313                 {
314                         Datum           value = toast_values[i];
315
316                         if (toast_nulls[i] != 'n' && VARATT_IS_EXTERNAL(value))
317                                 toast_delete_datum(rel, value);
318                 }
319         }
320 }
321
322
323 /* ----------
324  * toast_insert_or_update -
325  *
326  *      Delete no-longer-used toast-entries and create new ones to
327  *      make the new tuple fit on INSERT or UPDATE
328  * ----------
329  */
330 static void
331 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup)
332 {
333         TupleDesc       tupleDesc;
334         Form_pg_attribute *att;
335         int                     numAttrs;
336         int                     i;
337
338         bool            need_change = false;
339         bool            need_free = false;
340         bool            need_delold = false;
341         bool            has_nulls = false;
342
343         Size            maxDataLen;
344
345         char            toast_action[MaxHeapAttributeNumber];
346         char            toast_nulls[MaxHeapAttributeNumber];
347         char            toast_oldnulls[MaxHeapAttributeNumber];
348         Datum           toast_values[MaxHeapAttributeNumber];
349         Datum           toast_oldvalues[MaxHeapAttributeNumber];
350         int32           toast_sizes[MaxHeapAttributeNumber];
351         bool            toast_free[MaxHeapAttributeNumber];
352         bool            toast_delold[MaxHeapAttributeNumber];
353
354         /*
355          * Get the tuple descriptor and break down the tuple(s) into fields.
356          */
357         tupleDesc = rel->rd_att;
358         att = tupleDesc->attrs;
359         numAttrs = tupleDesc->natts;
360
361         Assert(numAttrs <= MaxHeapAttributeNumber);
362         heap_deformtuple(newtup, tupleDesc, toast_values, toast_nulls);
363         if (oldtup != NULL)
364                 heap_deformtuple(oldtup, tupleDesc, toast_oldvalues, toast_oldnulls);
365
366         /* ----------
367          * Then collect information about the values given
368          *
369          * NOTE: toast_action[i] can have these values:
370          *              ' '             default handling
371          *              'p'             already processed --- don't touch it
372          *              'x'             incompressible, but OK to move off
373          *
374          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
375          *              toast_action[i] different from 'p'.
376          * ----------
377          */
378         memset(toast_action, ' ', numAttrs * sizeof(char));
379         memset(toast_free, 0, numAttrs * sizeof(bool));
380         memset(toast_delold, 0, numAttrs * sizeof(bool));
381
382         for (i = 0; i < numAttrs; i++)
383         {
384                 varattrib  *old_value;
385                 varattrib  *new_value;
386
387                 if (oldtup != NULL)
388                 {
389                         /*
390                          * For UPDATE get the old and new values of this attribute
391                          */
392                         old_value = (varattrib *) DatumGetPointer(toast_oldvalues[i]);
393                         new_value = (varattrib *) DatumGetPointer(toast_values[i]);
394
395                         /*
396                          * If the old value is an external stored one, check if it has
397                          * changed so we have to delete it later.
398                          */
399                         if (att[i]->attlen == -1 && toast_oldnulls[i] != 'n' &&
400                                 VARATT_IS_EXTERNAL(old_value))
401                         {
402                                 if (toast_nulls[i] == 'n' || !VARATT_IS_EXTERNAL(new_value) ||
403                                         old_value->va_content.va_external.va_valueid !=
404                                         new_value->va_content.va_external.va_valueid ||
405                                         old_value->va_content.va_external.va_toastrelid !=
406                                         new_value->va_content.va_external.va_toastrelid)
407                                 {
408                                         /*
409                                          * The old external stored value isn't needed any more
410                                          * after the update
411                                          */
412                                         toast_delold[i] = true;
413                                         need_delold = true;
414                                 }
415                                 else
416                                 {
417                                         /*
418                                          * This attribute isn't changed by this update so we
419                                          * reuse the original reference to the old value in
420                                          * the new tuple.
421                                          */
422                                         toast_action[i] = 'p';
423                                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
424                                         continue;
425                                 }
426                         }
427                 }
428                 else
429                 {
430                         /*
431                          * For INSERT simply get the new value
432                          */
433                         new_value = (varattrib *) DatumGetPointer(toast_values[i]);
434                 }
435
436                 /*
437                  * Handle NULL attributes
438                  */
439                 if (toast_nulls[i] == 'n')
440                 {
441                         toast_action[i] = 'p';
442                         has_nulls = true;
443                         continue;
444                 }
445
446                 /*
447                  * Now look at varlena attributes
448                  */
449                 if (att[i]->attlen == -1)
450                 {
451                         /*
452                          * If the table's attribute says PLAIN always, force it so.
453                          */
454                         if (att[i]->attstorage == 'p')
455                                 toast_action[i] = 'p';
456
457                         /*
458                          * We took care of UPDATE above, so any external value we find
459                          * still in the tuple must be someone else's we cannot reuse.
460                          * Expand it to plain (and, probably, toast it again below).
461                          */
462                         if (VARATT_IS_EXTERNAL(new_value))
463                         {
464                                 new_value = heap_tuple_untoast_attr(new_value);
465                                 toast_values[i] = PointerGetDatum(new_value);
466                                 toast_free[i] = true;
467                                 need_change = true;
468                                 need_free = true;
469                         }
470
471                         /*
472                          * Remember the size of this attribute
473                          */
474                         toast_sizes[i] = VARATT_SIZE(new_value);
475                 }
476                 else
477                 {
478                         /*
479                          * Not a varlena attribute, plain storage always
480                          */
481                         toast_action[i] = 'p';
482                 }
483         }
484
485         /* ----------
486          * Compress and/or save external until data fits into target length
487          *
488          *      1: Inline compress attributes with attstorage 'x'
489          *      2: Store attributes with attstorage 'x' or 'e' external
490          *      3: Inline compress attributes with attstorage 'm'
491          *      4: Store attributes with attstorage 'm' external
492          * ----------
493          */
494         maxDataLen = offsetof(HeapTupleHeaderData, t_bits);
495         if (has_nulls)
496                 maxDataLen += BITMAPLEN(numAttrs);
497         maxDataLen = TOAST_TUPLE_TARGET - MAXALIGN(maxDataLen);
498
499         /*
500          * Look for attributes with attstorage 'x' to compress
501          */
502         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
503                    maxDataLen)
504         {
505                 int                     biggest_attno = -1;
506                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
507                 Datum           old_value;
508                 Datum           new_value;
509
510                 /*
511                  * Search for the biggest yet uncompressed internal attribute
512                  */
513                 for (i = 0; i < numAttrs; i++)
514                 {
515                         if (toast_action[i] != ' ')
516                                 continue;
517                         if (VARATT_IS_EXTENDED(toast_values[i]))
518                                 continue;
519                         if (att[i]->attstorage != 'x')
520                                 continue;
521                         if (toast_sizes[i] > biggest_size)
522                         {
523                                 biggest_attno = i;
524                                 biggest_size = toast_sizes[i];
525                         }
526                 }
527
528                 if (biggest_attno < 0)
529                         break;
530
531                 /*
532                  * Attempt to compress it inline
533                  */
534                 i = biggest_attno;
535                 old_value = toast_values[i];
536                 new_value = toast_compress_datum(old_value);
537
538                 if (DatumGetPointer(new_value) != NULL)
539                 {
540                         /* successful compression */
541                         if (toast_free[i])
542                                 pfree(DatumGetPointer(old_value));
543                         toast_values[i] = new_value;
544                         toast_free[i] = true;
545                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
546                         need_change = true;
547                         need_free = true;
548                 }
549                 else
550                 {
551                         /*
552                          * incompressible data, ignore on subsequent compression
553                          * passes
554                          */
555                         toast_action[i] = 'x';
556                 }
557         }
558
559         /*
560          * Second we look for attributes of attstorage 'x' or 'e' that are
561          * still inline.
562          */
563         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
564                    maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
565         {
566                 int                     biggest_attno = -1;
567                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
568                 Datum           old_value;
569
570                 /*------
571                  * Search for the biggest yet inlined attribute with
572                  * attstorage equals 'x' or 'e'
573                  *------
574                  */
575                 for (i = 0; i < numAttrs; i++)
576                 {
577                         if (toast_action[i] == 'p')
578                                 continue;
579                         if (VARATT_IS_EXTERNAL(toast_values[i]))
580                                 continue;
581                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
582                                 continue;
583                         if (toast_sizes[i] > biggest_size)
584                         {
585                                 biggest_attno = i;
586                                 biggest_size = toast_sizes[i];
587                         }
588                 }
589
590                 if (biggest_attno < 0)
591                         break;
592
593                 /*
594                  * Store this external
595                  */
596                 i = biggest_attno;
597                 old_value = toast_values[i];
598                 toast_action[i] = 'p';
599                 toast_values[i] = toast_save_datum(rel, toast_values[i]);
600                 if (toast_free[i])
601                         pfree(DatumGetPointer(old_value));
602
603                 toast_free[i] = true;
604                 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
605
606                 need_change = true;
607                 need_free = true;
608         }
609
610         /*
611          * Round 3 - this time we take attributes with storage 'm' into
612          * compression
613          */
614         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
615                    maxDataLen)
616         {
617                 int                     biggest_attno = -1;
618                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
619                 Datum           old_value;
620                 Datum           new_value;
621
622                 /*
623                  * Search for the biggest yet uncompressed internal attribute
624                  */
625                 for (i = 0; i < numAttrs; i++)
626                 {
627                         if (toast_action[i] != ' ')
628                                 continue;
629                         if (VARATT_IS_EXTENDED(toast_values[i]))
630                                 continue;
631                         if (att[i]->attstorage != 'm')
632                                 continue;
633                         if (toast_sizes[i] > biggest_size)
634                         {
635                                 biggest_attno = i;
636                                 biggest_size = toast_sizes[i];
637                         }
638                 }
639
640                 if (biggest_attno < 0)
641                         break;
642
643                 /*
644                  * Attempt to compress it inline
645                  */
646                 i = biggest_attno;
647                 old_value = toast_values[i];
648                 new_value = toast_compress_datum(old_value);
649
650                 if (DatumGetPointer(new_value) != NULL)
651                 {
652                         /* successful compression */
653                         if (toast_free[i])
654                                 pfree(DatumGetPointer(old_value));
655                         toast_values[i] = new_value;
656                         toast_free[i] = true;
657                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
658                         need_change = true;
659                         need_free = true;
660                 }
661                 else
662                 {
663                         /*
664                          * incompressible data, ignore on subsequent compression
665                          * passes
666                          */
667                         toast_action[i] = 'x';
668                 }
669         }
670
671         /*
672          * Finally we store attributes of type 'm' external
673          */
674         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
675                    maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
676         {
677                 int                     biggest_attno = -1;
678                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
679                 Datum           old_value;
680
681                 /*--------
682                  * Search for the biggest yet inlined attribute with
683                  * attstorage = 'm'
684                  *--------
685                  */
686                 for (i = 0; i < numAttrs; i++)
687                 {
688                         if (toast_action[i] == 'p')
689                                 continue;
690                         if (VARATT_IS_EXTERNAL(toast_values[i]))
691                                 continue;
692                         if (att[i]->attstorage != 'm')
693                                 continue;
694                         if (toast_sizes[i] > biggest_size)
695                         {
696                                 biggest_attno = i;
697                                 biggest_size = toast_sizes[i];
698                         }
699                 }
700
701                 if (biggest_attno < 0)
702                         break;
703
704                 /*
705                  * Store this external
706                  */
707                 i = biggest_attno;
708                 old_value = toast_values[i];
709                 toast_action[i] = 'p';
710                 toast_values[i] = toast_save_datum(rel, toast_values[i]);
711                 if (toast_free[i])
712                         pfree(DatumGetPointer(old_value));
713
714                 toast_free[i] = true;
715                 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
716
717                 need_change = true;
718                 need_free = true;
719         }
720
721         /*
722          * In the case we toasted any values, we need to build a new heap
723          * tuple with the changed values.
724          */
725         if (need_change)
726         {
727                 HeapTupleHeader olddata = newtup->t_data;
728                 char       *new_data;
729                 int32           new_len;
730
731                 /*
732                  * Calculate the new size of the tuple.  Header size should not
733                  * change, but data size might.
734                  */
735                 new_len = offsetof(HeapTupleHeaderData, t_bits);
736                 if (has_nulls)
737                         new_len += BITMAPLEN(numAttrs);
738                 if (olddata->t_infomask & HEAP_HASOID)
739                         new_len += sizeof(Oid);
740                 new_len = MAXALIGN(new_len);
741                 Assert(new_len == olddata->t_hoff);
742                 new_len += ComputeDataSize(tupleDesc, toast_values, toast_nulls);
743
744                 /*
745                  * Allocate new tuple in same context as old one.
746                  */
747                 new_data = (char *) MemoryContextAlloc(newtup->t_datamcxt, new_len);
748                 newtup->t_data = (HeapTupleHeader) new_data;
749                 newtup->t_len = new_len;
750
751                 /*
752                  * Put the tuple header and the changed values into place
753                  */
754                 memcpy(new_data, olddata, olddata->t_hoff);
755
756                 DataFill((char *) new_data + olddata->t_hoff,
757                                  tupleDesc,
758                                  toast_values,
759                                  toast_nulls,
760                                  &(newtup->t_data->t_infomask),
761                                  has_nulls ? newtup->t_data->t_bits : NULL);
762
763                 /*
764                  * In the case we modified a previously modified tuple again, free
765                  * the memory from the previous run
766                  */
767                 if ((char *) olddata != ((char *) newtup + HEAPTUPLESIZE))
768                         pfree(olddata);
769         }
770
771         /*
772          * Free allocated temp values
773          */
774         if (need_free)
775                 for (i = 0; i < numAttrs; i++)
776                         if (toast_free[i])
777                                 pfree(DatumGetPointer(toast_values[i]));
778
779         /*
780          * Delete external values from the old tuple
781          */
782         if (need_delold)
783                 for (i = 0; i < numAttrs; i++)
784                         if (toast_delold[i])
785                                 toast_delete_datum(rel, toast_oldvalues[i]);
786 }
787
788
789 /* ----------
790  * toast_flatten_tuple_attribute -
791  *
792  *      If a Datum is of composite type, "flatten" it to contain no toasted fields.
793  *      This must be invoked on any potentially-composite field that is to be
794  *      inserted into a tuple.  Doing this preserves the invariant that toasting
795  *      goes only one level deep in a tuple.
796  * ----------
797  */
798 Datum
799 toast_flatten_tuple_attribute(Datum value,
800                                                           Oid typeId, int32 typeMod)
801 {
802         TupleDesc       tupleDesc;
803         HeapTupleHeader olddata;
804         HeapTupleHeader new_data;
805         int32           new_len;
806         HeapTupleData tmptup;
807         Form_pg_attribute *att;
808         int                     numAttrs;
809         int                     i;
810         bool            need_change = false;
811         bool            has_nulls = false;
812         Datum           toast_values[MaxTupleAttributeNumber];
813         char            toast_nulls[MaxTupleAttributeNumber];
814         bool            toast_free[MaxTupleAttributeNumber];
815
816         /*
817          * See if it's a composite type, and get the tupdesc if so.
818          */
819         tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
820         if (tupleDesc == NULL)
821                 return value;                   /* not a composite type */
822
823         att = tupleDesc->attrs;
824         numAttrs = tupleDesc->natts;
825
826         /*
827          * Break down the tuple into fields.
828          */
829         olddata = DatumGetHeapTupleHeader(value);
830         Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
831         Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
832         /* Build a temporary HeapTuple control structure */
833         tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
834         ItemPointerSetInvalid(&(tmptup.t_self));
835         tmptup.t_tableOid = InvalidOid;
836         tmptup.t_data = olddata;
837
838         Assert(numAttrs <= MaxTupleAttributeNumber);
839         heap_deformtuple(&tmptup, tupleDesc, toast_values, toast_nulls);
840
841         memset(toast_free, 0, numAttrs * sizeof(bool));
842
843         for (i = 0; i < numAttrs; i++)
844         {
845                 /*
846                  * Look at non-null varlena attributes
847                  */
848                 if (toast_nulls[i] == 'n')
849                         has_nulls = true;
850                 else if (att[i]->attlen == -1)
851                 {
852                         varattrib  *new_value;
853
854                         new_value = (varattrib *) DatumGetPointer(toast_values[i]);
855                         if (VARATT_IS_EXTENDED(new_value))
856                         {
857                                 new_value = heap_tuple_untoast_attr(new_value);
858                                 toast_values[i] = PointerGetDatum(new_value);
859                                 toast_free[i] = true;
860                                 need_change = true;
861                         }
862                 }
863         }
864
865         /*
866          * If nothing to untoast, just return the original tuple.
867          */
868         if (!need_change)
869                 return value;
870
871         /*
872          * Calculate the new size of the tuple.  Header size should not
873          * change, but data size might.
874          */
875         new_len = offsetof(HeapTupleHeaderData, t_bits);
876         if (has_nulls)
877                 new_len += BITMAPLEN(numAttrs);
878         if (olddata->t_infomask & HEAP_HASOID)
879                 new_len += sizeof(Oid);
880         new_len = MAXALIGN(new_len);
881         Assert(new_len == olddata->t_hoff);
882         new_len += ComputeDataSize(tupleDesc, toast_values, toast_nulls);
883
884         new_data = (HeapTupleHeader) palloc0(new_len);
885
886         /*
887          * Put the tuple header and the changed values into place
888          */
889         memcpy(new_data, olddata, olddata->t_hoff);
890
891         HeapTupleHeaderSetDatumLength(new_data, new_len);
892
893         DataFill((char *) new_data + olddata->t_hoff,
894                          tupleDesc,
895                          toast_values,
896                          toast_nulls,
897                          &(new_data->t_infomask),
898                          has_nulls ? new_data->t_bits : NULL);
899
900         /*
901          * Free allocated temp values
902          */
903         for (i = 0; i < numAttrs; i++)
904                 if (toast_free[i])
905                         pfree(DatumGetPointer(toast_values[i]));
906
907         return PointerGetDatum(new_data);
908 }
909
910
911 /* ----------
912  * toast_compress_datum -
913  *
914  *      Create a compressed version of a varlena datum
915  *
916  *      If we fail (ie, compressed result is actually bigger than original)
917  *      then return NULL.  We must not use compressed data if it'd expand
918  *      the tuple!
919  * ----------
920  */
921 Datum
922 toast_compress_datum(Datum value)
923 {
924         varattrib  *tmp;
925
926         tmp = (varattrib *) palloc(sizeof(PGLZ_Header) + VARATT_SIZE(value));
927         pglz_compress(VARATT_DATA(value), VARATT_SIZE(value) - VARHDRSZ,
928                                   (PGLZ_Header *) tmp,
929                                   PGLZ_strategy_default);
930         if (VARATT_SIZE(tmp) < VARATT_SIZE(value))
931         {
932                 /* successful compression */
933                 VARATT_SIZEP(tmp) |= VARATT_FLAG_COMPRESSED;
934                 return PointerGetDatum(tmp);
935         }
936         else
937         {
938                 /* incompressible data */
939                 pfree(tmp);
940                 return PointerGetDatum(NULL);
941         }
942 }
943
944
945 /* ----------
946  * toast_save_datum -
947  *
948  *      Save one single datum into the secondary relation and return
949  *      a varattrib reference for it.
950  * ----------
951  */
952 static Datum
953 toast_save_datum(Relation rel, Datum value)
954 {
955         Relation        toastrel;
956         Relation        toastidx;
957         HeapTuple       toasttup;
958         InsertIndexResult idxres;
959         TupleDesc       toasttupDesc;
960         Datum           t_values[3];
961         char            t_nulls[3];
962         varattrib  *result;
963         struct
964         {
965                 struct varlena hdr;
966                 char            data[TOAST_MAX_CHUNK_SIZE];
967         }                       chunk_data;
968         int32           chunk_size;
969         int32           chunk_seq = 0;
970         char       *data_p;
971         int32           data_todo;
972
973         /*
974          * Create the varattrib reference
975          */
976         result = (varattrib *) palloc(sizeof(varattrib));
977
978         result->va_header = sizeof(varattrib) | VARATT_FLAG_EXTERNAL;
979         if (VARATT_IS_COMPRESSED(value))
980         {
981                 result->va_header |= VARATT_FLAG_COMPRESSED;
982                 result->va_content.va_external.va_rawsize =
983                         ((varattrib *) value)->va_content.va_compressed.va_rawsize;
984         }
985         else
986                 result->va_content.va_external.va_rawsize = VARATT_SIZE(value);
987
988         result->va_content.va_external.va_extsize =
989                 VARATT_SIZE(value) - VARHDRSZ;
990         result->va_content.va_external.va_valueid = newoid();
991         result->va_content.va_external.va_toastrelid =
992                 rel->rd_rel->reltoastrelid;
993
994         /*
995          * Initialize constant parts of the tuple data
996          */
997         t_values[0] = ObjectIdGetDatum(result->va_content.va_external.va_valueid);
998         t_values[2] = PointerGetDatum(&chunk_data);
999         t_nulls[0] = ' ';
1000         t_nulls[1] = ' ';
1001         t_nulls[2] = ' ';
1002
1003         /*
1004          * Get the data to process
1005          */
1006         data_p = VARATT_DATA(value);
1007         data_todo = VARATT_SIZE(value) - VARHDRSZ;
1008
1009         /*
1010          * Open the toast relation.  We must explicitly lock the toast index
1011          * because we aren't using an index scan here.
1012          */
1013         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1014         toasttupDesc = toastrel->rd_att;
1015         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1016         LockRelation(toastidx, RowExclusiveLock);
1017
1018         /*
1019          * Split up the item into chunks
1020          */
1021         while (data_todo > 0)
1022         {
1023                 /*
1024                  * Calculate the size of this chunk
1025                  */
1026                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1027
1028                 /*
1029                  * Build a tuple and store it
1030                  */
1031                 t_values[1] = Int32GetDatum(chunk_seq++);
1032                 VARATT_SIZEP(&chunk_data) = chunk_size + VARHDRSZ;
1033                 memcpy(VARATT_DATA(&chunk_data), data_p, chunk_size);
1034                 toasttup = heap_formtuple(toasttupDesc, t_values, t_nulls);
1035                 if (!HeapTupleIsValid(toasttup))
1036                         elog(ERROR, "failed to build TOAST tuple");
1037
1038                 simple_heap_insert(toastrel, toasttup);
1039
1040                 /*
1041                  * Create the index entry.      We cheat a little here by not using
1042                  * FormIndexDatum: this relies on the knowledge that the index
1043                  * columns are the same as the initial columns of the table.
1044                  *
1045                  * Note also that there had better not be any user-created index on
1046                  * the TOAST table, since we don't bother to update anything else.
1047                  */
1048                 idxres = index_insert(toastidx, t_values, t_nulls,
1049                                                           &(toasttup->t_self),
1050                                                           toastrel, toastidx->rd_index->indisunique);
1051                 if (idxres == NULL)
1052                         elog(ERROR, "failed to insert index entry for TOAST tuple");
1053
1054                 /*
1055                  * Free memory
1056                  */
1057                 pfree(idxres);
1058                 heap_freetuple(toasttup);
1059
1060                 /*
1061                  * Move on to next chunk
1062                  */
1063                 data_todo -= chunk_size;
1064                 data_p += chunk_size;
1065         }
1066
1067         /*
1068          * Done - close toast relation and return the reference
1069          */
1070         UnlockRelation(toastidx, RowExclusiveLock);
1071         index_close(toastidx);
1072         heap_close(toastrel, RowExclusiveLock);
1073
1074         return PointerGetDatum(result);
1075 }
1076
1077
1078 /* ----------
1079  * toast_delete_datum -
1080  *
1081  *      Delete a single external stored value.
1082  * ----------
1083  */
1084 static void
1085 toast_delete_datum(Relation rel, Datum value)
1086 {
1087         varattrib  *attr = (varattrib *) DatumGetPointer(value);
1088         Relation        toastrel;
1089         Relation        toastidx;
1090         ScanKeyData toastkey;
1091         IndexScanDesc toastscan;
1092         HeapTuple       toasttup;
1093
1094         if (!VARATT_IS_EXTERNAL(attr))
1095                 return;
1096
1097         /*
1098          * Open the toast relation and it's index
1099          */
1100         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1101                                                  RowExclusiveLock);
1102         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1103
1104         /*
1105          * Setup a scan key to fetch from the index by va_valueid (we don't
1106          * particularly care whether we see them in sequence or not)
1107          */
1108         ScanKeyInit(&toastkey,
1109                                 (AttrNumber) 1,
1110                                 BTEqualStrategyNumber, F_OIDEQ,
1111                           ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1112
1113         /*
1114          * Find the chunks by index
1115          */
1116         toastscan = index_beginscan(toastrel, toastidx, SnapshotToast,
1117                                                                 1, &toastkey);
1118         while ((toasttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1119         {
1120                 /*
1121                  * Have a chunk, delete it
1122                  */
1123                 simple_heap_delete(toastrel, &toasttup->t_self);
1124         }
1125
1126         /*
1127          * End scan and close relations
1128          */
1129         index_endscan(toastscan);
1130         index_close(toastidx);
1131         heap_close(toastrel, RowExclusiveLock);
1132 }
1133
1134
1135 /* ----------
1136  * toast_fetch_datum -
1137  *
1138  *      Reconstruct an in memory varattrib from the chunks saved
1139  *      in the toast relation
1140  * ----------
1141  */
1142 static varattrib *
1143 toast_fetch_datum(varattrib *attr)
1144 {
1145         Relation        toastrel;
1146         Relation        toastidx;
1147         ScanKeyData toastkey;
1148         IndexScanDesc toastscan;
1149         HeapTuple       ttup;
1150         TupleDesc       toasttupDesc;
1151         varattrib  *result;
1152         int32           ressize;
1153         int32           residx,
1154                                 nextidx;
1155         int32           numchunks;
1156         Pointer         chunk;
1157         bool            isnull;
1158         int32           chunksize;
1159
1160         ressize = attr->va_content.va_external.va_extsize;
1161         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1162
1163         result = (varattrib *) palloc(ressize + VARHDRSZ);
1164         VARATT_SIZEP(result) = ressize + VARHDRSZ;
1165         if (VARATT_IS_COMPRESSED(attr))
1166                 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1167
1168         /*
1169          * Open the toast relation and its index
1170          */
1171         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1172                                                  AccessShareLock);
1173         toasttupDesc = toastrel->rd_att;
1174         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1175
1176         /*
1177          * Setup a scan key to fetch from the index by va_valueid
1178          */
1179         ScanKeyInit(&toastkey,
1180                                 (AttrNumber) 1,
1181                                 BTEqualStrategyNumber, F_OIDEQ,
1182                           ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1183
1184         /*
1185          * Read the chunks by index
1186          *
1187          * Note that because the index is actually on (valueid, chunkidx) we will
1188          * see the chunks in chunkidx order, even though we didn't explicitly
1189          * ask for it.
1190          */
1191         nextidx = 0;
1192
1193         toastscan = index_beginscan(toastrel, toastidx, SnapshotToast,
1194                                                                 1, &toastkey);
1195         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1196         {
1197                 /*
1198                  * Have a chunk, extract the sequence number and the data
1199                  */
1200                 residx = DatumGetInt32(heap_getattr(ttup, 2, toasttupDesc, &isnull));
1201                 Assert(!isnull);
1202                 chunk = DatumGetPointer(heap_getattr(ttup, 3, toasttupDesc, &isnull));
1203                 Assert(!isnull);
1204                 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1205
1206                 /*
1207                  * Some checks on the data we've found
1208                  */
1209                 if (residx != nextidx)
1210                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1211                                  residx, nextidx,
1212                                  attr->va_content.va_external.va_valueid);
1213                 if (residx < numchunks - 1)
1214                 {
1215                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1216                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1217                                          chunksize, residx,
1218                                          attr->va_content.va_external.va_valueid);
1219                 }
1220                 else if (residx < numchunks)
1221                 {
1222                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1223                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1224                                          chunksize, residx,
1225                                          attr->va_content.va_external.va_valueid);
1226                 }
1227                 else
1228                         elog(ERROR, "unexpected chunk number %d for toast value %u",
1229                                  residx,
1230                                  attr->va_content.va_external.va_valueid);
1231
1232                 /*
1233                  * Copy the data into proper place in our result
1234                  */
1235                 memcpy(((char *) VARATT_DATA(result)) + residx * TOAST_MAX_CHUNK_SIZE,
1236                            VARATT_DATA(chunk),
1237                            chunksize);
1238
1239                 nextidx++;
1240         }
1241
1242         /*
1243          * Final checks that we successfully fetched the datum
1244          */
1245         if (nextidx != numchunks)
1246                 elog(ERROR, "missing chunk number %d for toast value %u",
1247                          nextidx,
1248                          attr->va_content.va_external.va_valueid);
1249
1250         /*
1251          * End scan and close relations
1252          */
1253         index_endscan(toastscan);
1254         index_close(toastidx);
1255         heap_close(toastrel, AccessShareLock);
1256
1257         return result;
1258 }
1259
1260 /* ----------
1261  * toast_fetch_datum_slice -
1262  *
1263  *      Reconstruct a segment of a varattrib from the chunks saved
1264  *      in the toast relation
1265  * ----------
1266  */
1267 static varattrib *
1268 toast_fetch_datum_slice(varattrib *attr, int32 sliceoffset, int32 length)
1269 {
1270         Relation        toastrel;
1271         Relation        toastidx;
1272         ScanKeyData toastkey[3];
1273         int                     nscankeys;
1274         IndexScanDesc toastscan;
1275         HeapTuple       ttup;
1276         TupleDesc       toasttupDesc;
1277         varattrib  *result;
1278         int32           attrsize;
1279         int32           residx;
1280         int32           nextidx;
1281         int                     numchunks;
1282         int                     startchunk;
1283         int                     endchunk;
1284         int32           startoffset;
1285         int32           endoffset;
1286         int                     totalchunks;
1287         Pointer         chunk;
1288         bool            isnull;
1289         int32           chunksize;
1290         int32           chcpystrt;
1291         int32           chcpyend;
1292
1293         attrsize = attr->va_content.va_external.va_extsize;
1294         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1295
1296         if (sliceoffset >= attrsize)
1297         {
1298                 sliceoffset = 0;
1299                 length = 0;
1300         }
1301
1302         if (((sliceoffset + length) > attrsize) || length < 0)
1303                 length = attrsize - sliceoffset;
1304
1305         result = (varattrib *) palloc(length + VARHDRSZ);
1306         VARATT_SIZEP(result) = length + VARHDRSZ;
1307
1308         if (VARATT_IS_COMPRESSED(attr))
1309                 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1310
1311         if (length == 0)
1312                 return (result);                /* Can save a lot of work at this point! */
1313
1314         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1315         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1316         numchunks = (endchunk - startchunk) + 1;
1317
1318         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1319         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1320
1321         /*
1322          * Open the toast relation and it's index
1323          */
1324         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1325                                                  AccessShareLock);
1326         toasttupDesc = toastrel->rd_att;
1327         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1328
1329         /*
1330          * Setup a scan key to fetch from the index. This is either two keys
1331          * or three depending on the number of chunks.
1332          */
1333         ScanKeyInit(&toastkey[0],
1334                                 (AttrNumber) 1,
1335                                 BTEqualStrategyNumber, F_OIDEQ,
1336                           ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1337
1338         /*
1339          * Use equality condition for one chunk, a range condition otherwise:
1340          */
1341         if (numchunks == 1)
1342         {
1343                 ScanKeyInit(&toastkey[1],
1344                                         (AttrNumber) 2,
1345                                         BTEqualStrategyNumber, F_INT4EQ,
1346                                         Int32GetDatum(startchunk));
1347                 nscankeys = 2;
1348         }
1349         else
1350         {
1351                 ScanKeyInit(&toastkey[1],
1352                                         (AttrNumber) 2,
1353                                         BTGreaterEqualStrategyNumber, F_INT4GE,
1354                                         Int32GetDatum(startchunk));
1355                 ScanKeyInit(&toastkey[2],
1356                                         (AttrNumber) 2,
1357                                         BTLessEqualStrategyNumber, F_INT4LE,
1358                                         Int32GetDatum(endchunk));
1359                 nscankeys = 3;
1360         }
1361
1362         /*
1363          * Read the chunks by index
1364          *
1365          * The index is on (valueid, chunkidx) so they will come in order
1366          */
1367         nextidx = startchunk;
1368         toastscan = index_beginscan(toastrel, toastidx, SnapshotToast,
1369                                                                 nscankeys, toastkey);
1370         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1371         {
1372                 /*
1373                  * Have a chunk, extract the sequence number and the data
1374                  */
1375                 residx = DatumGetInt32(heap_getattr(ttup, 2, toasttupDesc, &isnull));
1376                 Assert(!isnull);
1377                 chunk = DatumGetPointer(heap_getattr(ttup, 3, toasttupDesc, &isnull));
1378                 Assert(!isnull);
1379                 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1380
1381                 /*
1382                  * Some checks on the data we've found
1383                  */
1384                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1385                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1386                                  residx, nextidx,
1387                                  attr->va_content.va_external.va_valueid);
1388                 if (residx < totalchunks - 1)
1389                 {
1390                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1391                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1392                                          chunksize, residx,
1393                                          attr->va_content.va_external.va_valueid);
1394                 }
1395                 else
1396                 {
1397                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1398                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1399                                          chunksize, residx,
1400                                          attr->va_content.va_external.va_valueid);
1401                 }
1402
1403                 /*
1404                  * Copy the data into proper place in our result
1405                  */
1406                 chcpystrt = 0;
1407                 chcpyend = chunksize - 1;
1408                 if (residx == startchunk)
1409                         chcpystrt = startoffset;
1410                 if (residx == endchunk)
1411                         chcpyend = endoffset;
1412
1413                 memcpy(((char *) VARATT_DATA(result)) +
1414                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1415                            VARATT_DATA(chunk) + chcpystrt,
1416                            (chcpyend - chcpystrt) + 1);
1417
1418                 nextidx++;
1419         }
1420
1421         /*
1422          * Final checks that we successfully fetched the datum
1423          */
1424         if (nextidx != (endchunk + 1))
1425                 elog(ERROR, "missing chunk number %d for toast value %u",
1426                          nextidx,
1427                          attr->va_content.va_external.va_valueid);
1428
1429         /*
1430          * End scan and close relations
1431          */
1432         index_endscan(toastscan);
1433         index_close(toastidx);
1434         heap_close(toastrel, AccessShareLock);
1435
1436         return result;
1437 }