]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
If a field is incompressible ('compressed' data is actually larger than
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/access/heap/tuptoaster.c,v 1.13 2000/10/23 23:42:04 tgl Exp $
12  *
13  *
14  * INTERFACE ROUTINES
15  *              heap_tuple_toast_attrs -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              heap_tuple_untoast_attr -
20  *                      Fetch back a given value from the "secondary" relation
21  *
22  *-------------------------------------------------------------------------
23  */
24
25 #include "postgres.h"
26
27 #include <unistd.h>
28 #include <fcntl.h>
29
30 #include "access/heapam.h"
31 #include "access/genam.h"
32 #include "access/tuptoaster.h"
33 #include "catalog/catalog.h"
34 #include "utils/rel.h"
35 #include "utils/builtins.h"
36 #include "utils/fmgroids.h"
37 #include "utils/pg_lzcompress.h"
38
39
40 #ifdef TUPLE_TOASTER_ACTIVE
41
42 #undef TOAST_DEBUG
43
44 static void                     toast_delete(Relation rel, HeapTuple oldtup);
45 static void                     toast_delete_datum(Relation rel, Datum value);
46 static void                     toast_insert_or_update(Relation rel, HeapTuple newtup,
47                                                                 HeapTuple oldtup);
48 static Datum            toast_compress_datum(Datum value);
49 static Datum            toast_save_datum(Relation rel, Oid mainoid, int16 attno, Datum value);
50 static varattrib   *toast_fetch_datum(varattrib *attr);
51
52
53 /* ----------
54  * heap_tuple_toast_attrs -
55  *
56  *      This is the central public entry point for toasting from heapam.
57  *
58  *      Calls the appropriate event specific action.
59  * ----------
60  */
61 void
62 heap_tuple_toast_attrs(Relation rel, HeapTuple newtup, HeapTuple oldtup)
63 {
64         if (newtup == NULL)
65                 toast_delete(rel, oldtup);
66         else
67                 toast_insert_or_update(rel, newtup, oldtup);
68 }
69
70
71 /* ----------
72  * heap_tuple_fetch_attr -
73  *
74  *      Public entry point to get back a toasted value 
75  *      external storage (possibly still in compressed format).
76  * ----------
77  */
78 varattrib *
79 heap_tuple_fetch_attr(varattrib *attr)
80 {
81         varattrib       *result;
82
83         if (VARATT_IS_EXTERNAL(attr))
84         {
85                 /* ----------
86                  * This is an external stored plain value
87                  * ----------
88                  */
89                 result = toast_fetch_datum(attr);
90         }
91         else
92         {
93                 /* ----------
94                  * This is a plain value inside of the main tuple - why am I called?
95                  * ----------
96                  */
97                 result = attr;
98     }
99
100         return result;
101 }
102
103
104 /* ----------
105  * heap_tuple_untoast_attr -
106  *
107  *      Public entry point to get back a toasted value from compression
108  *      or external storage.
109  * ----------
110  */
111 varattrib *
112 heap_tuple_untoast_attr(varattrib *attr)
113 {
114         varattrib       *result;
115
116         if (VARATT_IS_EXTERNAL(attr))
117         {
118                 if (VARATT_IS_COMPRESSED(attr))
119                 {
120                         /* ----------
121                          * This is an external stored compressed value
122                          * Fetch it from the toast heap and decompress.
123                          * ----------
124                          */
125                         varattrib *tmp;
126
127                         tmp = toast_fetch_datum(attr);
128                         result = (varattrib *)palloc(attr->va_content.va_external.va_rawsize
129                                                                 + VARHDRSZ);
130                         VARATT_SIZEP(result) = attr->va_content.va_external.va_rawsize
131                                                                 + VARHDRSZ;
132                         pglz_decompress((PGLZ_Header *)tmp, VARATT_DATA(result));
133
134                         pfree(tmp);
135                 }
136                 else
137                 {
138                         /* ----------
139                          * This is an external stored plain value
140                          * ----------
141                          */
142                         result = toast_fetch_datum(attr);
143                 }
144         }
145         else if (VARATT_IS_COMPRESSED(attr))
146         {
147                 /* ----------
148                  * This is a compressed value inside of the main tuple
149                  * ----------
150                  */
151                 result = (varattrib *)palloc(attr->va_content.va_compressed.va_rawsize
152                                                         + VARHDRSZ);
153                 VARATT_SIZEP(result) = attr->va_content.va_compressed.va_rawsize
154                                                         + VARHDRSZ;
155                 pglz_decompress((PGLZ_Header *)attr, VARATT_DATA(result));
156         }
157         else
158                 /* ----------
159                  * This is a plain value inside of the main tuple - why am I called?
160                  * ----------
161                  */
162                 return attr;
163
164         return result;
165 }
166
167
168 /* ----------
169  * toast_delete -
170  *
171  *      Cascaded delete toast-entries on DELETE
172  * ----------
173  */
174 static void
175 toast_delete(Relation rel, HeapTuple oldtup)
176 {
177         TupleDesc                       tupleDesc;
178         Form_pg_attribute  *att;
179         int                                     numAttrs;
180         int                                     i;
181         Datum                           value;
182         bool                            isnull;
183
184         /* ----------
185          * Get the tuple descriptor, the number of and attribute
186          * descriptors.
187          * ----------
188          */
189         tupleDesc       = rel->rd_att;
190         numAttrs        = tupleDesc->natts;
191         att                     = tupleDesc->attrs;
192
193         /* ----------
194          * Check for external stored attributes and delete them
195          * from the secondary relation.
196          * ----------
197          */
198         for (i = 0; i < numAttrs; i++)
199         {
200                 value = heap_getattr(oldtup, i + 1, tupleDesc, &isnull);
201                 if (!isnull && att[i]->attlen == -1)
202                         if (VARATT_IS_EXTERNAL(value))
203                                 toast_delete_datum(rel, value);
204         }
205 }
206
207
208 /* ----------
209  * toast_insert_or_update -
210  *
211  *      Delete no-longer-used toast-entries and create new ones to
212  *      make the new tuple fit on INSERT or UPDATE
213  * ----------
214  */
215 static void
216 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup)
217 {
218         TupleDesc                       tupleDesc;
219         Form_pg_attribute  *att;
220         int                                     numAttrs;
221         int                                     i;
222         bool                            old_isnull;
223         bool                            new_isnull;
224
225         bool                            need_change = false;
226         bool                            need_free   = false;
227         bool                            need_delold = false;
228         bool                            has_nulls   = false;
229
230         Size                            maxDataLen;
231
232         char                            toast_action[MaxHeapAttributeNumber];
233         char                            toast_nulls[MaxHeapAttributeNumber];
234         Datum                           toast_values[MaxHeapAttributeNumber];
235         int32                           toast_sizes[MaxHeapAttributeNumber];
236         bool                            toast_free[MaxHeapAttributeNumber];
237         bool                            toast_delold[MaxHeapAttributeNumber];
238
239         /* ----------
240          * Get the tuple descriptor, the number of and attribute
241          * descriptors and the location of the tuple values.
242          * ----------
243          */
244         tupleDesc       = rel->rd_att;
245         numAttrs        = tupleDesc->natts;
246         att                     = tupleDesc->attrs;
247
248         /* ----------
249          * Then collect information about the values given
250          *
251          * NOTE: toast_action[i] can have these values:
252          *              ' '             default handling
253          *              'p'             already processed --- don't touch it
254          *              'x'             incompressible, but OK to move off
255          * ----------
256          */
257         memset(toast_action,    ' ', numAttrs * sizeof(char));
258         memset(toast_nulls,     ' ', numAttrs * sizeof(char));
259         memset(toast_free,      0,   numAttrs * sizeof(bool));
260         memset(toast_delold,    0,   numAttrs * sizeof(bool));
261         for (i = 0; i < numAttrs; i++)
262         {
263                 varattrib          *old_value;
264                 varattrib          *new_value;
265
266                 if (oldtup != NULL)
267                 {
268                         /* ----------
269                          * For UPDATE get the old and new values of this attribute
270                          * ----------
271                          */
272                         old_value = (varattrib *)DatumGetPointer(
273                                                 heap_getattr(oldtup, i + 1, tupleDesc, &old_isnull));
274                         toast_values[i] = 
275                                                 heap_getattr(newtup, i + 1, tupleDesc, &new_isnull);
276                         new_value = (varattrib *)DatumGetPointer(toast_values[i]);
277
278                         /* ----------
279                          * If the old value is an external stored one, check if it
280                          * has changed so we have to delete it later.
281                          * ----------
282                          */
283                         if (!old_isnull && att[i]->attlen == -1 && 
284                                                 VARATT_IS_EXTERNAL(old_value))
285                         {
286                                 if (new_isnull || !VARATT_IS_EXTERNAL(new_value) ||
287                                                 old_value->va_content.va_external.va_rowid !=
288                                                 new_value->va_content.va_external.va_rowid ||
289                                                 old_value->va_content.va_external.va_attno !=
290                                                 new_value->va_content.va_external.va_attno)
291                                 {
292                                         /* ----------
293                                          * The old external store value isn't needed any
294                                          * more after the update
295                                          * ----------
296                                          */
297                                         toast_delold[i] = true;
298                                         need_delold = true;
299                                 }
300                                 else
301                                 {
302                                         /* ----------
303                                          * This attribute isn't changed by this update
304                                          * so we reuse the original reference to the old
305                                          * value in the new tuple.
306                                          * ----------
307                                          */
308                                         toast_action[i] = 'p';
309                                         toast_sizes[i] = VARATT_SIZE(toast_values[i]);
310                                         continue;
311                                 }
312                         }
313                 }
314                 else
315                 {
316                         /* ----------
317                          * For INSERT simply get the new value
318                          * ----------
319                          */
320                         toast_values[i] = 
321                                                 heap_getattr(newtup, i + 1, tupleDesc, &new_isnull);
322                 }
323
324                 /* ----------
325                  * Handle NULL attributes
326                  * ----------
327                  */
328                 if (new_isnull)
329                 {
330                         toast_action[i] = 'p';
331                         toast_nulls[i] = 'n';
332                         has_nulls = true;
333                         continue;
334                 }
335
336                 /* ----------
337                  * Now look at varsize attributes
338                  * ----------
339                  */
340                 if (att[i]->attlen == -1)
341                 {
342                         /* ----------
343                          * If the table's attribute says PLAIN always, force it so.
344                          * ----------
345                          */
346                         if (att[i]->attstorage == 'p')
347                                 toast_action[i] = 'p';
348
349                         /* ----------
350                          * We took care of UPDATE above, so any TOASTed value we find
351                          * still in the tuple must be someone else's we cannot reuse.
352                          * Expand it to plain (and, probably, toast it again below).
353                          * ----------
354                          */
355                         if (VARATT_IS_EXTENDED(DatumGetPointer(toast_values[i])))
356                         {
357                                 toast_values[i] = PointerGetDatum(heap_tuple_untoast_attr(
358                                         (varattrib *)DatumGetPointer(toast_values[i])));
359                                 toast_free[i] = true;
360                                 need_change = true;
361                                 need_free = true;
362                         }
363
364                         /* ----------
365                          * Remember the size of this attribute
366                          * ----------
367                          */
368                         toast_sizes[i]  = VARATT_SIZE(DatumGetPointer(toast_values[i]));
369                 }
370                 else
371                 {
372                         /* ----------
373                          * Not a variable size attribute, plain storage always
374                          * ----------
375                          */
376                         toast_action[i] = 'p';
377                         toast_sizes[i]  = att[i]->attlen;
378                 }
379         }
380
381         /* ----------
382          * Compress and/or save external until data fits into target length
383          *
384          *      1: Inline compress attributes with attstorage 'x'
385          *      2: Store attributes with attstorage 'x' or 'e' external
386          *  3: Inline compress attributes with attstorage 'm'
387          *      4: Store attributes with attstorage 'm' external
388          * ----------
389          */
390         maxDataLen = offsetof(HeapTupleHeaderData, t_bits);
391         if (has_nulls)
392                 maxDataLen += BITMAPLEN(numAttrs);
393         maxDataLen = TOAST_TUPLE_TARGET - MAXALIGN(maxDataLen);
394
395         /* ----------
396          * Look for attributes with attstorage 'x' to compress
397          * ----------
398          */
399         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
400                                 maxDataLen)
401         {
402                 int             biggest_attno = -1;
403                 int32   biggest_size  = MAXALIGN(sizeof(varattrib));
404                 Datum   old_value;
405                 Datum   new_value;
406
407                 /* ----------
408                  * Search for the biggest yet uncompressed internal attribute
409                  * ----------
410                  */
411                 for (i = 0; i < numAttrs; i++)
412                 {
413                         if (toast_action[i] != ' ')
414                                 continue;
415                         if (VARATT_IS_EXTENDED(toast_values[i]))
416                                 continue;
417                         if (att[i]->attstorage != 'x')
418                                 continue;
419                         if (toast_sizes[i] > biggest_size)
420                         {
421                                 biggest_attno = i;
422                                 biggest_size  = toast_sizes[i];
423                         }
424                 }
425
426                 if (biggest_attno < 0)
427                         break;
428
429                 /* ----------
430                  * Attempt to compress it inline
431                  * ----------
432                  */
433                 i                                       = biggest_attno;
434                 old_value                       = toast_values[i];
435                 new_value                       = toast_compress_datum(old_value);
436
437                 if (DatumGetPointer(new_value) != NULL)
438                 {
439                         /* successful compression */
440                         if (toast_free[i])
441                                 pfree(DatumGetPointer(old_value));
442                         toast_values[i] = new_value;
443                         toast_free[i]   = true;
444                         toast_sizes[i]  = VARATT_SIZE(toast_values[i]);
445                         need_change             = true;
446                         need_free               = true;
447                 }
448                 else
449                 {
450                         /* incompressible data, ignore on subsequent compression passes */
451                         toast_action[i] = 'x';
452                 }
453         }
454
455         /* ----------
456          * Second we look for attributes of attstorage 'x' or 'e' that
457          * are still inline.
458          * ----------
459          */
460         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
461                                 maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
462         {
463                 int             biggest_attno = -1;
464                 int32   biggest_size  = MAXALIGN(sizeof(varattrib));
465                 Datum   old_value;
466
467                 /* ----------
468                  * Search for the biggest yet inlined attribute with
469                  * attstorage = 'x' or 'e'
470                  * ----------
471                  */
472                 for (i = 0; i < numAttrs; i++)
473                 {
474                         if (toast_action[i] == 'p')
475                                 continue;
476                         if (VARATT_IS_EXTERNAL(toast_values[i]))
477                                 continue;
478                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
479                                 continue;
480                         if (toast_sizes[i] > biggest_size)
481                         {
482                                 biggest_attno = i;
483                                 biggest_size  = toast_sizes[i];
484                         }
485                 }
486
487                 if (biggest_attno < 0)
488                         break;
489
490                 /* ----------
491                  * Store this external
492                  * ----------
493                  */
494                 i                                       = biggest_attno;
495                 old_value                       = toast_values[i];
496                 toast_action[i]         = 'p';
497                 toast_values[i]         = toast_save_datum(rel,
498                                                                         newtup->t_data->t_oid,
499                                                                         i + 1,
500                                                                         toast_values[i]);
501                 if (toast_free[i])
502                         pfree(DatumGetPointer(old_value));
503
504                 toast_free[i]           = true;
505                 toast_sizes[i]          = VARATT_SIZE(toast_values[i]);
506
507                 need_change = true;
508                 need_free   = true;
509         }
510
511         /* ----------
512          * Round 3 - this time we take attributes with storage
513          * 'm' into compression
514          * ----------
515          */
516         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
517                                 maxDataLen)
518         {
519                 int             biggest_attno = -1;
520                 int32   biggest_size  = MAXALIGN(sizeof(varattrib));
521                 Datum   old_value;
522                 Datum   new_value;
523
524                 /* ----------
525                  * Search for the biggest yet uncompressed internal attribute
526                  * ----------
527                  */
528                 for (i = 0; i < numAttrs; i++)
529                 {
530                         if (toast_action[i] != ' ')
531                                 continue;
532                         if (VARATT_IS_EXTENDED(toast_values[i]))
533                                 continue;
534                         if (att[i]->attstorage != 'm')
535                                 continue;
536                         if (toast_sizes[i] > biggest_size)
537                         {
538                                 biggest_attno = i;
539                                 biggest_size  = toast_sizes[i];
540                         }
541                 }
542
543                 if (biggest_attno < 0)
544                         break;
545
546                 /* ----------
547                  * Attempt to compress it inline
548                  * ----------
549                  */
550                 i                                       = biggest_attno;
551                 old_value                       = toast_values[i];
552                 new_value                       = toast_compress_datum(old_value);
553
554                 if (DatumGetPointer(new_value) != NULL)
555                 {
556                         /* successful compression */
557                         if (toast_free[i])
558                                 pfree(DatumGetPointer(old_value));
559                         toast_values[i] = new_value;
560                         toast_free[i]   = true;
561                         toast_sizes[i]  = VARATT_SIZE(toast_values[i]);
562                         need_change             = true;
563                         need_free               = true;
564                 }
565                 else
566                 {
567                         /* incompressible data, ignore on subsequent compression passes */
568                         toast_action[i] = 'x';
569                 }
570         }
571
572         /* ----------
573          * Finally we store attributes of type 'm' external
574          * ----------
575          */
576         while (MAXALIGN(ComputeDataSize(tupleDesc, toast_values, toast_nulls)) >
577                                 maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
578         {
579                 int             biggest_attno = -1;
580                 int32   biggest_size  = MAXALIGN(sizeof(varattrib));
581                 Datum   old_value;
582
583                 /* ----------
584                  * Search for the biggest yet inlined attribute with
585                  * attstorage = 'm'
586                  * ----------
587                  */
588                 for (i = 0; i < numAttrs; i++)
589                 {
590                         if (toast_action[i] == 'p')
591                                 continue;
592                         if (VARATT_IS_EXTERNAL(toast_values[i]))
593                                 continue;
594                         if (att[i]->attstorage != 'm')
595                                 continue;
596                         if (toast_sizes[i] > biggest_size)
597                         {
598                                 biggest_attno = i;
599                                 biggest_size  = toast_sizes[i];
600                         }
601                 }
602
603                 if (biggest_attno < 0)
604                         break;
605
606                 /* ----------
607                  * Store this external
608                  * ----------
609                  */
610                 i                                       = biggest_attno;
611                 old_value                       = toast_values[i];
612                 toast_action[i]         = 'p';
613                 toast_values[i]         = toast_save_datum(rel,
614                                                                         newtup->t_data->t_oid,
615                                                                         i + 1,
616                                                                         toast_values[i]);
617                 if (toast_free[i])
618                         pfree(DatumGetPointer(old_value));
619
620                 toast_free[i]           = true;
621                 toast_sizes[i]          = VARATT_SIZE(toast_values[i]);
622
623                 need_change = true;
624                 need_free   = true;
625         }
626
627         /* ----------
628          * In the case we toasted any values, we need to build
629          * a new heap tuple with the changed values.
630          * ----------
631          */
632         if (need_change)
633         {
634                 char               *new_data;
635                 int32                   new_len;
636                 MemoryContext   oldcxt;
637                 HeapTupleHeader olddata;
638
639                 /* ----------
640                  * Calculate the new size of the tuple
641                  * ----------
642                  */
643                 new_len = offsetof(HeapTupleHeaderData, t_bits);
644                 if (has_nulls)
645                         new_len += BITMAPLEN(numAttrs);
646                 new_len = MAXALIGN(new_len);
647                 new_len += ComputeDataSize(tupleDesc, toast_values, toast_nulls);
648
649                 /* ----------
650                  * Remember the old memory location of the tuple (for below),
651                  * switch to the memory context of the HeapTuple structure
652                  * and allocate the new tuple.
653                  * ----------
654                  */
655                 olddata = newtup->t_data;
656                 oldcxt = MemoryContextSwitchTo(newtup->t_datamcxt);
657                 new_data = palloc(new_len);
658
659                 /* ----------
660                  * Put the tuple header and the changed values into place
661                  * ----------
662                  */
663                 memcpy(new_data, newtup->t_data, newtup->t_data->t_hoff);
664                 newtup->t_data = (HeapTupleHeader)new_data;
665                 newtup->t_len = new_len;
666
667                 DataFill((char *)(MAXALIGN((long)new_data +
668                                                 offsetof(HeapTupleHeaderData, t_bits) + 
669                                                 ((has_nulls) ? BITMAPLEN(numAttrs) : 0))),
670                                 tupleDesc,
671                                 toast_values,
672                                 toast_nulls,
673                                 &(newtup->t_data->t_infomask),
674                                 has_nulls ? newtup->t_data->t_bits : NULL);
675
676                 /* ----------
677                  * In the case we modified a previously modified tuple again,
678                  * free the memory from the previous run
679                  * ----------
680                  */
681                 if ((char *)olddata != ((char *)newtup + HEAPTUPLESIZE))
682                         pfree(olddata);
683
684                 /* ----------
685                  * Switch back to the old memory context
686                  * ----------
687                  */
688                 MemoryContextSwitchTo(oldcxt);
689         }
690
691         /* ----------
692          * Free allocated temp values
693          * ----------
694          */
695         if (need_free)
696                 for (i = 0; i < numAttrs; i++)
697                         if (toast_free[i])
698                                 pfree(DatumGetPointer(toast_values[i]));
699
700         /* ----------
701          * Delete external values from the old tuple
702          * ----------
703          */
704         if (need_delold)
705                 for (i = 0; i < numAttrs; i++)
706                         if (toast_delold[i])
707                                 toast_delete_datum(rel,
708                                         heap_getattr(oldtup, i + 1, tupleDesc, &old_isnull));
709 }
710
711
712 /* ----------
713  * toast_compress_datum -
714  *
715  *      Create a compressed version of a varlena datum
716  *
717  *      If we fail (ie, compressed result is actually bigger than original)
718  *      then return NULL.  We must not use compressed data if it'd expand
719  *      the tuple!
720  * ----------
721  */
722 static Datum
723 toast_compress_datum(Datum value)
724 {
725         varattrib          *tmp;
726
727         tmp = (varattrib *) palloc(sizeof(PGLZ_Header) + VARATT_SIZE(value));
728         pglz_compress(VARATT_DATA(value), VARATT_SIZE(value) - VARHDRSZ,
729                                   (PGLZ_Header *) tmp,
730                                   PGLZ_strategy_default);
731         if (VARATT_SIZE(tmp) < VARATT_SIZE(value))
732         {
733                 /* successful compression */
734                 VARATT_SIZEP(tmp) |= VARATT_FLAG_COMPRESSED;
735                 return PointerGetDatum(tmp);
736         }
737         else
738         {
739                 /* incompressible data */
740                 pfree(tmp);
741                 return PointerGetDatum(NULL);
742         }
743 }
744
745
746 /* ----------
747  * toast_save_datum -
748  *
749  *      Save one single datum into the secondary relation and return
750  *      a varattrib reference for it.
751  * ----------
752  */
753 static Datum
754 toast_save_datum(Relation rel, Oid mainoid, int16 attno, Datum value)
755 {
756         Relation                        toastrel;
757         Relation                        toastidx;
758         HeapTuple                       toasttup;
759         InsertIndexResult       idxres;
760         TupleDesc                       toasttupDesc;
761         Datum                           t_values[3];
762         char                            t_nulls[3];
763         varattrib                  *result;
764         char                            chunk_data[VARHDRSZ + TOAST_MAX_CHUNK_SIZE];
765         int32                           chunk_size;
766         int32                           chunk_seq = 0;
767         char                       *data_p;
768         int32                           data_todo;
769
770         /* ----------
771          * Create the varattrib reference
772          * ----------
773          */
774         result = (varattrib *)palloc(sizeof(varattrib));
775
776         result->va_header       = sizeof(varattrib) | VARATT_FLAG_EXTERNAL;
777         if (VARATT_IS_COMPRESSED(value))
778         {
779                 result->va_header |= VARATT_FLAG_COMPRESSED;
780                 result->va_content.va_external.va_rawsize = 
781                                         ((varattrib *)value)->va_content.va_compressed.va_rawsize;
782         }
783         else
784                 result->va_content.va_external.va_rawsize = VARATT_SIZE(value);
785                                         
786         result->va_content.va_external.va_extsize               = 
787                                         VARATT_SIZE(value) - VARHDRSZ;
788         result->va_content.va_external.va_valueid               = newoid();
789         result->va_content.va_external.va_toastrelid    = 
790                                         rel->rd_rel->reltoastrelid;
791         result->va_content.va_external.va_toastidxid    = 
792                                         rel->rd_rel->reltoastidxid;
793         result->va_content.va_external.va_rowid                 = mainoid;
794         result->va_content.va_external.va_attno                 = attno;
795
796         /* ----------
797          * Initialize constant parts of the tuple data
798          * ----------
799          */
800         t_values[0] = ObjectIdGetDatum(result->va_content.va_external.va_valueid);
801         t_values[2] = PointerGetDatum(chunk_data);
802         t_nulls[0] = ' ';
803         t_nulls[1] = ' ';
804         t_nulls[2] = ' ';
805
806         /* ----------
807          * Get the data to process
808          * ----------
809          */
810         data_p          = VARATT_DATA(value);
811         data_todo       = VARATT_SIZE(value) - VARHDRSZ;
812
813         /* ----------
814          * Open the toast relation
815          * ----------
816          */
817         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
818         toasttupDesc = toastrel->rd_att;
819         toastidx = index_open(rel->rd_rel->reltoastidxid);
820         
821         /* ----------
822          * Split up the item into chunks 
823          * ----------
824          */
825         while (data_todo > 0)
826         {
827                 /* ----------
828                  * Calculate the size of this chunk
829                  * ----------
830                  */
831                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
832
833                 /* ----------
834                  * Build a tuple
835                  * ----------
836                  */
837                 t_values[1] = Int32GetDatum(chunk_seq++);
838                 VARATT_SIZEP(chunk_data) = chunk_size + VARHDRSZ;
839                 memcpy(VARATT_DATA(chunk_data), data_p, chunk_size);
840                 toasttup = heap_formtuple(toasttupDesc, t_values, t_nulls);
841                 if (!HeapTupleIsValid(toasttup))
842                         elog(ERROR, "Failed to build TOAST tuple");
843
844                 /* ----------
845                  * Store it and create the index entry
846                  * ----------
847                  */
848                 heap_insert(toastrel, toasttup);
849                 idxres = index_insert(toastidx, t_values, t_nulls,
850                                                 &(toasttup->t_self),
851                                                 toastrel);
852                 if (idxres == NULL)
853                         elog(ERROR, "Failed to insert index entry for TOAST tuple");
854
855                 /* ----------
856                  * Free memory
857                  * ----------
858                  */
859                 heap_freetuple(toasttup);
860                 pfree(idxres);
861
862                 /* ----------
863                  * Move on to next chunk
864                  * ----------
865                  */
866                 data_todo -= chunk_size;
867                 data_p += chunk_size;
868         }
869
870         /* ----------
871          * Done - close toast relation and return the reference
872          * ----------
873          */
874         index_close(toastidx);
875         heap_close(toastrel, RowExclusiveLock);
876
877         return PointerGetDatum(result);
878 }
879
880
881 /* ----------
882  * toast_delete_datum -
883  *
884  *      Delete a single external stored value.
885  * ----------
886  */
887 static void
888 toast_delete_datum(Relation rel, Datum value)
889 {
890         register varattrib         *attr = (varattrib *)value;
891         Relation                                toastrel;
892         Relation                                toastidx;
893         ScanKeyData                             toastkey;
894         IndexScanDesc                   toastscan;
895         HeapTupleData                   toasttup;
896         RetrieveIndexResult             indexRes;
897         Buffer                                  buffer;
898
899         if (!VARATT_IS_EXTERNAL(attr))
900                 return;
901
902         /* ----------
903          * Open the toast relation and it's index
904          * ----------
905          */
906         toastrel        = heap_open(attr->va_content.va_external.va_toastrelid,
907                                         RowExclusiveLock);
908         toastidx = index_open(attr->va_content.va_external.va_toastidxid);
909
910         /* ----------
911          * Setup a scan key to fetch from the index by va_valueid
912          * ----------
913          */
914         ScanKeyEntryInitialize(&toastkey,
915                                         (bits16) 0, 
916                                         (AttrNumber) 1, 
917                                         (RegProcedure) F_OIDEQ, 
918                                         ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
919
920         /* ----------
921          * Read the chunks by index
922          * ----------
923          */
924         toastscan = index_beginscan(toastidx, false, 1, &toastkey);
925         while ((indexRes = index_getnext(toastscan, ForwardScanDirection)) != NULL)
926         {
927                 toasttup.t_self = indexRes->heap_iptr;
928                 heap_fetch(toastrel, SnapshotAny, &toasttup, &buffer);
929                 pfree(indexRes);
930
931                 if (!toasttup.t_data)
932                         continue;
933
934                 /* ----------
935                  * Have a chunk, delete it
936                  * ----------
937                  */
938                 heap_delete(toastrel, &toasttup.t_self, NULL);
939
940                 ReleaseBuffer(buffer);
941         }
942
943         /* ----------
944          * End scan and close relations
945          * ----------
946          */
947         index_endscan(toastscan);
948         index_close(toastidx);
949         heap_close(toastrel, RowExclusiveLock);
950 }
951
952
953 /* ----------
954  * toast_fetch_datum -
955  *
956  *      Reconstruct an in memory varattrib from the chunks saved
957  *      in the toast relation
958  * ----------
959  */
960 static varattrib *
961 toast_fetch_datum(varattrib *attr)
962 {
963         Relation                                toastrel;
964         Relation                                toastidx;
965         ScanKeyData                             toastkey;
966         IndexScanDesc                   toastscan;
967         HeapTupleData                   toasttup;
968         HeapTuple                               ttup;
969         TupleDesc                               toasttupDesc;
970         RetrieveIndexResult             indexRes;
971         Buffer                                  buffer;
972
973         varattrib                          *result;
974         int32                                   ressize;
975         int32                                   residx;
976         int                                             numchunks;
977         Pointer                                 chunk;
978         bool                                    isnull;
979         int32                                   chunksize;
980
981         char                               *chunks_found;
982         char                               *chunks_expected;
983
984         ressize = attr->va_content.va_external.va_extsize;
985     numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
986
987         chunks_found    = palloc(numchunks);
988         chunks_expected = palloc(numchunks);
989         memset(chunks_found,    0, numchunks);
990         memset(chunks_expected, 1, numchunks);
991
992         result = (varattrib *)palloc(ressize + VARHDRSZ);
993         VARATT_SIZEP(result) = ressize + VARHDRSZ;
994         if (VARATT_IS_COMPRESSED(attr))
995                 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
996
997         /* ----------
998          * Open the toast relation and it's index
999          * ----------
1000          */
1001         toastrel        = heap_open(attr->va_content.va_external.va_toastrelid,
1002                                         AccessShareLock);
1003         toasttupDesc = toastrel->rd_att;
1004         toastidx = index_open(attr->va_content.va_external.va_toastidxid);
1005
1006         /* ----------
1007          * Setup a scan key to fetch from the index by va_valueid
1008          * ----------
1009          */
1010         ScanKeyEntryInitialize(&toastkey,
1011                                         (bits16) 0, 
1012                                         (AttrNumber) 1, 
1013                                         (RegProcedure) F_OIDEQ, 
1014                                         ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1015
1016         /* ----------
1017          * Read the chunks by index
1018          *
1019          * Note we will not necessarily see the chunks in sequence-number order.
1020          * ----------
1021          */
1022         toastscan = index_beginscan(toastidx, false, 1, &toastkey);
1023         while ((indexRes = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1024         {
1025                 toasttup.t_self = indexRes->heap_iptr;
1026                 heap_fetch(toastrel, SnapshotAny, &toasttup, &buffer);
1027                 pfree(indexRes);
1028
1029                 if (toasttup.t_data == NULL)
1030                         continue;
1031                 ttup = &toasttup;
1032
1033                 /* ----------
1034                  * Have a chunk, extract the sequence number and the data
1035                  * ----------
1036                  */
1037                 residx = DatumGetInt32(heap_getattr(ttup, 2, toasttupDesc, &isnull));
1038                 Assert(!isnull);
1039                 chunk = DatumGetPointer(heap_getattr(ttup, 3, toasttupDesc, &isnull));
1040                 Assert(!isnull);
1041                 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1042
1043                 /* ----------
1044                  * Some checks on the data we've found
1045                  * ----------
1046                  */
1047                 if (residx < 0 || residx >= numchunks)
1048                         elog(ERROR, "unexpected chunk number %d for toast value %d",
1049                                  residx,
1050                                  attr->va_content.va_external.va_valueid);
1051                 if (residx < numchunks-1)
1052                 {
1053                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1054                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %d",
1055                                          chunksize, residx,
1056                                          attr->va_content.va_external.va_valueid);
1057                 }
1058                 else
1059                 {
1060                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1061                                 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %d",
1062                                          chunksize, residx,
1063                                          attr->va_content.va_external.va_valueid);
1064                 }
1065                 if (chunks_found[residx]++ > 0)
1066                         elog(ERROR, "chunk %d for toast value %d appears multiple times",
1067                                  residx,
1068                                  attr->va_content.va_external.va_valueid);
1069
1070                 /* ----------
1071                  * Copy the data into proper place in our result
1072                  * ----------
1073                  */
1074                 memcpy(((char *)VARATT_DATA(result)) + residx * TOAST_MAX_CHUNK_SIZE,
1075                            VARATT_DATA(chunk),
1076                            chunksize);
1077
1078                 ReleaseBuffer(buffer);
1079         }
1080
1081         /* ----------
1082          * Final checks that we successfully fetched the datum
1083          * ----------
1084          */
1085         if (memcmp(chunks_found, chunks_expected, numchunks) != 0)
1086                 elog(ERROR, "not all toast chunks found for value %d",
1087                                                 attr->va_content.va_external.va_valueid);
1088         pfree(chunks_expected);
1089         pfree(chunks_found);
1090
1091         /* ----------
1092          * End scan and close relations
1093          * ----------
1094          */
1095         index_endscan(toastscan);
1096         index_close(toastidx);
1097         heap_close(toastrel, AccessShareLock);
1098
1099         return result;
1100 }
1101
1102
1103 #endif   /* TUPLE_TOASTER_ACTIVE */