1 /*-------------------------------------------------------------------------
4 * Support routines for external and compressed storage of
5 * variable size attributes.
7 * Copyright (c) 2000-2006, PostgreSQL Global Development Group
11 * $PostgreSQL: pgsql/src/backend/access/heap/tuptoaster.c,v 1.60 2006/06/16 18:42:21 tgl Exp $
15 * toast_insert_or_update -
16 * Try to make a given tuple fit into one page by compressing
17 * or moving off attributes
20 * Reclaim toast storage when a tuple is deleted
22 * heap_tuple_untoast_attr -
23 * Fetch back a given value from the "secondary" relation
25 *-------------------------------------------------------------------------
33 #include "access/heapam.h"
34 #include "access/genam.h"
35 #include "access/tuptoaster.h"
36 #include "catalog/catalog.h"
37 #include "utils/rel.h"
38 #include "utils/builtins.h"
39 #include "utils/fmgroids.h"
40 #include "utils/pg_lzcompress.h"
41 #include "utils/typcache.h"
/*
 * Prototypes for the file-local (static) helpers that store, delete and
 * read back single datums in the toast relation.  Their definitions
 * appear further down in this file.
 */
46 static void toast_delete_datum(Relation rel, Datum value);
47 static Datum toast_save_datum(Relation rel, Datum value);
48 static varattrib *toast_fetch_datum(varattrib *attr);
49 static varattrib *toast_fetch_datum_slice(varattrib *attr,
50 int32 sliceoffset, int32 length);
54 * heap_tuple_fetch_attr -
56 * Public entry point to get back a toasted value from
57 * external storage (possibly still in compressed format).
/*
 * heap_tuple_fetch_attr
 *
 * attr: pointer to a varlena attribute that may be stored externally.
 *
 * When VARATT_IS_EXTERNAL(attr) holds, the value is read back from the
 * toast relation with toast_fetch_datum().  No decompression call is
 * made on this path.
 *
 * NOTE(review): the return type line, the braces, the local declaration
 * of result and the handling of the not-external case are not visible in
 * this listing (the embedded line numbers jump from 70 to 75).  Confirm
 * the missing parts against the complete file.
 */
61 heap_tuple_fetch_attr(varattrib *attr)
65 if (VARATT_IS_EXTERNAL(attr))
68 * This is an external stored plain value
70 result = toast_fetch_datum(attr);
75 * This is a plain value inside of the main tuple - why am I called?
85 * heap_tuple_untoast_attr -
87 * Public entry point to get back a toasted value from compression
88 * or external storage.
/*
 * heap_tuple_untoast_attr
 *
 * attr: pointer to a varlena attribute that may be external, compressed,
 *       both, or neither.
 *
 * Visible cases:
 *   - external and compressed: the stored bytes are read with
 *     toast_fetch_datum() into tmp, a result buffer sized from
 *     va_external.va_rawsize is palloc-allocated, and pglz_decompress()
 *     expands tmp into it.
 *   - external, not compressed: the result is whatever
 *     toast_fetch_datum() returns.
 *   - compressed inside the main tuple: a buffer sized from
 *     va_compressed.va_rawsize is allocated and pglz_decompress() reads
 *     directly from attr.
 *   - plain inline value: only the original remark is visible.
 *
 * NOTE(review): the continuation lines of both palloc() calls and of
 * both VARATT_SIZEP assignments, the release of tmp, the plain-value
 * branch and the return statement are missing from this listing.
 */
92 heap_tuple_untoast_attr(varattrib *attr)
96 if (VARATT_IS_EXTERNAL(attr))
98 if (VARATT_IS_COMPRESSED(attr))
101 * This is an external stored compressed value
102 * Fetch it from the toast heap and decompress.
107 tmp = toast_fetch_datum(attr);
108 result = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
110 VARATT_SIZEP(result) = attr->va_content.va_external.va_rawsize
112 pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(result));
119 * This is an external stored plain value
121 result = toast_fetch_datum(attr);
124 else if (VARATT_IS_COMPRESSED(attr))
127 * This is a compressed value inside of the main tuple
129 result = (varattrib *) palloc(attr->va_content.va_compressed.va_rawsize
131 VARATT_SIZEP(result) = attr->va_content.va_compressed.va_rawsize
133 pglz_decompress((PGLZ_Header *) attr, VARATT_DATA(result));
138 * This is a plain value inside of the main tuple - why am I called?
147 * heap_tuple_untoast_attr_slice -
149 * Public entry point to get back part of a toasted value
150 * from compression or external storage.
/*
 * heap_tuple_untoast_attr_slice
 *
 * attr:        pointer to a varlena attribute, possibly toasted.
 * sliceoffset: byte offset of the slice within the detoasted data.
 * slicelength: number of bytes wanted; a negative value, or a range that
 *              runs past the end, is replaced by
 *              attrsize - sliceoffset (everything up to the end).
 *
 * Compressed values (external or inline) are decompressed completely
 * into preslice before slicing.  For the external case that is visible
 * after the compressed branch, the work is handed to
 * toast_fetch_datum_slice() and its result is returned directly.  The
 * remaining cases copy slicelength bytes starting at sliceoffset from
 * preslice into a freshly palloc-allocated result.
 *
 * NOTE(review): the raw size is read through va_external.va_rawsize even
 * when tmp = attr (compressed in main tuple).  This presumably relies on
 * va_compressed.va_rawsize sharing the same storage in va_content;
 * confirm against the varattrib declaration.
 *
 * NOTE(review): the else lines, the assignment of preslice for plain
 * values, the action taken when sliceoffset >= attrsize, the statement
 * guarded by preslice != attr, and the final return are missing from
 * this listing.
 */
154 heap_tuple_untoast_attr_slice(varattrib *attr, int32 sliceoffset, int32 slicelength)
160 if (VARATT_IS_COMPRESSED(attr))
164 if (VARATT_IS_EXTERNAL(attr))
165 tmp = toast_fetch_datum(attr);
168 tmp = attr; /* compressed in main tuple */
171 preslice = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
173 VARATT_SIZEP(preslice) = attr->va_content.va_external.va_rawsize + VARHDRSZ;
174 pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(preslice));
182 if (VARATT_IS_EXTERNAL(attr))
185 return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
191 /* slicing of datum for compressed cases and plain value */
193 attrsize = VARSIZE(preslice) - VARHDRSZ;
194 if (sliceoffset >= attrsize)
200 if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
201 slicelength = attrsize - sliceoffset;
203 result = (varattrib *) palloc(slicelength + VARHDRSZ);
204 VARATT_SIZEP(result) = slicelength + VARHDRSZ;
206 memcpy(VARDATA(result), VARDATA(preslice) + sliceoffset, slicelength);
208 if (preslice != attr)
216 * toast_raw_datum_size -
218 * Return the raw (detoasted) size of a varlena datum
/*
 * toast_raw_datum_size
 *
 * value: Datum holding a pointer to a varlena attribute.
 *
 * Computes result according to the storage form of the datum:
 *   - compressed:                va_compressed.va_rawsize + VARHDRSZ
 *   - external (not compressed): va_external.va_rawsize, used as stored
 *   - otherwise:                 VARSIZE(attr)
 * The compressed test is made first, so a datum that is both compressed
 * and external takes the first branch.
 *
 * NOTE(review): the return type, the declaration of result and the
 * return statement are not visible in this listing.
 */
