]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Fix some copyright notices that weren't updated. Improve copyright tool
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2003, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/access/heap/tuptoaster.c,v 1.38 2003/08/04 23:59:37 tgl Exp $
12  *
13  *
14  * INTERFACE ROUTINES
15  *              heap_tuple_toast_attrs -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              heap_tuple_untoast_attr -
20  *                      Fetch back a given value from the "secondary" relation
21  *
22  *-------------------------------------------------------------------------
23  */
24
25 #include "postgres.h"
26
27 #include <unistd.h>
28 #include <fcntl.h>
29
30 #include "access/heapam.h"
31 #include "access/genam.h"
32 #include "access/tuptoaster.h"
33 #include "catalog/catalog.h"
34 #include "utils/rel.h"
35 #include "utils/builtins.h"
36 #include "utils/fmgroids.h"
37 #include "utils/pg_lzcompress.h"
38
39
40 #undef TOAST_DEBUG
41
42 static void toast_delete(Relation rel, HeapTuple oldtup);
43 static void toast_delete_datum(Relation rel, Datum value);
44 static void toast_insert_or_update(Relation rel, HeapTuple newtup,
45                                            HeapTuple oldtup);
46 static Datum toast_save_datum(Relation rel, Datum value);
47 static varattrib *toast_fetch_datum(varattrib *attr);
48 static varattrib *toast_fetch_datum_slice(varattrib *attr,
49                                                 int32 sliceoffset, int32 length);
50
51
52 /* ----------
53  * heap_tuple_toast_attrs -
54  *
55  *      This is the central public entry point for toasting from heapam.
56  *
57  *      Calls the appropriate event specific action.
58  * ----------
59  */
60 void
61 heap_tuple_toast_attrs(Relation rel, HeapTuple newtup, HeapTuple oldtup)
62 {
63         if (newtup == NULL)
64                 toast_delete(rel, oldtup);
65         else
66                 toast_insert_or_update(rel, newtup, oldtup);
67 }
68
69
70 /* ----------
71  * heap_tuple_fetch_attr -
72  *
73  *      Public entry point to get back a toasted value
74  *      external storage (possibly still in compressed format).
75  * ----------
76  */
77 varattrib *
78 heap_tuple_fetch_attr(varattrib *attr)
79 {
80         varattrib  *result;
81
82         if (VARATT_IS_EXTERNAL(attr))
83         {
84                 /*
85                  * This is an external stored plain value
86                  */
87                 result = toast_fetch_datum(attr);
88         }
89         else
90         {
91                 /*
92                  * This is a plain value inside of the main tuple - why am I
93                  * called?
94                  */
95                 result = attr;
96         }
97
98         return result;
99 }
100
101
102 /* ----------
103  * heap_tuple_untoast_attr -
104  *
105  *      Public entry point to get back a toasted value from compression
106  *      or external storage.
107  * ----------
108  */
109 varattrib *
110 heap_tuple_untoast_attr(varattrib *attr)
111 {
112         varattrib  *result;
113
114         if (VARATT_IS_EXTERNAL(attr))
115         {
116                 if (VARATT_IS_COMPRESSED(attr))
117                 {
118                         /* ----------
119                          * This is an external stored compressed value
120                          * Fetch it from the toast heap and decompress.
121                          * ----------
122                          */
123                         varattrib  *tmp;
124
125                         tmp = toast_fetch_datum(attr);
126                         result = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
127                                                                                   + VARHDRSZ);
128                         VARATT_SIZEP(result) = attr->va_content.va_external.va_rawsize
129                                 + VARHDRSZ;
130                         pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(result));
131
132                         pfree(tmp);
133                 }
134                 else
135                 {
136                         /*
137                          * This is an external stored plain value
138                          */
139                         result = toast_fetch_datum(attr);
140                 }
141         }
142         else if (VARATT_IS_COMPRESSED(attr))
143         {
144                 /*
145                  * This is a compressed value inside of the main tuple
146                  */
147                 result = (varattrib *) palloc(attr->va_content.va_compressed.va_rawsize
148                                                                           + VARHDRSZ);
149                 VARATT_SIZEP(result) = attr->va_content.va_compressed.va_rawsize
150                         + VARHDRSZ;
151                 pglz_decompress((PGLZ_Header *) attr, VARATT_DATA(result));
152         }
153         else
154
155                 /*
156                  * This is a plain value inside of the main tuple - why am I
157                  * called?
158                  */
159                 return attr;
160
161         return result;
162 }
163
164
165 /* ----------
166  * heap_tuple_untoast_attr_slice -
167  *
168  *              Public entry point to get back part of a toasted value
169  *              from compression or external storage.
170  * ----------
171  */
172 varattrib *
173 heap_tuple_untoast_attr_slice(varattrib *attr, int32 sliceoffset, int32 slicelength)
174 {
175         varattrib  *preslice;
176         varattrib  *result;
177         int32           attrsize;
178
179         if (VARATT_IS_COMPRESSED(attr))
180         {
181                 varattrib  *tmp;
182
183                 if (VARATT_IS_EXTERNAL(attr))
184                         tmp = toast_fetch_datum(attr);
185                 else
186                 {
187                         tmp = attr;                     /* compressed in main tuple */
188                 }
189
190                 preslice = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
191                                                                                 + VARHDRSZ);
192                 VARATT_SIZEP(preslice) = attr->va_content.va_external.va_rawsize + VARHDRSZ;
193                 pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(preslice));
194
195                 if (tmp != attr)
196                         pfree(tmp);
197         }
198         else
199         {
200                 /* Plain value */
201                 if (VARATT_IS_EXTERNAL(attr))
202                 {
203                         /* fast path */
204                         return (toast_fetch_datum_slice(attr, sliceoffset, slicelength));
205                 }
206                 else
207                         preslice = attr;
208         }
209
210         /* slicing of datum for compressed cases and plain value */
211
212         attrsize = VARSIZE(preslice) - VARHDRSZ;
213         if (sliceoffset >= attrsize)
214         {
215                 sliceoffset = 0;
216                 slicelength = 0;
217         }
218
219         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
220                 slicelength = attrsize - sliceoffset;
221
222         result = (varattrib *) palloc(slicelength + VARHDRSZ);
223         VARATT_SIZEP(result) = slicelength + VARHDRSZ;
224
225         memcpy(VARDATA(result), VARDATA(preslice) + sliceoffset, slicelength);
226
227         if (preslice != attr)
228                 pfree(preslice);
229
230         return result;
231 }
232
233
234 /* ----------
235  * toast_raw_datum_size -
236  *
237  *      Return the raw (detoasted) size of a varlena datum
238  * ----------
239  */
240 Size
241 toast_raw_datum_size(Datum value)
242 {
243         varattrib  *attr = (varattrib *) DatumGetPointer(value);
244         Size            result;
245
246         if (VARATT_IS_COMPRESSED(attr))
247         {
248                 /*
249                  * va_rawsize shows the original data size, whether the datum is
250                  * external or not.
251                  */
252                 result = attr->va_content.va_compressed.va_rawsize + VARHDRSZ;
253         }
254         else if (VARATT_IS_EXTERNAL(attr))
255         {
256                 /*
257                  * an uncompressed external attribute has rawsize including the
258                  * header (not too consistent!)
259                  */
260                 result = attr->va_content.va_external.va_rawsize;
261         }
262         else
263         {
264                 /* plain untoasted datum */
265                 result = VARSIZE(attr);
266         }
267         return result;
268 }
269
270
271 /* ----------
272  * toast_delete -
273  *
274  *      Cascaded delete toast-entries on DELETE
275  * ----------
276  */
277 static void
278 toast_delete(Relation rel, HeapTuple oldtup)
279 {
280         TupleDesc       tupleDesc;
281         Form_pg_attribute *att;
282         int                     numAttrs;
283         int                     i;
284         Datum           value;
285         bool            isnull;
286
287         /*
288          * Get the tuple descriptor, the number of and attribute descriptors.
289          */
290         tupleDesc = rel->rd_att;
291         numAttrs = tupleDesc->natts;
292         att = tupleDesc->attrs;
293
294         /*
295          * Check for external stored attributes and delete them from the
296          * secondary relation.
297          */
298         for (i = 0; i < numAttrs; i++)
299         {
300                 if (att[i]->attlen == -1)
301                 {
302                         value = heap_getattr(oldtup, i + 1, tupleDesc, &isnull);
303                         if (!isnull && VARATT_IS_EXTERNAL(value))
304                                 toast_delete_datum(rel, value);
305                 }
306         }
307 }
308
309
310 /* ----------
311  * toast_insert_or_update -
312  *
313  *      Delete no-longer-used toast-entries and create new ones to
314  *      make the new tuple fit on INSERT or UPDATE
315  * ----------
316  */
317 static void
318 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup)
319 {
320         TupleDesc       tupleDesc;
321         Form_pg_attribute *att;
322         int                     numAttrs;
323         int                     i;
324         bool            old_isnull;
325         bool            new_isnull;
326
327         bool            need_change = false;
328         bool            need_free = false;
329         bool            need_delold = false;
330         bool            has_nulls = false;
331
332         Size            maxDataLen;
333
334         char            toast_action[MaxHeapAttributeNumber];
335         char            toast_nulls[MaxHeapAttributeNumber];
336         Datum           toast_values[MaxHeapAttributeNumber];
337         int32           toast_sizes[MaxHeapAttributeNumber];
338         bool            toast_free[MaxHeapAttributeNumber];
339         bool            toast_delold[MaxHeapAttributeNumber];
340
341         /*
342          * Get the tuple descriptor, the number of and attribute descriptors
343          * and the location of the tuple values.
344          */
345         tupleDesc = rel->rd_att;
346         numAttrs = tupleDesc->natts;
347         att = tupleDesc->attrs;
348
349         /* ----------
350          * Then collect information about the values given
351          *
352          * NOTE: toast_action[i] can have these values:
353          *              ' '             default handling
354          *              'p'             already processed --- don't touch it
355          *              'x'             incompressible, but OK to move off
356          * ----------
357          */
358         memset(toast_action, ' ', numAttrs * sizeof(char));
359         memset(toast_nulls, ' ', numAttrs * sizeof(char));
360         memset(toast_free, 0, numAttrs * sizeof(bool));
361         memset(toast_delold, 0, numAttrs * sizeof(bool));
362         for (i = 0; i < numAttrs; i++)
363         {
364                 varattrib  *old_value;
365                 varattrib  *new_value;
366
367                 if (oldtup != NULL)
368                 {
369                         /*
370                          * For UPDATE get the old and new values of this attribute
371                          */
372                         old_value = (varattrib *) DatumGetPointer(
373                                         heap_getattr(oldtup, i + 1, tupleDesc, &old_isnull));
374                         toast_values[i] =
375                                 heap_getattr(newtup, i + 1, tupleDesc, &new_isnull);
376                         new_value = (varattrib *) DatumGetPointer(toast_values[i]);
377
378                         /*
379                          * If the old value is an external stored one, check if it has
380                          * changed so we have to delete it later.
381                          */
382                         if (!old_isnull && att[i]->attlen == -1 &&
383                                 VARATT_IS_EXTERNAL(old_value))
384                         {
385                                 if (new_isnull || !VARATT_IS_EXTERNAL(new_value) ||
386                                         old_value->va_content.va_external.va_valueid !=
387                                         new_value->va_content.va_external.va_valueid ||
388                                         old_value->va_content.va_external.va_toastrelid !=
389                                         new_value->va_content.va_external.va_toastrelid)
390                                 {
391                                         /*
392                                          * The old external store value isn't needed any more
393                                          * after the update
394                                          */
395                                         toast_delold[i] = true;
396                                         need_delold = true;
397                                 }
398                                 else
399                                 {
400                                         /*
401                                          * This attribute isn't changed by this update so we
402                                          * reuse the original reference to the old value in
403                                          * the new tuple.
404                                          */
405                                         toast_action[i] = 'p';
406                                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
407                                         continue;
408                                 }
409                         }
410                 }
411                 else
412                 {
413                         /*
414                          * For INSERT simply get the new value
415                          */
416                         toast_values[i] =
417                                 heap_getattr(newtup, i + 1, tupleDesc, &new_isnull);
418                 }
419
420                 /*
421                  * Handle NULL attributes
422                  */
423                 if (new_isnull)
424                 {
425                         toast_action[i] = 'p';
426                         toast_nulls[i] = 'n';
427                         has_nulls = true;
428                         continue;
429                 }
430
431                 /*
432                  * Now look at varsize attributes
433                  */
434                 if (att[i]->attlen == -1)
435                 {
436                         /*
437                          * If the table's attribute says PLAIN always, force it so.
438                          */
439                         if (att[i]->attstorage == 'p')
440                                 toast_action[i] = 'p';
441
442                         /*
443                          * We took care of UPDATE above, so any external value we find
444                          * still in the tuple must be someone else's we cannot reuse.
445                          * Expand it to plain (and, probably, toast it again below).
446                          */
447                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
448                         {
449                                 toast_values[i] = PointerGetDatum(heap_tuple_untoast_attr(
450                                                 (varattrib *) DatumGetPointer(toast_values[i])));
451                                 toast_free[i] = true;
452                                 need_change = true;
453                                 need_free = true;
454                         }
455
456                         /*
457                          * Remember the size of this attribute
458                          */
459                         toast_sizes[i] = VARATT_SIZE(DatumGetPointer(toast_values[i]));
460                 }
461                 else
462                 {
463                         /*
464                          * Not a variable size attribute, plain storage always
465                          */
466                         toast_action[i] = 'p';
467                         toast_sizes[i] = att[i]->attlen;
468                 }
469         }
470
471         /* ----------
472          * Compress and/or save external until data fits into target length
473          *
474          *      1: Inline compress attributes with attstorage 'x'
475          *      2: Store attributes with attstorage 'x' or 'e' external
476          *      3: Inline compress attributes with attstorage 'm'
477          *      4: Store attributes with attstorage 'm' external
478          * ----------
479          */
480         maxDataLen = offsetof(HeapTupleHeaderData, t_bits);
481         if (has_nulls)
482                 maxDataLen += BITMAPLEN(numAttrs);
483         maxDataLen = TOAST_TUPLE_TARGET - MAXALIGN(maxDataLen);
484
485         /*
486          * Look for attributes with attstorage 'x' to compress
487          */
488         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
489                    maxDataLen)
490         {
491                 int                     biggest_attno = -1;
492                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
493                 Datum           old_value;
494                 Datum           new_value;
495
496                 /*
497                  * Search for the biggest yet uncompressed internal attribute
498                  */
499                 for (i = 0; i < numAttrs; i++)
500                 {
501                         if (toast_action[i] != ' ')
502                                 continue;
503                         if (VARATT_IS_EXTENDED(toast_values[i]))
504                                 continue;
505                         if (att[i]->attstorage != 'x')
506                                 continue;
507                         if (toast_sizes[i] > biggest_size)
508                         {
509                                 biggest_attno = i;
510                                 biggest_size = toast_sizes[i];
511                         }
512                 }
513
514                 if (biggest_attno < 0)
515                         break;
516
517                 /*
518                  * Attempt to compress it inline
519                  */
520                 i = biggest_attno;
521                 old_value = toast_values[i];
522                 new_value = toast_compress_datum(old_value);
523
524                 if (DatumGetPointer(new_value) != NULL)
525                 {
526                         /* successful compression */
527                         if (toast_free[i])
528                                 pfree(DatumGetPointer(old_value));
529                         toast_values[i] = new_value;
530                         toast_free[i] = true;
531                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
532                         need_change = true;
533                         need_free = true;
534                 }
535                 else
536                 {
537                         /*
538                          * incompressible data, ignore on subsequent compression
539                          * passes
540                          */
541                         toast_action[i] = 'x';
542                 }
543         }
544
545         /*
546          * Second we look for attributes of attstorage 'x' or 'e' that are
547          * still inline.
548          */
549         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
550                    maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
551         {
552                 int                     biggest_attno = -1;
553                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
554                 Datum           old_value;
555
556                 /*------
557                  * Search for the biggest yet inlined attribute with
558                  * attstorage equals 'x' or 'e'
559                  *------
560                  */
561                 for (i = 0; i < numAttrs; i++)
562                 {
563                         if (toast_action[i] == 'p')
564                                 continue;
565                         if (VARATT_IS_EXTERNAL(toast_values[i]))
566                                 continue;
567                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
568                                 continue;
569                         if (toast_sizes[i] > biggest_size)
570                         {
571                                 biggest_attno = i;
572                                 biggest_size = toast_sizes[i];
573                         }
574                 }
575
576                 if (biggest_attno < 0)
577                         break;
578
579                 /*
580                  * Store this external
581                  */
582                 i = biggest_attno;
583                 old_value = toast_values[i];
584                 toast_action[i] = 'p';
585                 toast_values[i] = toast_save_datum(rel, toast_values[i]);
586                 if (toast_free[i])
587                         pfree(DatumGetPointer(old_value));
588
589                 toast_free[i] = true;
590                 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
591
592                 need_change = true;
593                 need_free = true;
594         }
595
596         /*
597          * Round 3 - this time we take attributes with storage 'm' into
598          * compression
599          */
600         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
601                    maxDataLen)
602         {
603                 int                     biggest_attno = -1;
604                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
605                 Datum           old_value;
606                 Datum           new_value;
607
608                 /*
609                  * Search for the biggest yet uncompressed internal attribute
610                  */
611                 for (i = 0; i < numAttrs; i++)
612                 {
613                         if (toast_action[i] != ' ')
614                                 continue;
615                         if (VARATT_IS_EXTENDED(toast_values[i]))
616                                 continue;
617                         if (att[i]->attstorage != 'm')
618                                 continue;
619                         if (toast_sizes[i] > biggest_size)
620                         {
621                                 biggest_attno = i;
622                                 biggest_size = toast_sizes[i];
623                         }
624                 }
625
626                 if (biggest_attno < 0)
627                         break;
628
629                 /*
630                  * Attempt to compress it inline
631                  */
632                 i = biggest_attno;
633                 old_value = toast_values[i];
634                 new_value = toast_compress_datum(old_value);
635
636                 if (DatumGetPointer(new_value) != NULL)
637                 {
638                         /* successful compression */
639                         if (toast_free[i])
640                                 pfree(DatumGetPointer(old_value));
641                         toast_values[i] = new_value;
642                         toast_free[i] = true;
643                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
644                         need_change = true;
645                         need_free = true;
646                 }
647                 else
648                 {
649                         /*
650                          * incompressible data, ignore on subsequent compression
651                          * passes
652                          */
653                         toast_action[i] = 'x';
654                 }
655         }
656
657         /*
658          * Finally we store attributes of type 'm' external
659          */
660         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
661                    maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
662         {
663                 int                     biggest_attno = -1;
664                 int32           biggest_size = MAXALIGN(sizeof(varattrib));
665                 Datum           old_value;
666
667                 /*--------
668                  * Search for the biggest yet inlined attribute with
669                  * attstorage = 'm'
670                  *--------
671                  */
672                 for (i = 0; i < numAttrs; i++)
673                 {
674                         if (toast_action[i] == 'p')
675                                 continue;
676                         if (VARATT_IS_EXTERNAL(toast_values[i]))
677                                 continue;
678                         if (att[i]->attstorage != 'm')
679                                 continue;
680                         if (toast_sizes[i] > biggest_size)
681                         {
682                                 biggest_attno = i;
683                                 biggest_size = toast_sizes[i];
684                         }
685                 }
686
687                 if (biggest_attno < 0)
688                         break;
689
690                 /*
691                  * Store this external
692                  */
693                 i = biggest_attno;
694                 old_value = toast_values[i];
695                 toast_action[i] = 'p';
696                 toast_values[i] = toast_save_datum(rel, toast_values[i]);
697                 if (toast_free[i])
698                         pfree(DatumGetPointer(old_value));
699
700                 toast_free[i] = true;
701                 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
702
703                 need_change = true;
704                 need_free = true;
705         }
706
707         /*
708          * In the case we toasted any values, we need to build a new heap
709          * tuple with the changed values.
710          */
711         if (need_change)
712         {
713                 HeapTupleHeader olddata = newtup->t_data;
714                 char       *new_data;
715                 int32           new_len;
716
717                 /*
718                  * Calculate the new size of the tuple.  Header size should not
719                  * change, but data size might.
720                  */
721                 new_len = offsetof(HeapTupleHeaderData, t_bits);
722                 if (has_nulls)
723                         new_len += BITMAPLEN(numAttrs);
724                 if (olddata->t_infomask & HEAP_HASOID)
725                         new_len += sizeof(Oid);
726                 new_len = MAXALIGN(new_len);
727                 Assert(new_len == olddata->t_hoff);
728                 new_len += ComputeDataSize(tupleDesc, toast_values, toast_nulls);
729
730                 /*
731                  * Allocate new tuple in same context as old one.
732                  */
733                 new_data = (char *) MemoryContextAlloc(newtup->t_datamcxt, new_len);
734                 newtup->t_data = (HeapTupleHeader) new_data;
735                 newtup->t_len = new_len;
736
737                 /*
738                  * Put the tuple header and the changed values into place
739                  */
740                 memcpy(new_data, olddata, olddata->t_hoff);
741
742                 DataFill((char *) new_data + olddata->t_hoff,
743                                  tupleDesc,
744                                  toast_values,
745                                  toast_nulls,
746                                  &(newtup->t_data->t_infomask),
747                                  has_nulls ? newtup->t_data->t_bits : NULL);
748
749                 /*
750                  * In the case we modified a previously modified tuple again, free
751                  * the memory from the previous run
752                  */
753                 if ((char *) olddata != ((char *) newtup + HEAPTUPLESIZE))
754                         pfree(olddata);
755         }
756
757         /*
758          * Free allocated temp values
759          */
760         if (need_free)
761                 for (i = 0; i < numAttrs; i++)
762                         if (toast_free[i])
763                                 pfree(DatumGetPointer(toast_values[i]));
764
765         /*
766          * Delete external values from the old tuple
767          */
768         if (need_delold)
769                 for (i = 0; i < numAttrs; i++)
770                         if (toast_delold[i])
771                                 toast_delete_datum(rel,
772                                         heap_getattr(oldtup, i + 1, tupleDesc, &old_isnull));
773 }
774
775
776 /* ----------
777  * toast_compress_datum -
778  *
779  *      Create a compressed version of a varlena datum
780  *
781  *      If we fail (ie, compressed result is actually bigger than original)
782  *      then return NULL.  We must not use compressed data if it'd expand
783  *      the tuple!
784  * ----------
785  */
786 Datum
787 toast_compress_datum(Datum value)
788 {
789         varattrib  *tmp;
790
791         tmp = (varattrib *) palloc(sizeof(PGLZ_Header) + VARATT_SIZE(value));
792         pglz_compress(VARATT_DATA(value), VARATT_SIZE(value) - VARHDRSZ,
793                                   (PGLZ_Header *) tmp,
794                                   PGLZ_strategy_default);
795         if (VARATT_SIZE(tmp) < VARATT_SIZE(value))
796         {
797                 /* successful compression */
798                 VARATT_SIZEP(tmp) |= VARATT_FLAG_COMPRESSED;
799                 return PointerGetDatum(tmp);
800         }
801         else
802         {
803                 /* incompressible data */
804                 pfree(tmp);
805                 return PointerGetDatum(NULL);
806         }
807 }
808
809
810 /* ----------
811  * toast_save_datum -
812  *
813  *      Save one single datum into the secondary relation and return
814  *      a varattrib reference for it.
815  * ----------
816  */
817 static Datum
818 toast_save_datum(Relation rel, Datum value)
819 {
820         Relation        toastrel;
821         Relation        toastidx;
822         HeapTuple       toasttup;
823         InsertIndexResult idxres;
824         TupleDesc       toasttupDesc;
825         Datum           t_values[3];
826         char            t_nulls[3];
827         varattrib  *result;
828         struct
829         {
830                 struct varlena hdr;
831                 char            data[TOAST_MAX_CHUNK_SIZE];
832         }                       chunk_data;
833         int32           chunk_size;
834         int32           chunk_seq = 0;
835         char       *data_p;
836         int32           data_todo;
837
838         /*
839          * Create the varattrib reference
840          */
841         result = (varattrib *) palloc(sizeof(varattrib));
842
843         result->va_header = sizeof(varattrib) | VARATT_FLAG_EXTERNAL;
844         if (VARATT_IS_COMPRESSED(value))
845         {
846                 result->va_header |= VARATT_FLAG_COMPRESSED;
847                 result->va_content.va_external.va_rawsize =
848                         ((varattrib *) value)->va_content.va_compressed.va_rawsize;
849         }
850         else
851                 result->va_content.va_external.va_rawsize = VARATT_SIZE(value);
852
853         result->va_content.va_external.va_extsize =
854                 VARATT_SIZE(value) - VARHDRSZ;
855         result->va_content.va_external.va_valueid = newoid();
856         result->va_content.va_external.va_toastrelid =
857                 rel->rd_rel->reltoastrelid;
858
859         /*
860          * Initialize constant parts of the tuple data
861          */
862         t_values[0] = ObjectIdGetDatum(result->va_content.va_external.va_valueid);
863         t_values[2] = PointerGetDatum(&chunk_data);
864         t_nulls[0] = ' ';
865         t_nulls[1] = ' ';
866         t_nulls[2] = ' ';
867
868         /*
869          * Get the data to process
870          */
871         data_p = VARATT_DATA(value);
872         data_todo = VARATT_SIZE(value) - VARHDRSZ;
873
874         /*
875          * Open the toast relation
876          */
877         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
878         toasttupDesc = toastrel->rd_att;
879         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
880
881         /*
882          * Split up the item into chunks
883          */
884         while (data_todo > 0)
885         {
886                 /*
887                  * Calculate the size of this chunk
888                  */
889                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
890
891                 /*
892                  * Build a tuple and store it
893                  */
894                 t_values[1] = Int32GetDatum(chunk_seq++);
895                 VARATT_SIZEP(&chunk_data) = chunk_size + VARHDRSZ;
896                 memcpy(VARATT_DATA(&chunk_data), data_p, chunk_size);
897                 toasttup = heap_formtuple(toasttupDesc, t_values, t_nulls);
898                 if (!HeapTupleIsValid(toasttup))
899                         elog(ERROR, "failed to build TOAST tuple");
900
901                 simple_heap_insert(toastrel, toasttup);
902
903                 /*
904                  * Create the index entry.      We cheat a little here by not using
905                  * FormIndexDatum: this relies on the knowledge that the index
906                  * columns are the same as the initial columns of the table.
907                  *
908                  * Note also that there had better not be any user-created index on
909                  * the TOAST table, since we don't bother to update anything else.
910                  */
911                 idxres = index_insert(toastidx, t_values, t_nulls,
912                                                           &(toasttup->t_self),
913                                                           toastrel, toastidx->rd_index->indisunique);
914                 if (idxres == NULL)
915                         elog(ERROR, "failed to insert index entry for TOAST tuple");
916
917                 /*
918                  * Free memory
919                  */
920                 pfree(idxres);
921                 heap_freetuple(toasttup);
922
923                 /*
924                  * Move on to next chunk
925                  */
926                 data_todo -= chunk_size;
927                 data_p += chunk_size;
928         }
929
930         /*
931          * Done - close toast relation and return the reference
932          */
933         index_close(toastidx);
934         heap_close(toastrel, RowExclusiveLock);
935
936         return PointerGetDatum(result);
937 }
938
939
940 /* ----------
941  * toast_delete_datum -
942  *
943  *      Delete a single external stored value.
944  * ----------
945  */
946 static void
947 toast_delete_datum(Relation rel, Datum value)
948 {
949         varattrib  *attr = (varattrib *) DatumGetPointer(value);
950         Relation        toastrel;
951         Relation        toastidx;
952         ScanKeyData toastkey;
953         IndexScanDesc toastscan;
954         HeapTuple       toasttup;
955
956         if (!VARATT_IS_EXTERNAL(attr))
957                 return;
958
959         /*
960          * Open the toast relation and it's index
961          */
962         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
963                                                  RowExclusiveLock);
964         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
965
966         /*
967          * Setup a scan key to fetch from the index by va_valueid (we don't
968          * particularly care whether we see them in sequence or not)
969          */
970         ScanKeyEntryInitialize(&toastkey,
971                                                    (bits16) 0,
972                                                    (AttrNumber) 1,
973                                                    (RegProcedure) F_OIDEQ,
974                           ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
975
976         /*
977          * Find the chunks by index
978          */
979         toastscan = index_beginscan(toastrel, toastidx, SnapshotToast,
980                                                                 1, &toastkey);
981         while ((toasttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
982         {
983                 /*
984                  * Have a chunk, delete it
985                  */
986                 simple_heap_delete(toastrel, &toasttup->t_self);
987         }
988
989         /*
990          * End scan and close relations
991          */
992         index_endscan(toastscan);
993         index_close(toastidx);
994         heap_close(toastrel, RowExclusiveLock);
995 }
996
997
998 /* ----------
999  * toast_fetch_datum -
1000  *
1001  *      Reconstruct an in memory varattrib from the chunks saved
1002  *      in the toast relation
1003  * ----------
1004  */
1005 static varattrib *
1006 toast_fetch_datum(varattrib *attr)
1007 {
1008         Relation        toastrel;
1009         Relation        toastidx;
1010         ScanKeyData toastkey;
1011         IndexScanDesc toastscan;
1012         HeapTuple       ttup;
1013         TupleDesc       toasttupDesc;
1014         varattrib  *result;
1015         int32           ressize;
1016         int32           residx,
1017                                 nextidx;
1018         int32           numchunks;
1019         Pointer         chunk;
1020         bool            isnull;
1021         int32           chunksize;
1022
1023         ressize = attr->va_content.va_external.va_extsize;
1024         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1025
1026         result = (varattrib *) palloc(ressize + VARHDRSZ);
1027         VARATT_SIZEP(result) = ressize + VARHDRSZ;
1028         if (VARATT_IS_COMPRESSED(attr))
1029                 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1030
1031         /*
1032          * Open the toast relation and its index
1033          */
1034         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1035                                                  AccessShareLock);
1036         toasttupDesc = toastrel->rd_att;
1037         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1038
1039         /*
1040          * Setup a scan key to fetch from the index by va_valueid
1041          */
1042         ScanKeyEntryInitialize(&toastkey,
1043                                                    (bits16) 0,
1044                                                    (AttrNumber) 1,
1045                                                    (RegProcedure) F_OIDEQ,
1046                           ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1047
1048         /*
1049          * Read the chunks by index
1050          *
1051          * Note that because the index is actually on (valueid, chunkidx) we will
1052          * see the chunks in chunkidx order, even though we didn't explicitly
1053          * ask for it.
1054          */
1055         nextidx = 0;
1056
1057         toastscan = index_beginscan(toastrel, toastidx, SnapshotToast,
1058                                                                 1, &toastkey);
1059         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1060         {
1061                 /*
1062                  * Have a chunk, extract the sequence number and the data
1063                  */
1064                 residx = DatumGetInt32(heap_getattr(ttup, 2, toasttupDesc, &isnull));
1065                 Assert(!isnull);
1066                 chunk = DatumGetPointer(heap_getattr(ttup, 3, toasttupDesc, &isnull));
1067                 Assert(!isnull);
1068                 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1069
1070                 /*
1071                  * Some checks on the data we've found
1072                  */
1073                 if (residx != nextidx)
1074                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1075                                  residx, nextidx,
1076                                  attr->va_content.va_external.va_valueid);
1077                 if (residx < numchunks - 1)
1078                 {
1079                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1080                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1081                                          chunksize, residx,
1082                                          attr->va_content.va_external.va_valueid);
1083                 }
1084                 else if (residx < numchunks)
1085                 {
1086                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1087                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1088                                          chunksize, residx,
1089                                          attr->va_content.va_external.va_valueid);
1090                 }
1091                 else
1092                         elog(ERROR, "unexpected chunk number %d for toast value %u",
1093                                  residx,
1094                                  attr->va_content.va_external.va_valueid);
1095
1096                 /*
1097                  * Copy the data into proper place in our result
1098                  */
1099                 memcpy(((char *) VARATT_DATA(result)) + residx * TOAST_MAX_CHUNK_SIZE,
1100                            VARATT_DATA(chunk),
1101                            chunksize);
1102
1103                 nextidx++;
1104         }
1105
1106         /*
1107          * Final checks that we successfully fetched the datum
1108          */
1109         if (nextidx != numchunks)
1110                 elog(ERROR, "missing chunk number %d for toast value %u",
1111                          nextidx,
1112                          attr->va_content.va_external.va_valueid);
1113
1114         /*
1115          * End scan and close relations
1116          */
1117         index_endscan(toastscan);
1118         index_close(toastidx);
1119         heap_close(toastrel, AccessShareLock);
1120
1121         return result;
1122 }
1123
1124 /* ----------
1125  * toast_fetch_datum_slice -
1126  *
1127  *      Reconstruct a segment of a varattrib from the chunks saved
1128  *      in the toast relation
1129  * ----------
1130  */
1131 static varattrib *
1132 toast_fetch_datum_slice(varattrib *attr, int32 sliceoffset, int32 length)
1133 {
1134         Relation        toastrel;
1135         Relation        toastidx;
1136         ScanKeyData toastkey[3];
1137         int                     nscankeys;
1138         IndexScanDesc toastscan;
1139         HeapTuple       ttup;
1140         TupleDesc       toasttupDesc;
1141         varattrib  *result;
1142         int32           attrsize;
1143         int32           residx;
1144         int32           nextidx;
1145         int                     numchunks;
1146         int                     startchunk;
1147         int                     endchunk;
1148         int32           startoffset;
1149         int32           endoffset;
1150         int                     totalchunks;
1151         Pointer         chunk;
1152         bool            isnull;
1153         int32           chunksize;
1154         int32           chcpystrt;
1155         int32           chcpyend;
1156
1157         attrsize = attr->va_content.va_external.va_extsize;
1158         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1159
1160         if (sliceoffset >= attrsize)
1161         {
1162                 sliceoffset = 0;
1163                 length = 0;
1164         }
1165
1166         if (((sliceoffset + length) > attrsize) || length < 0)
1167                 length = attrsize - sliceoffset;
1168
1169         result = (varattrib *) palloc(length + VARHDRSZ);
1170         VARATT_SIZEP(result) = length + VARHDRSZ;
1171
1172         if (VARATT_IS_COMPRESSED(attr))
1173                 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1174
1175         if (length == 0)
1176                 return (result);                /* Can save a lot of work at this point! */
1177
1178         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1179         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1180         numchunks = (endchunk - startchunk) + 1;
1181
1182         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1183         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1184
1185         /*
1186          * Open the toast relation and it's index
1187          */
1188         toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1189                                                  AccessShareLock);
1190         toasttupDesc = toastrel->rd_att;
1191         toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1192
1193         /*
1194          * Setup a scan key to fetch from the index. This is either two keys
1195          * or three depending on the number of chunks.
1196          */
1197         ScanKeyEntryInitialize(&toastkey[0],
1198                                                    (bits16) 0,
1199                                                    (AttrNumber) 1,
1200                                                    (RegProcedure) F_OIDEQ,
1201                           ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1202
1203         /*
1204          * Now dependent on number of chunks:
1205          */
1206
1207         if (numchunks == 1)
1208         {
1209                 ScanKeyEntryInitialize(&toastkey[1],
1210                                                            (bits16) 0,
1211                                                            (AttrNumber) 2,
1212                                                            (RegProcedure) F_INT4EQ,
1213                                                            Int32GetDatum(startchunk));
1214                 nscankeys = 2;
1215         }
1216         else
1217         {
1218                 ScanKeyEntryInitialize(&toastkey[1],
1219                                                            (bits16) 0,
1220                                                            (AttrNumber) 2,
1221                                                            (RegProcedure) F_INT4GE,
1222                                                            Int32GetDatum(startchunk));
1223                 ScanKeyEntryInitialize(&toastkey[2],
1224                                                            (bits16) 0,
1225                                                            (AttrNumber) 2,
1226                                                            (RegProcedure) F_INT4LE,
1227                                                            Int32GetDatum(endchunk));
1228                 nscankeys = 3;
1229         }
1230
1231         /*
1232          * Read the chunks by index
1233          *
1234          * The index is on (valueid, chunkidx) so they will come in order
1235          */
1236         nextidx = startchunk;
1237         toastscan = index_beginscan(toastrel, toastidx, SnapshotToast,
1238                                                                 nscankeys, toastkey);
1239         while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1240         {
1241                 /*
1242                  * Have a chunk, extract the sequence number and the data
1243                  */
1244                 residx = DatumGetInt32(heap_getattr(ttup, 2, toasttupDesc, &isnull));
1245                 Assert(!isnull);
1246                 chunk = DatumGetPointer(heap_getattr(ttup, 3, toasttupDesc, &isnull));
1247                 Assert(!isnull);
1248                 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1249
1250                 /*
1251                  * Some checks on the data we've found
1252                  */
1253                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1254                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1255                                  residx, nextidx,
1256                                  attr->va_content.va_external.va_valueid);
1257                 if (residx < totalchunks - 1)
1258                 {
1259                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1260                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1261                                          chunksize, residx,
1262                                          attr->va_content.va_external.va_valueid);
1263                 }
1264                 else
1265                 {
1266                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1267                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1268                                          chunksize, residx,
1269                                          attr->va_content.va_external.va_valueid);
1270                 }
1271
1272                 /*
1273                  * Copy the data into proper place in our result
1274                  */
1275                 chcpystrt = 0;
1276                 chcpyend = chunksize - 1;
1277                 if (residx == startchunk)
1278                         chcpystrt = startoffset;
1279                 if (residx == endchunk)
1280                         chcpyend = endoffset;
1281
1282                 memcpy(((char *) VARATT_DATA(result)) +
1283                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1284                            VARATT_DATA(chunk) + chcpystrt,
1285                            (chcpyend - chcpystrt) + 1);
1286
1287                 nextidx++;
1288         }
1289
1290         /*
1291          * Final checks that we successfully fetched the datum
1292          */
1293         if (nextidx != (endchunk + 1))
1294                 elog(ERROR, "missing chunk number %d for toast value %u",
1295                          nextidx,
1296                          attr->va_content.va_external.va_valueid);
1297
1298         /*
1299          * End scan and close relations
1300          */
1301         index_endscan(toastscan);
1302         index_close(toastidx);
1303         heap_close(toastrel, AccessShareLock);
1304
1305         return result;
1306 }