1 /*-------------------------------------------------------------------------
4 * Support routines for external and compressed storage of
5 * variable size attributes.
7 * Copyright (c) 2000-2006, PostgreSQL Global Development Group
11 * $PostgreSQL: pgsql/src/backend/access/heap/tuptoaster.c,v 1.63 2006/07/31 20:08:59 tgl Exp $
15 * toast_insert_or_update -
16 * Try to make a given tuple fit into one page by compressing
17 * or moving off attributes
20 * Reclaim toast storage when a tuple is deleted
22 * heap_tuple_untoast_attr -
23 * Fetch back a given value from the "secondary" relation
25 *-------------------------------------------------------------------------
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "catalog/catalog.h"
37 #include "utils/fmgroids.h"
38 #include "utils/pg_lzcompress.h"
39 #include "utils/typcache.h"
44 static void toast_delete_datum(Relation rel, Datum value);
45 static Datum toast_save_datum(Relation rel, Datum value);
46 static varattrib *toast_fetch_datum(varattrib *attr);
47 static varattrib *toast_fetch_datum_slice(varattrib *attr,
48 int32 sliceoffset, int32 length);
52 * heap_tuple_fetch_attr -
54 * Public entry point to get back a toasted value from
55 * external storage (possibly still in compressed format).
59 heap_tuple_fetch_attr(varattrib *attr)
63 if (VARATT_IS_EXTERNAL(attr))
66 * This is an external stored plain value
68 result = toast_fetch_datum(attr);
73 * This is a plain value inside of the main tuple - why am I called?
83 * heap_tuple_untoast_attr -
85 * Public entry point to get back a toasted value from compression
86 * or external storage.
90 heap_tuple_untoast_attr(varattrib *attr)
94 if (VARATT_IS_EXTERNAL(attr))
96 if (VARATT_IS_COMPRESSED(attr))
99 * This is an external stored compressed value
100 * Fetch it from the toast heap and decompress.
105 tmp = toast_fetch_datum(attr);
106 result = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
108 VARATT_SIZEP(result) = attr->va_content.va_external.va_rawsize
110 pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(result));
117 * This is an external stored plain value
119 result = toast_fetch_datum(attr);
122 else if (VARATT_IS_COMPRESSED(attr))
125 * This is a compressed value inside of the main tuple
127 result = (varattrib *) palloc(attr->va_content.va_compressed.va_rawsize
129 VARATT_SIZEP(result) = attr->va_content.va_compressed.va_rawsize
131 pglz_decompress((PGLZ_Header *) attr, VARATT_DATA(result));
136 * This is a plain value inside of the main tuple - why am I called?
145 * heap_tuple_untoast_attr_slice -
147 * Public entry point to get back part of a toasted value
148 * from compression or external storage.
152 heap_tuple_untoast_attr_slice(varattrib *attr, int32 sliceoffset, int32 slicelength)
158 if (VARATT_IS_COMPRESSED(attr))
162 if (VARATT_IS_EXTERNAL(attr))
163 tmp = toast_fetch_datum(attr);
166 tmp = attr; /* compressed in main tuple */
169 preslice = (varattrib *) palloc(attr->va_content.va_external.va_rawsize
171 VARATT_SIZEP(preslice) = attr->va_content.va_external.va_rawsize + VARHDRSZ;
172 pglz_decompress((PGLZ_Header *) tmp, VARATT_DATA(preslice));
180 if (VARATT_IS_EXTERNAL(attr))
183 return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
189 /* slicing of datum for compressed cases and plain value */
191 attrsize = VARSIZE(preslice) - VARHDRSZ;
192 if (sliceoffset >= attrsize)
198 if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
199 slicelength = attrsize - sliceoffset;
201 result = (varattrib *) palloc(slicelength + VARHDRSZ);
202 VARATT_SIZEP(result) = slicelength + VARHDRSZ;
204 memcpy(VARDATA(result), VARDATA(preslice) + sliceoffset, slicelength);
206 if (preslice != attr)
214 * toast_raw_datum_size -
216 * Return the raw (detoasted) size of a varlena datum
220 toast_raw_datum_size(Datum value)
222 varattrib *attr = (varattrib *) DatumGetPointer(value);
225 if (VARATT_IS_COMPRESSED(attr))
228 * va_rawsize shows the original data size, whether the datum is
231 result = attr->va_content.va_compressed.va_rawsize + VARHDRSZ;
233 else if (VARATT_IS_EXTERNAL(attr))
236 * an uncompressed external attribute has rawsize including the header
237 * (not too consistent!)
239 result = attr->va_content.va_external.va_rawsize;
243 /* plain untoasted datum */
244 result = VARSIZE(attr);
252 * Return the physical storage size (possibly compressed) of a varlena datum
256 toast_datum_size(Datum value)
258 varattrib *attr = (varattrib *) DatumGetPointer(value);
261 if (VARATT_IS_EXTERNAL(attr))
264 * Attribute is stored externally - return the extsize whether
265 * compressed or not. We do not count the size of the toast pointer
268 result = attr->va_content.va_external.va_extsize;
273 * Attribute is stored inline either compressed or not, just calculate
274 * the size of the datum in either case.
276 result = VARSIZE(attr);
285 * Cascade deletion of toast entries on DELETE
289 toast_delete(Relation rel, HeapTuple oldtup)
292 Form_pg_attribute *att;
295 Datum toast_values[MaxHeapAttributeNumber];
296 bool toast_isnull[MaxHeapAttributeNumber];
299 * Get the tuple descriptor and break down the tuple into fields.
301 * NOTE: it's debatable whether to use heap_deformtuple() here or just
302 * heap_getattr() only the varlena columns. The latter could win if there
303 * are few varlena columns and many non-varlena ones. However,
304 * heap_deformtuple costs only O(N) while the heap_getattr way would cost
305 * O(N^2) if there are many varlena columns, so it seems better to err on
306 * the side of linear cost. (We won't even be here unless there's at
307 * least one varlena column, by the way.)
309 tupleDesc = rel->rd_att;
310 att = tupleDesc->attrs;
311 numAttrs = tupleDesc->natts;
313 Assert(numAttrs <= MaxHeapAttributeNumber);
314 heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
317 * Check for external stored attributes and delete them from the secondary
320 for (i = 0; i < numAttrs; i++)
322 if (att[i]->attlen == -1)
324 Datum value = toast_values[i];
326 if (!toast_isnull[i] && VARATT_IS_EXTERNAL(value))
327 toast_delete_datum(rel, value);
334 * toast_insert_or_update -
336 * Delete no-longer-used toast-entries and create new ones to
337 * make the new tuple fit on INSERT or UPDATE
340 * newtup: the candidate new tuple to be inserted
341 * oldtup: the old row version for UPDATE, or NULL for INSERT
343 * either newtup if no toasting is needed, or a palloc'd modified tuple
344 * that is what should actually get stored
346 * NOTE: neither newtup nor oldtup will be modified. This is a change
347 * from the pre-8.1 API of this routine.
351 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup)
353 HeapTuple result_tuple;
355 Form_pg_attribute *att;
359 bool need_change = false;
360 bool need_free = false;
361 bool need_delold = false;
362 bool has_nulls = false;
366 char toast_action[MaxHeapAttributeNumber];
367 bool toast_isnull[MaxHeapAttributeNumber];
368 bool toast_oldisnull[MaxHeapAttributeNumber];
369 Datum toast_values[MaxHeapAttributeNumber];
370 Datum toast_oldvalues[MaxHeapAttributeNumber];
371 int32 toast_sizes[MaxHeapAttributeNumber];
372 bool toast_free[MaxHeapAttributeNumber];
373 bool toast_delold[MaxHeapAttributeNumber];
376 * Get the tuple descriptor and break down the tuple(s) into fields.
378 tupleDesc = rel->rd_att;
379 att = tupleDesc->attrs;
380 numAttrs = tupleDesc->natts;
382 Assert(numAttrs <= MaxHeapAttributeNumber);
383 heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
385 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
388 * Then collect information about the values given
390 * NOTE: toast_action[i] can have these values:
391 * ' ' default handling
392 * 'p' already processed --- don't touch it
393 * 'x' incompressible, but OK to move off
395 * NOTE: toast_sizes[i] is only made valid for varlena attributes with
396 * toast_action[i] different from 'p'.
399 memset(toast_action, ' ', numAttrs * sizeof(char));
400 memset(toast_free, 0, numAttrs * sizeof(bool));
401 memset(toast_delold, 0, numAttrs * sizeof(bool));
403 for (i = 0; i < numAttrs; i++)
405 varattrib *old_value;
406 varattrib *new_value;
411 * For UPDATE get the old and new values of this attribute
413 old_value = (varattrib *) DatumGetPointer(toast_oldvalues[i]);
414 new_value = (varattrib *) DatumGetPointer(toast_values[i]);
417 * If the old value is an external stored one, check if it has
418 * changed so we have to delete it later.
420 if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
421 VARATT_IS_EXTERNAL(old_value))
423 if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
424 old_value->va_content.va_external.va_valueid !=
425 new_value->va_content.va_external.va_valueid ||
426 old_value->va_content.va_external.va_toastrelid !=
427 new_value->va_content.va_external.va_toastrelid)
430 * The old external stored value isn't needed any more
433 toast_delold[i] = true;
439 * This attribute isn't changed by this update so we reuse
440 * the original reference to the old value in the new
443 toast_action[i] = 'p';
444 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
452 * For INSERT simply get the new value
454 new_value = (varattrib *) DatumGetPointer(toast_values[i]);
458 * Handle NULL attributes
462 toast_action[i] = 'p';
468 * Now look at varlena attributes
470 if (att[i]->attlen == -1)
473 * If the table's attribute says PLAIN always, force it so.
475 if (att[i]->attstorage == 'p')
476 toast_action[i] = 'p';
479 * We took care of UPDATE above, so any external value we find
480 * still in the tuple must be someone else's we cannot reuse.
481 * Expand it to plain (and, probably, toast it again below).
483 if (VARATT_IS_EXTERNAL(new_value))
485 new_value = heap_tuple_untoast_attr(new_value);
486 toast_values[i] = PointerGetDatum(new_value);
487 toast_free[i] = true;
493 * Remember the size of this attribute
495 toast_sizes[i] = VARATT_SIZE(new_value);
500 * Not a varlena attribute, plain storage always
502 toast_action[i] = 'p';
507 * Compress and/or save external until data fits into target length
509 * 1: Inline compress attributes with attstorage 'x'
510 * 2: Store attributes with attstorage 'x' or 'e' external
511 * 3: Inline compress attributes with attstorage 'm'
512 * 4: Store attributes with attstorage 'm' external
515 maxDataLen = offsetof(HeapTupleHeaderData, t_bits);
517 maxDataLen += BITMAPLEN(numAttrs);
518 maxDataLen = TOAST_TUPLE_TARGET - MAXALIGN(maxDataLen);
521 * Look for attributes with attstorage 'x' to compress
523 while (MAXALIGN(heap_compute_data_size(tupleDesc,
524 toast_values, toast_isnull)) >
527 int biggest_attno = -1;
528 int32 biggest_size = MAXALIGN(sizeof(varattrib));
533 * Search for the biggest yet uncompressed internal attribute
535 for (i = 0; i < numAttrs; i++)
537 if (toast_action[i] != ' ')
539 if (VARATT_IS_EXTENDED(toast_values[i]))
541 if (att[i]->attstorage != 'x')
543 if (toast_sizes[i] > biggest_size)
546 biggest_size = toast_sizes[i];
550 if (biggest_attno < 0)
554 * Attempt to compress it inline
557 old_value = toast_values[i];
558 new_value = toast_compress_datum(old_value);
560 if (DatumGetPointer(new_value) != NULL)
562 /* successful compression */
564 pfree(DatumGetPointer(old_value));
565 toast_values[i] = new_value;
566 toast_free[i] = true;
567 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
574 * incompressible data, ignore on subsequent compression passes
576 toast_action[i] = 'x';
581 * Second we look for attributes of attstorage 'x' or 'e' that are still
584 while (MAXALIGN(heap_compute_data_size(tupleDesc,
585 toast_values, toast_isnull)) >
586 maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
588 int biggest_attno = -1;
589 int32 biggest_size = MAXALIGN(sizeof(varattrib));
593 * Search for the biggest attribute that is still inline and
594 * whose attstorage is 'x' or 'e'
597 for (i = 0; i < numAttrs; i++)
599 if (toast_action[i] == 'p')
601 if (VARATT_IS_EXTERNAL(toast_values[i]))
603 if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
605 if (toast_sizes[i] > biggest_size)
608 biggest_size = toast_sizes[i];
612 if (biggest_attno < 0)
616 * Store this external
619 old_value = toast_values[i];
620 toast_action[i] = 'p';
621 toast_values[i] = toast_save_datum(rel, toast_values[i]);
623 pfree(DatumGetPointer(old_value));
625 toast_free[i] = true;
626 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
633 * Round 3 - this time we take attributes with storage 'm' into
636 while (MAXALIGN(heap_compute_data_size(tupleDesc,
637 toast_values, toast_isnull)) >
640 int biggest_attno = -1;
641 int32 biggest_size = MAXALIGN(sizeof(varattrib));
646 * Search for the biggest yet uncompressed internal attribute
648 for (i = 0; i < numAttrs; i++)
650 if (toast_action[i] != ' ')
652 if (VARATT_IS_EXTENDED(toast_values[i]))
654 if (att[i]->attstorage != 'm')
656 if (toast_sizes[i] > biggest_size)
659 biggest_size = toast_sizes[i];
663 if (biggest_attno < 0)
667 * Attempt to compress it inline
670 old_value = toast_values[i];
671 new_value = toast_compress_datum(old_value);
673 if (DatumGetPointer(new_value) != NULL)
675 /* successful compression */
677 pfree(DatumGetPointer(old_value));
678 toast_values[i] = new_value;
679 toast_free[i] = true;
680 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
687 * incompressible data, ignore on subsequent compression passes
689 toast_action[i] = 'x';
694 * Finally we store attributes of type 'm' external
696 while (MAXALIGN(heap_compute_data_size(tupleDesc,
697 toast_values, toast_isnull)) >
698 maxDataLen && rel->rd_rel->reltoastrelid != InvalidOid)
700 int biggest_attno = -1;
701 int32 biggest_size = MAXALIGN(sizeof(varattrib));
705 * Search for the biggest yet inlined attribute with
709 for (i = 0; i < numAttrs; i++)
711 if (toast_action[i] == 'p')
713 if (VARATT_IS_EXTERNAL(toast_values[i]))
715 if (att[i]->attstorage != 'm')
717 if (toast_sizes[i] > biggest_size)
720 biggest_size = toast_sizes[i];
724 if (biggest_attno < 0)
728 * Store this external
731 old_value = toast_values[i];
732 toast_action[i] = 'p';
733 toast_values[i] = toast_save_datum(rel, toast_values[i]);
735 pfree(DatumGetPointer(old_value));
737 toast_free[i] = true;
738 toast_sizes[i] = VARATT_SIZE(toast_values[i]);
745 * In the case we toasted any values, we need to build a new heap tuple
746 * with the changed values.
750 HeapTupleHeader olddata = newtup->t_data;
751 HeapTupleHeader new_data;
755 * Calculate the new size of the tuple. Header size should not
756 * change, but data size might.
758 new_len = offsetof(HeapTupleHeaderData, t_bits);
760 new_len += BITMAPLEN(numAttrs);
761 if (olddata->t_infomask & HEAP_HASOID)
762 new_len += sizeof(Oid);
763 new_len = MAXALIGN(new_len);
764 Assert(new_len == olddata->t_hoff);
765 new_len += heap_compute_data_size(tupleDesc,
766 toast_values, toast_isnull);
769 * Allocate and zero the space needed, and fill HeapTupleData fields.
771 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_len);
772 result_tuple->t_len = new_len;
773 result_tuple->t_self = newtup->t_self;
774 result_tuple->t_tableOid = newtup->t_tableOid;
775 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
776 result_tuple->t_data = new_data;
779 * Put the existing tuple header and the changed values into place
781 memcpy(new_data, olddata, olddata->t_hoff);
783 heap_fill_tuple(tupleDesc,
786 (char *) new_data + olddata->t_hoff,
787 &(new_data->t_infomask),
788 has_nulls ? new_data->t_bits : NULL);
791 result_tuple = newtup;
794 * Free allocated temp values
797 for (i = 0; i < numAttrs; i++)
799 pfree(DatumGetPointer(toast_values[i]));
802 * Delete external values from the old tuple
805 for (i = 0; i < numAttrs; i++)
807 toast_delete_datum(rel, toast_oldvalues[i]);
814 * toast_flatten_tuple_attribute -
816 * If a Datum is of composite type, "flatten" it to contain no toasted fields.
817 * This must be invoked on any potentially-composite field that is to be
818 * inserted into a tuple. Doing this preserves the invariant that toasting
819 * goes only one level deep in a tuple.
823 toast_flatten_tuple_attribute(Datum value,
824 Oid typeId, int32 typeMod)
827 HeapTupleHeader olddata;
828 HeapTupleHeader new_data;
830 HeapTupleData tmptup;
831 Form_pg_attribute *att;
834 bool need_change = false;
835 bool has_nulls = false;
836 Datum toast_values[MaxTupleAttributeNumber];
837 bool toast_isnull[MaxTupleAttributeNumber];
838 bool toast_free[MaxTupleAttributeNumber];
841 * See if it's a composite type, and get the tupdesc if so.
843 tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
844 if (tupleDesc == NULL)
845 return value; /* not a composite type */
847 att = tupleDesc->attrs;
848 numAttrs = tupleDesc->natts;
851 * Break down the tuple into fields.
853 olddata = DatumGetHeapTupleHeader(value);
854 Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
855 Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
856 /* Build a temporary HeapTuple control structure */
857 tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
858 ItemPointerSetInvalid(&(tmptup.t_self));
859 tmptup.t_tableOid = InvalidOid;
860 tmptup.t_data = olddata;
862 Assert(numAttrs <= MaxTupleAttributeNumber);
863 heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
865 memset(toast_free, 0, numAttrs * sizeof(bool));
867 for (i = 0; i < numAttrs; i++)
870 * Look at non-null varlena attributes
874 else if (att[i]->attlen == -1)
876 varattrib *new_value;
878 new_value = (varattrib *) DatumGetPointer(toast_values[i]);
879 if (VARATT_IS_EXTENDED(new_value))
881 new_value = heap_tuple_untoast_attr(new_value);
882 toast_values[i] = PointerGetDatum(new_value);
883 toast_free[i] = true;
890 * If nothing to untoast, just return the original tuple.
894 ReleaseTupleDesc(tupleDesc);
899 * Calculate the new size of the tuple. Header size should not change,
900 * but data size might.
902 new_len = offsetof(HeapTupleHeaderData, t_bits);
904 new_len += BITMAPLEN(numAttrs);
905 if (olddata->t_infomask & HEAP_HASOID)
906 new_len += sizeof(Oid);
907 new_len = MAXALIGN(new_len);
908 Assert(new_len == olddata->t_hoff);
909 new_len += heap_compute_data_size(tupleDesc, toast_values, toast_isnull);
911 new_data = (HeapTupleHeader) palloc0(new_len);
914 * Put the tuple header and the changed values into place
916 memcpy(new_data, olddata, olddata->t_hoff);
918 HeapTupleHeaderSetDatumLength(new_data, new_len);
920 heap_fill_tuple(tupleDesc,
923 (char *) new_data + olddata->t_hoff,
924 &(new_data->t_infomask),
925 has_nulls ? new_data->t_bits : NULL);
928 * Free allocated temp values
930 for (i = 0; i < numAttrs; i++)
932 pfree(DatumGetPointer(toast_values[i]));
933 ReleaseTupleDesc(tupleDesc);
935 return PointerGetDatum(new_data);
940 * toast_compress_datum -
942 * Create a compressed version of a varlena datum
944 * If we fail (ie, compressed result is actually bigger than original)
945 * then return NULL. We must not use compressed data if it'd expand
950 toast_compress_datum(Datum value)
954 tmp = (varattrib *) palloc(sizeof(PGLZ_Header) + VARATT_SIZE(value));
955 pglz_compress(VARATT_DATA(value), VARATT_SIZE(value) - VARHDRSZ,
957 PGLZ_strategy_default);
958 if (VARATT_SIZE(tmp) < VARATT_SIZE(value))
960 /* successful compression */
961 VARATT_SIZEP(tmp) |= VARATT_FLAG_COMPRESSED;
962 return PointerGetDatum(tmp);
966 /* incompressible data */
968 return PointerGetDatum(NULL);
976 * Save one single datum into the secondary relation and return
977 * a varattrib reference for it.
981 toast_save_datum(Relation rel, Datum value)
986 TupleDesc toasttupDesc;
993 char data[TOAST_MAX_CHUNK_SIZE];
1001 * Open the toast relation and its index. We can use the index to check
1002 * uniqueness of the OID we assign to the toasted item, even though it has
1003 * additional columns besides OID.
1005 toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1006 toasttupDesc = toastrel->rd_att;
1007 toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1010 * Create the varattrib reference
1012 result = (varattrib *) palloc(sizeof(varattrib));
1014 result->va_header = sizeof(varattrib) | VARATT_FLAG_EXTERNAL;
1015 if (VARATT_IS_COMPRESSED(value))
1017 result->va_header |= VARATT_FLAG_COMPRESSED;
1018 result->va_content.va_external.va_rawsize =
1019 ((varattrib *) value)->va_content.va_compressed.va_rawsize;
1022 result->va_content.va_external.va_rawsize = VARATT_SIZE(value);
1024 result->va_content.va_external.va_extsize =
1025 VARATT_SIZE(value) - VARHDRSZ;
1026 result->va_content.va_external.va_valueid =
1027 GetNewOidWithIndex(toastrel, toastidx);
1028 result->va_content.va_external.va_toastrelid =
1029 rel->rd_rel->reltoastrelid;
1032 * Initialize constant parts of the tuple data
1034 t_values[0] = ObjectIdGetDatum(result->va_content.va_external.va_valueid);
1035 t_values[2] = PointerGetDatum(&chunk_data);
1036 t_isnull[0] = false;
1037 t_isnull[1] = false;
1038 t_isnull[2] = false;
1041 * Get the data to process
1043 data_p = VARATT_DATA(value);
1044 data_todo = VARATT_SIZE(value) - VARHDRSZ;
1047 * Split up the item into chunks
1049 while (data_todo > 0)
1052 * Calculate the size of this chunk
1054 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1057 * Build a tuple and store it
1059 t_values[1] = Int32GetDatum(chunk_seq++);
1060 VARATT_SIZEP(&chunk_data) = chunk_size + VARHDRSZ;
1061 memcpy(VARATT_DATA(&chunk_data), data_p, chunk_size);
1062 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1063 if (!HeapTupleIsValid(toasttup))
1064 elog(ERROR, "failed to build TOAST tuple");
1066 simple_heap_insert(toastrel, toasttup);
1069 * Create the index entry. We cheat a little here by not using
1070 * FormIndexDatum: this relies on the knowledge that the index columns
1071 * are the same as the initial columns of the table.
1073 * Note also that there had better not be any user-created index on
1074 * the TOAST table, since we don't bother to update anything else.
1076 index_insert(toastidx, t_values, t_isnull,
1077 &(toasttup->t_self),
1078 toastrel, toastidx->rd_index->indisunique);
1083 heap_freetuple(toasttup);
1086 * Move on to next chunk
1088 data_todo -= chunk_size;
1089 data_p += chunk_size;
1093 * Done - close toast relation and return the reference
1095 index_close(toastidx, RowExclusiveLock);
1096 heap_close(toastrel, RowExclusiveLock);
1098 return PointerGetDatum(result);
1103 * toast_delete_datum -
1105 * Delete a single external stored value.
1109 toast_delete_datum(Relation rel, Datum value)
1111 varattrib *attr = (varattrib *) DatumGetPointer(value);
1114 ScanKeyData toastkey;
1115 IndexScanDesc toastscan;
1118 if (!VARATT_IS_EXTERNAL(attr))
1122 * Open the toast relation and its index
1124 toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1126 toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
1129 * Setup a scan key to fetch from the index by va_valueid (we don't
1130 * particularly care whether we see them in sequence or not)
1132 ScanKeyInit(&toastkey,
1134 BTEqualStrategyNumber, F_OIDEQ,
1135 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1138 * Find the chunks by index
1140 toastscan = index_beginscan(toastrel, toastidx,
1141 SnapshotToast, 1, &toastkey);
1142 while ((toasttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1145 * Have a chunk, delete it
1147 simple_heap_delete(toastrel, &toasttup->t_self);
1151 * End scan and close relations
1153 index_endscan(toastscan);
1154 index_close(toastidx, RowExclusiveLock);
1155 heap_close(toastrel, RowExclusiveLock);
1160 * toast_fetch_datum -
1162 * Reconstruct an in memory varattrib from the chunks saved
1163 * in the toast relation
1167 toast_fetch_datum(varattrib *attr)
1171 ScanKeyData toastkey;
1172 IndexScanDesc toastscan;
1174 TupleDesc toasttupDesc;
1184 ressize = attr->va_content.va_external.va_extsize;
1185 numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1187 result = (varattrib *) palloc(ressize + VARHDRSZ);
1188 VARATT_SIZEP(result) = ressize + VARHDRSZ;
1189 if (VARATT_IS_COMPRESSED(attr))
1190 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1193 * Open the toast relation and its index
1195 toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1197 toasttupDesc = toastrel->rd_att;
1198 toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1201 * Setup a scan key to fetch from the index by va_valueid
1203 ScanKeyInit(&toastkey,
1205 BTEqualStrategyNumber, F_OIDEQ,
1206 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1209 * Read the chunks by index
1211 * Note that because the index is actually on (valueid, chunkidx) we will
1212 * see the chunks in chunkidx order, even though we didn't explicitly ask
1217 toastscan = index_beginscan(toastrel, toastidx,
1218 SnapshotToast, 1, &toastkey);
1219 while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1222 * Have a chunk, extract the sequence number and the data
1224 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1226 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1228 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1231 * Some checks on the data we've found
1233 if (residx != nextidx)
1234 elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1236 attr->va_content.va_external.va_valueid);
1237 if (residx < numchunks - 1)
1239 if (chunksize != TOAST_MAX_CHUNK_SIZE)
1240 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1242 attr->va_content.va_external.va_valueid);
1244 else if (residx < numchunks)
1246 if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1247 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1249 attr->va_content.va_external.va_valueid);
1252 elog(ERROR, "unexpected chunk number %d for toast value %u",
1254 attr->va_content.va_external.va_valueid);
1257 * Copy the data into proper place in our result
1259 memcpy(((char *) VARATT_DATA(result)) + residx * TOAST_MAX_CHUNK_SIZE,
1267 * Final checks that we successfully fetched the datum
1269 if (nextidx != numchunks)
1270 elog(ERROR, "missing chunk number %d for toast value %u",
1272 attr->va_content.va_external.va_valueid);
1275 * End scan and close relations
1277 index_endscan(toastscan);
1278 index_close(toastidx, AccessShareLock);
1279 heap_close(toastrel, AccessShareLock);
1285 * toast_fetch_datum_slice -
1287 * Reconstruct a segment of a varattrib from the chunks saved
1288 * in the toast relation
1292 toast_fetch_datum_slice(varattrib *attr, int32 sliceoffset, int32 length)
1296 ScanKeyData toastkey[3];
1298 IndexScanDesc toastscan;
1300 TupleDesc toasttupDesc;
1317 attrsize = attr->va_content.va_external.va_extsize;
1318 totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1320 if (sliceoffset >= attrsize)
1326 if (((sliceoffset + length) > attrsize) || length < 0)
1327 length = attrsize - sliceoffset;
1329 result = (varattrib *) palloc(length + VARHDRSZ);
1330 VARATT_SIZEP(result) = length + VARHDRSZ;
1332 if (VARATT_IS_COMPRESSED(attr))
1333 VARATT_SIZEP(result) |= VARATT_FLAG_COMPRESSED;
1336 return result; /* Can save a lot of work at this point! */
1338 startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1339 endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1340 numchunks = (endchunk - startchunk) + 1;
1342 startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1343 endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1346 * Open the toast relation and its index
1348 toastrel = heap_open(attr->va_content.va_external.va_toastrelid,
1350 toasttupDesc = toastrel->rd_att;
1351 toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
1354 * Setup a scan key to fetch from the index. This is either two keys or
1355 * three depending on the number of chunks.
1357 ScanKeyInit(&toastkey[0],
1359 BTEqualStrategyNumber, F_OIDEQ,
1360 ObjectIdGetDatum(attr->va_content.va_external.va_valueid));
1363 * Use equality condition for one chunk, a range condition otherwise:
1367 ScanKeyInit(&toastkey[1],
1369 BTEqualStrategyNumber, F_INT4EQ,
1370 Int32GetDatum(startchunk));
1375 ScanKeyInit(&toastkey[1],
1377 BTGreaterEqualStrategyNumber, F_INT4GE,
1378 Int32GetDatum(startchunk));
1379 ScanKeyInit(&toastkey[2],
1381 BTLessEqualStrategyNumber, F_INT4LE,
1382 Int32GetDatum(endchunk));
1387 * Read the chunks by index
1389 * The index is on (valueid, chunkidx) so they will come in order
1391 nextidx = startchunk;
1392 toastscan = index_beginscan(toastrel, toastidx,
1393 SnapshotToast, nscankeys, toastkey);
1394 while ((ttup = index_getnext(toastscan, ForwardScanDirection)) != NULL)
1397 * Have a chunk, extract the sequence number and the data
1399 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1401 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1403 chunksize = VARATT_SIZE(chunk) - VARHDRSZ;
1406 * Some checks on the data we've found
1408 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
1409 elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u",
1411 attr->va_content.va_external.va_valueid);
1412 if (residx < totalchunks - 1)
1414 if (chunksize != TOAST_MAX_CHUNK_SIZE)
1415 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1417 attr->va_content.va_external.va_valueid);
1421 if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
1422 elog(ERROR, "unexpected chunk size %d in chunk %d for toast value %u",
1424 attr->va_content.va_external.va_valueid);
1428 * Copy the data into proper place in our result
1431 chcpyend = chunksize - 1;
1432 if (residx == startchunk)
1433 chcpystrt = startoffset;
1434 if (residx == endchunk)
1435 chcpyend = endoffset;
1437 memcpy(((char *) VARATT_DATA(result)) +
1438 (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
1439 VARATT_DATA(chunk) + chcpystrt,
1440 (chcpyend - chcpystrt) + 1);
1446 * Final checks that we successfully fetched the datum
1448 if (nextidx != (endchunk + 1))
1449 elog(ERROR, "missing chunk number %d for toast value %u",
1451 attr->va_content.va_external.va_valueid);
1454 * End scan and close relations
1456 index_endscan(toastscan);
1457 index_close(toastidx, AccessShareLock);
1458 heap_close(toastrel, AccessShareLock);