222 toast_raw_datum_size(Datum value)
224 varattrib *attr = (varattrib *) DatumGetPointer(value);
227 if (VARATT_IS_COMPRESSED(attr))
230 * va_rawsize shows the original data size, whether the datum is
233 result = attr->va_content.va_compressed.va_rawsize + VARHDRSZ;
235 else if (VARATT_IS_EXTERNAL(attr))
238 * an uncompressed external attribute has rawsize including the header
239 * (not too consistent!)
241 result = attr->va_content.va_external.va_rawsize;
245 /* plain untoasted datum */
246 result = VARSIZE(attr);
254 * Return the physical storage size (possibly compressed) of a varlena datum
/*
 * toast_datum_size
 *
 * value: Datum holding a pointer to a varlena attribute.
 *
 * For an externally stored attribute, result is va_external.va_extsize.
 * For anything stored inline (compressed or not), result is
 * VARSIZE(attr).
 *
 * NOTE(review): the return type, the declaration of result and the
 * return statement are not visible in this listing.
 */
258 toast_datum_size(Datum value)
260 varattrib *attr = (varattrib *) DatumGetPointer(value);
263 if (VARATT_IS_EXTERNAL(attr))
266 * Attribute is stored externally - return the extsize whether
267 * compressed or not. We do not count the size of the toast pointer
270 result = attr->va_content.va_external.va_extsize;
275 * Attribute is stored inline either compressed or not, just calculate
276 * the size of the datum in either case.
278 result = VARSIZE(attr);
287 * Cascaded delete toast-entries on DELETE
/*
 * toast_delete
 *
 * rel:    relation the tuple belongs to; its rd_att descriptor is used
 *         to take the tuple apart.
 * oldtup: the tuple whose external values are to be removed.
 *
 * The tuple is split into toast_values / toast_isnull with
 * heap_deform_tuple().  Every attribute with attlen == -1 that is not
 * null and for which VARATT_IS_EXTERNAL() holds is then passed to
 * toast_delete_datum().
 *
 * NOTE(review): the return type and the declarations of tupleDesc,
 * numAttrs and i are not visible in this listing.
 */
291 toast_delete(Relation rel, HeapTuple oldtup)
294 Form_pg_attribute *att;
297 Datum toast_values[MaxHeapAttributeNumber];
298 bool toast_isnull[MaxHeapAttributeNumber];
301 * Get the tuple descriptor and break down the tuple into fields.
303 * NOTE: it's debatable whether to use heap_deformtuple() here or just
304 * heap_getattr() only the varlena columns. The latter could win if there
305 * are few varlena columns and many non-varlena ones. However,
306 * heap_deformtuple costs only O(N) while the heap_getattr way would cost
307 * O(N^2) if there are many varlena columns, so it seems better to err on
308 * the side of linear cost. (We won't even be here unless there's at
309 * least one varlena column, by the way.)
311 tupleDesc = rel->rd_att;
312 att = tupleDesc->attrs;
313 numAttrs = tupleDesc->natts;
315 Assert(numAttrs <= MaxHeapAttributeNumber);
316 heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
319 * Check for external stored attributes and delete them from the secondary
322 for (i = 0; i < numAttrs; i++)
324 if (att[i]->attlen == -1)
326 Datum value = toast_values[i];
328 if (!toast_isnull[i] && VARATT_IS_EXTERNAL(value))
329 toast_delete_datum(rel, value);
336 * toast_insert_or_update -
338 * Delete no-longer-used toast-entries and create new ones to
339 * make the new tuple fit on INSERT or UPDATE
342 * newtup: the candidate new tuple to be inserted
343 * oldtup: the old row version for UPDATE, or NULL for INSERT
345 * either newtup if no toasting is needed, or a palloc'd modified tuple
346 * that is what should actually get stored
348 * NOTE: neither newtup nor oldtup will be modified. This is a change
349 * from the pre-8.1 API of this routine.
/*
 * toast_insert_or_update
 *
 * rel:    relation receiving the tuple; supplies the tuple descriptor
 *         (rd_att) and the toast relation OID (rd_rel->reltoastrelid).
 * newtup: candidate tuple to be stored.
 * oldtup: previous row version, deformed into toast_oldvalues /
 *         toast_oldisnull.
 *
 * Outline of the visible code:
 *   1. Deform the tuple(s) and classify every attribute.  An external
 *      old value is flagged in toast_delold when the new value is null,
 *      is not external, or differs in va_valueid or va_toastrelid;
 *      otherwise the attribute is marked p (already processed).  Any
 *      other external new value is expanded with
 *      heap_tuple_untoast_attr().  Attributes with attstorage p and
 *      non-varlena attributes are marked p as well.
 *   2. Four while loops, each picking the largest eligible attribute
 *      (by toast_sizes) and either compressing it with
 *      toast_compress_datum() or moving it out with toast_save_datum().
 *      The two store-external loops also require that
 *      rel->rd_rel->reltoastrelid is not InvalidOid.
 *   3. Either build a new tuple in palloc0-allocated memory, copying the
 *      old header and refilling the data area with heap_fill_tuple(), or
 *      set result_tuple = newtup.
 *   4. Free temporary values and delete external values of the old
 *      tuple.
 *
 * NOTE(review): many original lines are missing from this listing,
 * including the return type, several local declarations, the tests on
 * oldtup, need_change, need_free and need_delold, the continue / break
 * statements inside the search loops, the assignments to biggest_attno
 * and to i after each search, and the return statement.  The summary
 * above covers only what is visible.
 */
353 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup)
355 HeapTuple result_tuple;
357 Form_pg_attribute *att;
361 bool need_change = false;
362 bool need_free = false;
363 bool need_delold = false;
364 bool has_nulls = false;
/*
 * Per-attribute work arrays, all dimensioned by MaxHeapAttributeNumber
 * and indexed by attribute position i.
 */
368 char toast_action[MaxHeapAttributeNumber];
369 bool toast_isnull[MaxHeapAttributeNumber];
370 bool toast_oldisnull[MaxHeapAttributeNumber];
371 Datum toast_values[MaxHeapAttributeNumber];
372 Datum toast_oldvalues[MaxHeapAttributeNumber];
373 int32 toast_sizes[MaxHeapAttributeNumber];
374 bool toast_free[MaxHeapAttributeNumber];
375 bool toast_delold[MaxHeapAttributeNumber];
378 * Get the tuple descriptor and break down the tuple(s) into fields.
380 tupleDesc = rel->rd_att;
381 att = tupleDesc->attrs;
382 numAttrs = tupleDesc->natts;
384 Assert(numAttrs <= MaxHeapAttributeNumber);
385 heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
387 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
390 * Then collect information about the values given
392 * NOTE: toast_action[i] can have these values:
393 * ' ' default handling
394 * 'p' already processed --- don't touch it
395 * 'x' incompressible, but OK to move off
397 * NOTE: toast_sizes[i] is only made valid for varlena attributes with
398 * toast_action[i] different from 'p'.
401 memset(toast_action, ' ', numAttrs * sizeof(char));
402 memset(toast_free, 0, numAttrs * sizeof(bool));
403 memset(toast_delold, 0, numAttrs * sizeof(bool));
405 for (i = 0; i < numAttrs; i++)
407 varattrib *old_value;
408 varattrib *new_value;
413 * For UPDATE get the old and new values of this attribute
415 old_value = (varattrib *) DatumGetPointer(toast_oldvalues[i]);
416 new_value = (varattrib *) DatumGetPointer(toast_values[i]);
419 * If the old value is an external stored one, check if it has
420 * changed so we have to delete it later.
422 if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
423 VARATT_IS_EXTERNAL(old_value))
425 if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
426 old_value->va_content.va_external.va_valueid !=
427 new_value->va_content.va_external.va_valueid ||
428 old_value->va_content.va_external.va_toastrelid !=
429 new_value->va_content.va_external.va_toastrelid)
432 * The old external stored value isn't needed any more
435 toast_delold[i] = true;
441 * This attribute isn't changed by this update so we reuse
442 * the original reference to the old value in the new
445 toast_action[i] = 'p';
446 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
454 * For INSERT simply get the new value
456 new_value = (varattrib *) DatumGetPointer(toast_values[i]);
460 * Handle NULL attributes
464 toast_action[i] = 'p';
470 * Now look at varlena attributes
472 if (att[i]->attlen == -1)
475 * If the table's attribute says PLAIN always, force it so.
477 if (att[i]->attstorage == 'p')
478 toast_action[i] = 'p';
481 * We took care of UPDATE above, so any external value we find
482 * still in the tuple must be someone else's we cannot reuse.
483 * Expand it to plain (and, probably, toast it again below).
485 if (VARATT_IS_EXTERNAL(new_value))
487 new_value = heap_tuple_untoast_attr(new_value);
488 toast_values[i] = PointerGetDatum(new_value);
489 toast_free[i] = true;
495 * Remember the size of this attribute
497 toast_sizes[i] = VARATT_SIZE(new_value);
502 * Not a varlena attribute, plain storage always
504 toast_action[i] = 'p';
509 * Compress and/or save external until data fits into target length
511 * 1: Inline compress attributes with attstorage 'x'
512 * 2: Store attributes with attstorage 'x' or 'e' external
513 * 3: Inline compress attributes with attstorage 'm'
514 * 4: Store attributes with attstorage 'm' external
/*
 * maxDataLen is the room left for attribute data: TOAST_TUPLE_TARGET
 * minus the MAXALIGN-rounded header size accumulated just below.
 * NOTE(review): the condition guarding the BITMAPLEN addition
 * (presumably has_nulls) is not visible in this listing.
 */
517 maxDataLen = offsetof(HeapTupleHeaderData, t_bits);
519 maxDataLen += BITMAPLEN(numAttrs);
520 maxDataLen = TOAST_TUPLE_TARGET - MAXALIGN(maxDataLen);
523 * Look for attributes with attstorage 'x' to compress
525 while (MAXALIGN(heap_compute_data_size(tupleDesc,
526 toast_values, toast_isnull)) >
529 int biggest_attno = -1;
530 int32 biggest_size = MAXALIGN(sizeof(varattrib));
535 * Search for the biggest yet uncompressed internal attribute
537 for (i = 0; i < numAttrs; i++)
539 if (toast_action[i] != ' ')
541 if (VARATT_IS_EXTENDED(toast_values[i]))
543 if (att[i]->attstorage != 'x')
545 if (toast_sizes[i] > biggest_size)
548 biggest_size = toast_sizes[i];
552 if (biggest_attno < 0)
556 * Attempt to compress it inline
559 old_value = toast_values[i];
560 new_value = toast_compress_datum(old_value);
562 if (DatumGetPointer(new_value) != NULL)
564 /* successful compression */
566 pfree(DatumGetPointer(old_value));
567 toast_values[i] = new_value;
568 toast_free[i] = true;
569 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
576 * incompressible data, ignore on subsequent compression passes
578 toast_action[i] = 'x';
583 * Second we look for attributes of attstorage 'x' or 'e' that are still
586 while (MAXALIGN(heap_compute_data_size(tupleDesc,
587 toast_values, toast_isnull)) >
588 maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
590 int biggest_attno = -1;
591 int32 biggest_size = MAXALIGN(sizeof(varattrib));
595 * Search for the biggest yet inlined attribute with
596 * attstorage equals 'x' or 'e'
599 for (i = 0; i < numAttrs; i++)
601 if (toast_action[i] == 'p')
603 if (VARATT_IS_EXTERNAL(toast_values[i]))
605 if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
607 if (toast_sizes[i] > biggest_size)
610 biggest_size = toast_sizes[i];
614 if (biggest_attno < 0)
618 * Store this external
621 old_value = toast_values[i];
622 toast_action[i] = 'p';
623 toast_values[i] = toast_save_datum(rel, toast_values[i]);
625 pfree(DatumGetPointer(old_value));
627 toast_free[i] = true;
628 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
635 * Round 3 - this time we take attributes with storage 'm' into
638 while (MAXALIGN(heap_compute_data_size(tupleDesc,
639 toast_values, toast_isnull)) >
642 int biggest_attno = -1;
643 int32 biggest_size = MAXALIGN(sizeof(varattrib));
648 * Search for the biggest yet uncompressed internal attribute
650 for (i = 0; i < numAttrs; i++)
652 if (toast_action[i] != ' ')
654 if (VARATT_IS_EXTENDED(toast_values[i]))
656 if (att[i]->attstorage != 'm')
658 if (toast_sizes[i] > biggest_size)
661 biggest_size = toast_sizes[i];
665 if (biggest_attno < 0)
669 * Attempt to compress it inline
672 old_value = toast_values[i];
673 new_value = toast_compress_datum(old_value);
675 if (DatumGetPointer(new_value) != NULL)
677 /* successful compression */
679 pfree(DatumGetPointer(old_value));
680 toast_values[i] = new_value;
681 toast_free[i] = true;
682 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
689 * incompressible data, ignore on subsequent compression passes
691 toast_action[i] = 'x';
696 * Finally we store attributes of type 'm' external
698 while (MAXALIGN(heap_compute_data_size(tupleDesc,
699 toast_values, toast_isnull)) >
700 maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
702 int biggest_attno = -1;
703 int32 biggest_size = MAXALIGN(sizeof(varattrib));
707 * Search for the biggest yet inlined attribute with
711 for (i = 0; i < numAttrs; i++)
713 if (toast_action[i] == 'p')
715 if (VARATT_IS_EXTERNAL(toast_values[i]))
717 if (att[i]->attstorage != 'm')
719 if (toast_sizes[i] > biggest_size)
722 biggest_size = toast_sizes[i];
726 if (biggest_attno < 0)
730 * Store this external
733 old_value = toast_values[i];
734 toast_action[i] = 'p';
735 toast_values[i] = toast_save_datum(rel, toast_values[i]);
737 pfree(DatumGetPointer(old_value));
739 toast_free[i] = true;
740 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
747 * In the case we toasted any values, we need to build a new heap tuple
748 * with the changed values.
752 HeapTupleHeader olddata = newtup->t_data;
753 HeapTupleHeader new_data;
757 * Calculate the new size of the tuple. Header size should not
758 * change, but data size might.
760 new_len = offsetof(HeapTupleHeaderData, t_bits);
762 new_len += BITMAPLEN(numAttrs);
763 if (olddata->t_infomask & HEAP_HASOID)
764 new_len += sizeof(Oid);
765 new_len = MAXALIGN(new_len);
766 Assert(new_len == olddata->t_hoff);
767 new_len += heap_compute_data_size(tupleDesc,
768 toast_values, toast_isnull);
771 * Allocate and zero the space needed, and fill HeapTupleData fields.
773 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_len);
774 result_tuple->t_len = new_len;
775 result_tuple->t_self = newtup->t_self;
776 result_tuple->t_tableOid = newtup->t_tableOid;
777 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
778 result_tuple->t_data = new_data;
781 * Put the existing tuple header and the changed values into place
783 memcpy(new_data, olddata, olddata->t_hoff);
785 heap_fill_tuple(tupleDesc,
788 (char *) new_data + olddata->t_hoff,
789 &(new_data->t_infomask),
790 has_nulls ? new_data->t_bits : NULL);
793 result_tuple = newtup;
/*
 * NOTE(review): the guards around the two cleanup loops below
 * (presumably need_free / toast_free[i] and need_delold /
 * toast_delold[i]) are not visible in this listing.
 */
796 * Free allocated temp values
799 for (i = 0; i < numAttrs; i++)
801 pfree(DatumGetPointer(toast_values[i]));
804 * Delete external values from the old tuple
807 for (i = 0; i < numAttrs; i++)
809 toast_delete_datum(rel, toast_oldvalues[i]);
816 * toast_flatten_tuple_attribute -
818 * If a Datum is of composite type, "flatten" it to contain no toasted fields.
819 * This must be invoked on any potentially-composite field that is to be
820 * inserted into a tuple. Doing this preserves the invariant that toasting
821 * goes only one level deep in a tuple.
/*
 * toast_flatten_tuple_attribute
 *
 * value:   the Datum to examine.
 * typeId:  type OID of the Datum.
 * typeMod: type modifier of the Datum.
 *
 * lookup_rowtype_tupdesc_noerror() is asked for a tuple descriptor; when
 * it yields NULL the input value is returned unchanged.  Otherwise the
 * composite value is deformed through a temporary HeapTupleData, every
 * varlena field (attlen == -1) for which VARATT_IS_EXTENDED() holds is
 * replaced by the result of heap_tuple_untoast_attr(), and a new tuple
 * image is assembled: palloc0 of new_len bytes, the old header copied
 * over, the datum length set with HeapTupleHeaderSetDatumLength(), and
 * the data area filled by heap_fill_tuple().  The descriptor is released
 * with ReleaseTupleDesc() and the new image is returned as a pointer
 * Datum.
 *
 * NOTE(review): the return type, several declarations, the null-field
 * branch, the assignments to need_change and has_nulls, the early return
 * for the nothing-to-untoast case and the guard on the pfree loop
 * (presumably toast_free[i]) are not visible in this listing.
 */
825 toast_flatten_tuple_attribute(Datum value,
826 Oid typeId, int32 typeMod)
829 HeapTupleHeader olddata;
830 HeapTupleHeader new_data;
832 HeapTupleData tmptup;
833 Form_pg_attribute *att;
836 bool need_change = false;
837 bool has_nulls = false;
838 Datum toast_values[MaxTupleAttributeNumber];
839 bool toast_isnull[MaxTupleAttributeNumber];
840 bool toast_free[MaxTupleAttributeNumber];
843 * See if it's a composite type, and get the tupdesc if so.
845 tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
846 if (tupleDesc == NULL)
847 return value; /* not a composite type */
849 att = tupleDesc->attrs;
850 numAttrs = tupleDesc->natts;
853 * Break down the tuple into fields.
855 olddata = DatumGetHeapTupleHeader(value);
856 Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
857 Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
858 /* Build a temporary HeapTuple control structure */
859 tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
860 ItemPointerSetInvalid(&(tmptup.t_self));
861 tmptup.t_tableOid = InvalidOid;
862 tmptup.t_data = olddata;
864 Assert(numAttrs <= MaxTupleAttributeNumber);
865 heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
867 memset(toast_free, 0, numAttrs * sizeof(bool));
869 for (i = 0; i < numAttrs; i++)
872 * Look at non-null varlena attributes
876 else if (att[i]->attlen == -1)
878 varattrib *new_value;
880 new_value = (varattrib *) DatumGetPointer(toast_values[i]);
881 if (VARATT_IS_EXTENDED(new_value))
883 new_value = heap_tuple_untoast_attr(new_value);
884 toast_values[i] = PointerGetDatum(new_value);
885 toast_free[i] = true;
892 * If nothing to untoast, just return the original tuple.
896 ReleaseTupleDesc(tupleDesc);
901 * Calculate the new size of the tuple. Header size should not change,
902 * but data size might.
904 new_len = offsetof(HeapTupleHeaderData, t_bits);
906 new_len += BITMAPLEN(numAttrs);
907 if (olddata->t_infomask & HEAP_HASOID)
908 new_len += sizeof(Oid);
909 new_len = MAXALIGN(new_len);
910 Assert(new_len == olddata->t_hoff);
911 new_len += heap_compute_data_size(tupleDesc, toast_values, toast_isnull);
913 new_data = (HeapTupleHeader) palloc0(new_len);
916 * Put the tuple header and the changed values into place
918 memcpy(new_data, olddata, olddata->t_hoff);
920 HeapTupleHeaderSetDatumLength(new_data, new_len);
922 heap_fill_tuple(tupleDesc,
925 (char *) new_data + olddata->t_hoff,
926 &(new_data->t_infomask),
927 has_nulls ? new_data->t_bits : NULL);
930 * Free allocated temp values
932 for (i = 0; i < numAttrs; i++)
934 pfree(DatumGetPointer(toast_values[i]));
935 ReleaseTupleDesc(tupleDesc);
937 return PointerGetDatum(new_data);
942 * toast_compress_datum -
944 * Create a compressed version of a varlena datum
946 * If we fail (ie, compressed result is actually bigger than original)
947 * then return NULL. We must not use compressed data if it'd expand
/*
 * toast_compress_datum
 *
 * value: varlena Datum to compress.
 *
 * A work buffer of sizeof(PGLZ_Header) + VARATT_SIZE(value) bytes is
 * palloc-allocated and pglz_compress() is run over the data portion of
 * value using PGLZ_strategy_default.  The compressed form is accepted
 * only when VARATT_SIZE(tmp) is strictly smaller than
 * VARATT_SIZE(value); in that case VARATT_FLAG_COMPRESSED is set in its
 * size word and a pointer Datum to it is returned.  Otherwise a NULL
 * pointer Datum is returned.
 *
 * NOTE(review): the return type, the declaration of tmp, one argument
 * line of the pglz_compress() call and whatever happens to tmp on the
 * incompressible path are not visible in this listing.
 */
952 toast_compress_datum(Datum value)
956 tmp = (varattrib *) palloc(sizeof(PGLZ_Header) + VARATT_SIZE(value));
957 pglz_compress(VARATT_DATA(value), VARATT_SIZE(value) - VARHDRSZ,
959 PGLZ_strategy_default);
960 if (VARATT_SIZE(tmp) < VARATT_SIZE(value))
962 /* successful compression */
963 VARATT_SIZEP(tmp) |= VARATT_FLAG_COMPRESSED;
964 return PointerGetDatum(tmp);
968 /* incompressible data */
970 return PointerGetDatum(NULL);
978 * Save one single datum into the secondary relation and return
979 * a varattrib reference for it.
/*
 * toast_save_datum
 *
 * rel:   the owning relation; rel->rd_rel->reltoastrelid names the toast
 *        relation that receives the data.
 * value: the varlena Datum to move out of line.
 *
 * Returns a pointer Datum to a palloc-allocated varattrib reference
 * flagged VARATT_FLAG_EXTERNAL (and VARATT_FLAG_COMPRESSED when the
 * input is compressed).  The reference records va_rawsize, va_extsize,
 * a value OID obtained from GetNewOidWithIndex() and the toast relation
 * OID.
 *
 * The data portion of value is written in pieces of at most
 * TOAST_MAX_CHUNK_SIZE bytes.  Each piece becomes one toast tuple
 * (value OID, chunk sequence number, chunk data) inserted with
 * simple_heap_insert(), followed by an index_insert() into the toast
 * index.  The toast relation is opened and closed with RowExclusiveLock
 * and the toast index is locked explicitly for the duration of the loop.
 *
 * NOTE(review): the return type and most local declarations (including
 * the chunk_data structure that contains the data[] member, and the
 * initial value of chunk_seq) are not visible in this listing.
 */
983 toast_save_datum(Relation rel, Datum value)
988 TupleDesc toasttupDesc;
995 char data[TOAST_MAX_CHUNK_SIZE];
1003 * Open the toast relation and its index. We can use the index to check
1004 * uniqueness of the OID we assign to the toasted item, even though it has
1005 * additional columns besides OID.
1007 toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1008 toasttupDesc = toastrel->rd_att;
1009 toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1012 * Create the varattrib reference
1014 result = (varattrib *) palloc(sizeof(varattrib));
1016 result->va_header = sizeof(varattrib) | VARATT_FLAG_EXTERNAL;
1017 if (VARATT_IS_COMPRESSED(value))
1019 result->va_header |= VARATT_FLAG_COMPRESSED;
1020 result->va_content.va_external.va_rawsize =
1021 ((varattrib *) value)->va_content.va_compressed.va_rawsize;
/*
 * For the uncompressed case va_rawsize is the full VARATT_SIZE (header
 * included), whereas va_extsize below always excludes VARHDRSZ.
 */
1024 result->va_content.va_external.va_rawsize = VARATT_SIZE(value);
1026 result->va_content.va_external.va_extsize =
1027 VARATT_SIZE(value) - VARHDRSZ;
1028 result->va_content.va_external.va_valueid =
1029 GetNewOidWithIndex(toastrel, toastidx);
1030 result->va_content.va_external.va_toastrelid =
1031 rel->rd_rel->reltoastrelid;
1034 * Initialize constant parts of the tuple data
1036 t_values[0] = ObjectIdGetDatum(result->va_content.va_external.va_valueid);
1037 t_values[2] = PointerGetDatum(&chunk_data);
1038 t_isnull[0] = false;
1039 t_isnull[1] = false;
1040 t_isnull[2] = false;
1043 * Get the data to process
1045 data_p = VARATT_DATA(value);
1046 data_todo = VARATT_SIZE(value) - VARHDRSZ;
1049 * We must explicitly lock the toast index because we aren't using an
1052 LockRelation(toastidx, RowExclusiveLock);
1055 * Split up the item into chunks
1057 while (data_todo > 0)
1060 * Calculate the size of this chunk
1062 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1065 * Build a tuple and store it
1067 t_values[1] = Int32GetDatum(chunk_seq++);
1068 VARATT_SIZEP(&chunk_data) = chunk_size + VARHDRSZ;
1069 memcpy(VARATT_DATA(&chunk_data), data_p, chunk_size);
1070 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1071 if (!HeapTupleIsValid(toasttup))
1072 elog(ERROR, "failed to build TOAST tuple");
1074 simple_heap_insert(toastrel, toasttup);
1077 * Create the index entry. We cheat a little here by not using
1078 * FormIndexDatum: this relies on the knowledge that the index columns
1079 * are the same as the initial columns of the table.
1081 * Note also that there had better not be any user-created index on
1082 * the TOAST table, since we don't bother to update anything else.
1084 index_insert(toastidx, t_values, t_isnull,
1085 &(toasttup->t_self),
1086 toastrel, toastidx->rd_index->indisunique);
1091 heap_freetuple(toasttup);
1094 * Move on to next chunk
1096 data_todo -= chunk_size;
1097 data_p += chunk_size;
1101 * Done - close toast relation and return the reference
1103 UnlockRelation(toastidx, RowExclusiveLock);
1104 index_close(toastidx);
1105 heap_close(toastrel, RowExclusiveLock);
1107 return PointerGetDatum(result);
1112 * toast_delete_datum -
1114 * Delete a single external stored value.
/*
 * toast_delete_datum
 *
 * rel:   owning relation.  It is not referenced in the visible lines;
 *        the toast relation is taken from the datum itself
 *        (va_external.va_toastrelid).
 * value: Datum pointing at the varattrib whose external storage is to be
 *        removed.
 *
 * Opens the toast relation and its index, builds one equality scan key
 * (F_OIDEQ) on va_external.va_valueid, walks the index with
 * SnapshotToast and calls simple_heap_delete() on every chunk tuple
 * found.  The scan is then ended and both relations closed; the heap is
 * closed with RowExclusiveLock.
 *
 * NOTE(review): the return type, some declarations, the statement
 * executed when the datum is not external (presumably an early return),
 * the lock mode argument of heap_open() and the attribute-number
 * argument of ScanKeyInit() are not visible in this listing.
 */
1118 toast_delete_datum(Relation rel, Datum value)
1120 varattrib *attr = (varattrib *) DatumGetPointer(value);
1123 ScanKeyData toastkey;
1124 IndexScanDesc toastscan;
1127 if (!VARATT_IS_EXTERNAL(attr))
1131 * Open the toast relation and it's index
1133 toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1135 toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1138 * Setup a scan key to fetch from the index by va_valueid (we don't
1139 * particularly care whether we see them in sequence or not)
1141 ScanKeyInit(&toastkey,
1143 BTEqualStrategyNumber, F_OIDEQ,
1144 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1147 * Find the chunks by index
1149 toastscan = index_beginscan(toastrel, toastidx, true,
1150 SnapshotToast, 1, &toastkey);
1151 while ((toasttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1154 * Have a chunk, delete it
1156 simple_heap_delete(toastrel, &toasttup->t_self);
1160 * End scan and close relations
1162 index_endscan(toastscan);
1163 index_close(toastidx);
1164 heap_close(toastrel, RowExclusiveLock);
1169 * toast_fetch_datum -
1171 * Reconstruct an in memory varattrib from the chunks saved
1172 * in the toast relation
/*
 * toast_fetch_datum
 *
 * attr: external varattrib reference; supplies va_extsize, va_valueid
 *       and va_toastrelid.
 *
 * Allocates a result of va_extsize + VARHDRSZ bytes, carrying over
 * VARATT_FLAG_COMPRESSED when attr has it, and fills it from the chunk
 * tuples found through an index scan keyed on va_valueid (F_OIDEQ,
 * SnapshotToast).  Each chunk is copied to offset
 * residx * TOAST_MAX_CHUNK_SIZE within the result data.
 *
 * Consistency checks raise elog(ERROR) when a chunk number is not the
 * expected next one, when a non-final chunk is not exactly
 * TOAST_MAX_CHUNK_SIZE bytes, when the final chunk does not end exactly
 * at ressize, when a chunk number is beyond numchunks, or when fewer
 * than numchunks chunks were seen.
 *
 * NOTE(review): the return type, most declarations, the Assert lines on
 * isnull, the initialisation and increment of nextidx, the remaining
 * memcpy() arguments, the lock mode passed to heap_open() (the close
 * uses AccessShareLock) and the return statement are not visible in this
 * listing.
 */
1176 toast_fetch_datum(varattrib *attr)
1180 ScanKeyData toastkey;
1181 IndexScanDesc toastscan;
1183 TupleDesc toasttupDesc;
1193 ressize = attr->va_content.va_external.va_extsize;
/* Number of TOAST_MAX_CHUNK_SIZE pieces needed for ressize bytes (rounded up). */
1194 numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1196 result = (varattrib *) palloc(ressize + VARHDRSZ);
1197 VARATT_SIZEP(result) = ressize + VARHDRSZ;
1198 if (VARATT_IS_COMPRESSED(attr))
1199 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1202 * Open the toast relation and its index
1204 toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1206 toasttupDesc = toastrel->rd_att;
1207 toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1210 * Setup a scan key to fetch from the index by va_valueid
1212 ScanKeyInit(&toastkey,
1214 BTEqualStrategyNumber, F_OIDEQ,
1215 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1218 * Read the chunks by index
1220 * Note that because the index is actually on (valueid, chunkidx) we will
1221 * see the chunks in chunkidx order, even though we didn't explicitly ask
1226 toastscan = index_beginscan(toastrel, toastidx, true,
1227 SnapshotToast, 1, &toastkey);
1228 while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1231 * Have a chunk, extract the sequence number and the data
1233 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1235 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1237 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1240 * Some checks on the data we've found
1242 if (residx != nextidx)
1243 elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1245 attr->va_content.va_external.va_valueid);
1246 if (residx < numchunks - 1)
1248 if (chunksize != TOAST_MAX_CHUNK_SIZE)
1249 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1251 attr->va_content.va_external.va_valueid);
1253 else if (residx < numchunks)
1255 if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1256 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1258 attr->va_content.va_external.va_valueid);
1261 elog(ERROR, "unexpected chunk number %d for toast value %u",
1263 attr->va_content.va_external.va_valueid);
1266 * Copy the data into proper place in our result
1268 memcpy(((char *) VARATT_DATA(result)) + residx * TOAST_MAX_CHUNK_SIZE,
1276 * Final checks that we successfully fetched the datum
1278 if (nextidx != numchunks)
1279 elog(ERROR, "missing chunk number %d for toast value %u",
1281 attr->va_content.va_external.va_valueid);
1284 * End scan and close relations
1286 index_endscan(toastscan);
1287 index_close(toastidx);
1288 heap_close(toastrel, AccessShareLock);
1294 * toast_fetch_datum_slice -
1296 * Reconstruct a segment of a varattrib from the chunks saved
1297 * in the toast relation
/*
 * toast_fetch_datum_slice
 *
 * attr:        external varattrib reference; supplies va_extsize,
 *              va_valueid and va_toastrelid.
 * sliceoffset: byte offset of the first wanted byte within the stored
 *              data.
 * length:      number of bytes wanted; a negative value, or a range that
 *              runs past the end, is replaced by attrsize - sliceoffset.
 *
 * Allocates a result of length + VARHDRSZ bytes (flagged compressed when
 * attr is), works out which chunks cover the requested byte range, and
 * reads only those chunks through an index scan on the value OID plus a
 * condition on the chunk number: equality on startchunk, or a
 * startchunk..endchunk range.  From the first and last chunk only the
 * bytes inside the slice are copied.
 *
 * Consistency checks raise elog(ERROR) for an out-of-sequence or
 * out-of-range chunk number, for a chunk of unexpected size, and when
 * the scan ends before chunk endchunk has been seen.
 *
 * NOTE(review): the return type, most declarations, the action taken
 * when sliceoffset >= attrsize, the condition guarding the early return
 * (presumably length == 0), the test choosing between the equality and
 * range scan keys, the assignment of nscankeys, the initial value of
 * chcpystrt, the increment of nextidx, the lock mode passed to
 * heap_open() and the final return are not visible in this listing.
 */
1301 toast_fetch_datum_slice(varattrib *attr, int32 sliceoffset, int32 length)
1305 ScanKeyData toastkey[3];
1307 IndexScanDesc toastscan;
1309 TupleDesc toasttupDesc;
1326 attrsize = attr->va_content.va_external.va_extsize;
1327 totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1329 if (sliceoffset >= attrsize)
1335 if (((sliceoffset + length) > attrsize) || length < 0)
1336 length = attrsize - sliceoffset;
1338 result = (varattrib *) palloc(length + VARHDRSZ);
1339 VARATT_SIZEP(result) = length + VARHDRSZ;
1341 if (VARATT_IS_COMPRESSED(attr))
1342 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1345 return result; /* Can save a lot of work at this point! */
/*
 * Translate the byte range into the first and last chunk numbers and the
 * byte positions of the slice boundaries inside those two chunks.
 */
1347 startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1348 endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1349 numchunks = (endchunk - startchunk) + 1;
1351 startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1352 endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1355 * Open the toast relation and it's index
1357 toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1359 toasttupDesc = toastrel->rd_att;
1360 toastidx = index_open(toastrel->rd_rel->reltoastidxid);
1363 * Setup a scan key to fetch from the index. This is either two keys or
1364 * three depending on the number of chunks.
1366 ScanKeyInit(&toastkey[0],
1368 BTEqualStrategyNumber, F_OIDEQ,
1369 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1372 * Use equality condition for one chunk, a range condition otherwise:
1376 ScanKeyInit(&toastkey[1],
1378 BTEqualStrategyNumber, F_INT4EQ,
1379 Int32GetDatum(startchunk));
1384 ScanKeyInit(&toastkey[1],
1386 BTGreaterEqualStrategyNumber, F_INT4GE,
1387 Int32GetDatum(startchunk));
1388 ScanKeyInit(&toastkey[2],
1390 BTLessEqualStrategyNumber, F_INT4LE,
1391 Int32GetDatum(endchunk));
1396 * Read the chunks by index
1398 * The index is on (valueid, chunkidx) so they will come in order
1400 nextidx = startchunk;
1401 toastscan = index_beginscan(toastrel, toastidx, true,
1402 SnapshotToast, nscankeys, toastkey);
1403 while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1406 * Have a chunk, extract the sequence number and the data
1408 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1410 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1412 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1415 * Some checks on the data we've found
1417 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1418 elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1420 attr->va_content.va_external.va_valueid);
1421 if (residx < totalchunks - 1)
1423 if (chunksize != TOAST_MAX_CHUNK_SIZE)
1424 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1426 attr->va_content.va_external.va_valueid);
1430 if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1431 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1433 attr->va_content.va_external.va_valueid);
1437 * Copy the data into proper place in our result
1440 chcpyend = chunksize - 1;
1441 if (residx == startchunk)
1442 chcpystrt = startoffset;
1443 if (residx == endchunk)
1444 chcpyend = endoffset;
1446 memcpy(((char *) VARATT_DATA(result)) +
1447 (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1448 VARATT_DATA(chunk) + chcpystrt,
1449 (chcpyend - chcpystrt) + 1);
1455 * Final checks that we successfully fetched the datum
1457 if (nextidx != (endchunk + 1))
1458 elog(ERROR, "missing chunk number %d for toast value %u",
1460 attr->va_content.va_external.va_valueid);
1463 * End scan and close relations
1465 index_endscan(toastscan);
1466 index_close(toastidx);
1467 heap_close(toastrel, AccessShareLock);