]> granicus.if.org Git - postgresql/blob - src/backend/access/heap/tuptoaster.c
Move pg_lzcompress.c to src/common.
[postgresql] / src / backend / access / heap / tuptoaster.c
1 /*-------------------------------------------------------------------------
2  *
3  * tuptoaster.c
4  *        Support routines for external and compressed storage of
5  *        variable size attributes.
6  *
7  * Copyright (c) 2000-2014, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/access/heap/tuptoaster.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *              toast_insert_or_update -
16  *                      Try to make a given tuple fit into one page by compressing
17  *                      or moving off attributes
18  *
19  *              toast_delete -
20  *                      Reclaim toast storage when a tuple is deleted
21  *
22  *              heap_tuple_untoast_attr -
23  *                      Fetch back a given value from the "secondary" relation
24  *
25  *-------------------------------------------------------------------------
26  */
27
28 #include "postgres.h"
29
30 #include <unistd.h>
31 #include <fcntl.h>
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "access/tuptoaster.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "miscadmin.h"
39 #include "utils/fmgroids.h"
40 #include "common/pg_lzcompress.h"
41 #include "utils/rel.h"
42 #include "utils/typcache.h"
43 #include "utils/tqual.h"
44
45
46 #undef TOAST_DEBUG
47
/*
 * Forward declarations for the file-local helpers used by the public
 * entry points below.
 */

/*
 * Remove one externally stored value; toast_delete() calls this for every
 * non-null varlena attribute that is an on-disk toast pointer.
 */
48 static void toast_delete_datum(Relation rel, Datum value);
/*
 * Push a value out to external storage.  toast_insert_or_update() stores
 * the returned Datum in place of the original in-tuple value.
 */
49 static Datum toast_save_datum(Relation rel, Datum value,
50                                  struct varlena * oldexternal, int options);
/*
 * NOTE(review): not called in the part of the file shown here; by name these
 * test whether a toast value OID is already in use -- confirm at definition.
 */
51 static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
52 static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
/*
 * Fetch a whole on-disk external value.  Callers check the result with
 * VARATT_IS_COMPRESSED(), so it may come back still compressed.
 */
53 static struct varlena *toast_fetch_datum(struct varlena * attr);
/*
 * Fetch only part of an on-disk external value; used as the fast path for
 * external datums that are not compressed.
 */
54 static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
55                                                 int32 sliceoffset, int32 length);
/*
 * NOTE(review): not called in the part of the file shown here; presumably
 * open/close the indexes of a toast relation under the given lock mode --
 * confirm at definition.
 */
56 static int toast_open_indexes(Relation toastrel,
57                                    LOCKMODE lock,
58                                    Relation **toastidxs,
59                                    int *num_indexes);
60 static void toast_close_indexes(Relation *toastidxs, int num_indexes,
61                                         LOCKMODE lock);
62
63
64 /* ----------
65  * heap_tuple_fetch_attr -
66  *
67  *      Public entry point to get back a toasted value from
68  *      external source (possibly still in compressed format).
69  *
70  * This will return a datum that contains all the data internally, ie, not
71  * relying on external storage or memory, but it can still be compressed or
72  * have a short header.
73  ----------
74  */
75 struct varlena *
76 heap_tuple_fetch_attr(struct varlena * attr)
77 {
78         struct varlena *result;
79
80         if (VARATT_IS_EXTERNAL_ONDISK(attr))
81         {
82                 /*
83                  * This is an external stored plain value
84                  */
85                 result = toast_fetch_datum(attr);
86         }
87         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
88         {
89                 /*
90                  * copy into the caller's memory context. That's not required in all
91                  * cases but sufficient for now since this is mainly used when we need
92                  * to persist a Datum for unusually long time, like in a HOLD cursor.
93                  */
94                 struct varatt_indirect redirect;
95
96                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
97                 attr = (struct varlena *) redirect.pointer;
98
99                 /* nested indirect Datums aren't allowed */
100                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
101
102                 /* doesn't make much sense, but better handle it */
103                 if (VARATT_IS_EXTERNAL_ONDISK(attr))
104                         return heap_tuple_fetch_attr(attr);
105
106                 /* copy datum verbatim */
107                 result = (struct varlena *) palloc(VARSIZE_ANY(attr));
108                 memcpy(result, attr, VARSIZE_ANY(attr));
109         }
110         else
111         {
112                 /*
113                  * This is a plain value inside of the main tuple - why am I called?
114                  */
115                 result = attr;
116         }
117
118         return result;
119 }
120
121
122 /* ----------
123  * heap_tuple_untoast_attr -
124  *
125  *      Public entry point to get back a toasted value from compression
126  *      or external storage.
127  * ----------
128  */
129 struct varlena *
130 heap_tuple_untoast_attr(struct varlena * attr)
131 {
132         if (VARATT_IS_EXTERNAL_ONDISK(attr))
133         {
134                 /*
135                  * This is an externally stored datum --- fetch it back from there
136                  */
137                 attr = toast_fetch_datum(attr);
138                 /* If it's compressed, decompress it */
139                 if (VARATT_IS_COMPRESSED(attr))
140                 {
141                         PGLZ_Header *tmp = (PGLZ_Header *) attr;
142
143                         attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
144                         SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
145                         if (!pglz_decompress(tmp, VARDATA(attr)))
146                                 elog(ERROR, "compressed data is corrupted");
147                         pfree(tmp);
148                 }
149         }
150         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
151         {
152                 struct varatt_indirect redirect;
153
154                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
155                 attr = (struct varlena *) redirect.pointer;
156
157                 /* nested indirect Datums aren't allowed */
158                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
159
160                 attr = heap_tuple_untoast_attr(attr);
161         }
162         else if (VARATT_IS_COMPRESSED(attr))
163         {
164                 /*
165                  * This is a compressed value inside of the main tuple
166                  */
167                 PGLZ_Header *tmp = (PGLZ_Header *) attr;
168
169                 attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
170                 SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
171                 if (!pglz_decompress(tmp, VARDATA(attr)))
172                         elog(ERROR, "compressed data is corrupted");
173         }
174         else if (VARATT_IS_SHORT(attr))
175         {
176                 /*
177                  * This is a short-header varlena --- convert to 4-byte header format
178                  */
179                 Size            data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
180                 Size            new_size = data_size + VARHDRSZ;
181                 struct varlena *new_attr;
182
183                 new_attr = (struct varlena *) palloc(new_size);
184                 SET_VARSIZE(new_attr, new_size);
185                 memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
186                 attr = new_attr;
187         }
188
189         return attr;
190 }
191
192
193 /* ----------
194  * heap_tuple_untoast_attr_slice -
195  *
196  *              Public entry point to get back part of a toasted value
197  *              from compression or external storage.
198  * ----------
199  */
200 struct varlena *
201 heap_tuple_untoast_attr_slice(struct varlena * attr,
202                                                           int32 sliceoffset, int32 slicelength)
203 {
204         struct varlena *preslice;
205         struct varlena *result;
206         char       *attrdata;
207         int32           attrsize;
208
209         if (VARATT_IS_EXTERNAL_ONDISK(attr))
210         {
211                 struct varatt_external toast_pointer;
212
213                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
214
215                 /* fast path for non-compressed external datums */
216                 if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
217                         return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
218
219                 /* fetch it back (compressed marker will get set automatically) */
220                 preslice = toast_fetch_datum(attr);
221         }
222         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
223         {
224                 struct varatt_indirect redirect;
225
226                 VARATT_EXTERNAL_GET_POINTER(redirect, attr);
227
228                 /* nested indirect Datums aren't allowed */
229                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(redirect.pointer));
230
231                 return heap_tuple_untoast_attr_slice(redirect.pointer,
232                                                                                          sliceoffset, slicelength);
233         }
234         else
235                 preslice = attr;
236
237         if (VARATT_IS_COMPRESSED(preslice))
238         {
239                 PGLZ_Header *tmp = (PGLZ_Header *) preslice;
240                 Size            size = PGLZ_RAW_SIZE(tmp) + VARHDRSZ;
241
242                 preslice = (struct varlena *) palloc(size);
243                 SET_VARSIZE(preslice, size);
244                 if (!pglz_decompress(tmp, VARDATA(preslice)))
245                         elog(ERROR, "compressed data is corrupted");
246
247                 if (tmp != (PGLZ_Header *) attr)
248                         pfree(tmp);
249         }
250
251         if (VARATT_IS_SHORT(preslice))
252         {
253                 attrdata = VARDATA_SHORT(preslice);
254                 attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
255         }
256         else
257         {
258                 attrdata = VARDATA(preslice);
259                 attrsize = VARSIZE(preslice) - VARHDRSZ;
260         }
261
262         /* slicing of datum for compressed cases and plain value */
263
264         if (sliceoffset >= attrsize)
265         {
266                 sliceoffset = 0;
267                 slicelength = 0;
268         }
269
270         if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
271                 slicelength = attrsize - sliceoffset;
272
273         result = (struct varlena *) palloc(slicelength + VARHDRSZ);
274         SET_VARSIZE(result, slicelength + VARHDRSZ);
275
276         memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
277
278         if (preslice != attr)
279                 pfree(preslice);
280
281         return result;
282 }
283
284
285 /* ----------
286  * toast_raw_datum_size -
287  *
288  *      Return the raw (detoasted) size of a varlena datum
289  *      (including the VARHDRSZ header)
290  * ----------
291  */
292 Size
293 toast_raw_datum_size(Datum value)
294 {
295         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
296         Size            result;
297
298         if (VARATT_IS_EXTERNAL_ONDISK(attr))
299         {
300                 /* va_rawsize is the size of the original datum -- including header */
301                 struct varatt_external toast_pointer;
302
303                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
304                 result = toast_pointer.va_rawsize;
305         }
306         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
307         {
308                 struct varatt_indirect toast_pointer;
309
310                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
311
312                 /* nested indirect Datums aren't allowed */
313                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer));
314
315                 return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer));
316         }
317         else if (VARATT_IS_COMPRESSED(attr))
318         {
319                 /* here, va_rawsize is just the payload size */
320                 result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
321         }
322         else if (VARATT_IS_SHORT(attr))
323         {
324                 /*
325                  * we have to normalize the header length to VARHDRSZ or else the
326                  * callers of this function will be confused.
327                  */
328                 result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
329         }
330         else
331         {
332                 /* plain untoasted datum */
333                 result = VARSIZE(attr);
334         }
335         return result;
336 }
337
338 /* ----------
339  * toast_datum_size
340  *
341  *      Return the physical storage size (possibly compressed) of a varlena datum
342  * ----------
343  */
344 Size
345 toast_datum_size(Datum value)
346 {
347         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
348         Size            result;
349
350         if (VARATT_IS_EXTERNAL_ONDISK(attr))
351         {
352                 /*
353                  * Attribute is stored externally - return the extsize whether
354                  * compressed or not.  We do not count the size of the toast pointer
355                  * ... should we?
356                  */
357                 struct varatt_external toast_pointer;
358
359                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
360                 result = toast_pointer.va_extsize;
361         }
362         else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
363         {
364                 struct varatt_indirect toast_pointer;
365
366                 VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
367
368                 /* nested indirect Datums aren't allowed */
369                 Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
370
371                 return toast_datum_size(PointerGetDatum(toast_pointer.pointer));
372         }
373         else if (VARATT_IS_SHORT(attr))
374         {
375                 result = VARSIZE_SHORT(attr);
376         }
377         else
378         {
379                 /*
380                  * Attribute is stored inline either compressed or not, just calculate
381                  * the size of the datum in either case.
382                  */
383                 result = VARSIZE(attr);
384         }
385         return result;
386 }
387
388
389 /* ----------
390  * toast_delete -
391  *
392  *      Cascaded delete toast-entries on DELETE
393  * ----------
394  */
395 void
396 toast_delete(Relation rel, HeapTuple oldtup)
397 {
398         TupleDesc       tupleDesc;
399         Form_pg_attribute *att;
400         int                     numAttrs;
401         int                     i;
402         Datum           toast_values[MaxHeapAttributeNumber];
403         bool            toast_isnull[MaxHeapAttributeNumber];
404
405         /*
406          * We should only ever be called for tuples of plain relations or
407          * materialized views --- recursing on a toast rel is bad news.
408          */
409         Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
410                    rel->rd_rel->relkind == RELKIND_MATVIEW);
411
412         /*
413          * Get the tuple descriptor and break down the tuple into fields.
414          *
415          * NOTE: it's debatable whether to use heap_deform_tuple() here or just
416          * heap_getattr() only the varlena columns.  The latter could win if there
417          * are few varlena columns and many non-varlena ones. However,
418          * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
419          * O(N^2) if there are many varlena columns, so it seems better to err on
420          * the side of linear cost.  (We won't even be here unless there's at
421          * least one varlena column, by the way.)
422          */
423         tupleDesc = rel->rd_att;
424         att = tupleDesc->attrs;
425         numAttrs = tupleDesc->natts;
426
427         Assert(numAttrs <= MaxHeapAttributeNumber);
428         heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
429
430         /*
431          * Check for external stored attributes and delete them from the secondary
432          * relation.
433          */
434         for (i = 0; i < numAttrs; i++)
435         {
436                 if (att[i]->attlen == -1)
437                 {
438                         Datum           value = toast_values[i];
439
440                         if (toast_isnull[i])
441                                 continue;
442                         else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
443                                 toast_delete_datum(rel, value);
444                         else if (VARATT_IS_EXTERNAL_INDIRECT(PointerGetDatum(value)))
445                                 elog(ERROR, "attempt to delete tuple containing indirect datums");
446                 }
447         }
448 }
449
450
451 /* ----------
452  * toast_insert_or_update -
453  *
454  *      Delete no-longer-used toast-entries and create new ones to
455  *      make the new tuple fit on INSERT or UPDATE
456  *
457  * Inputs:
458  *      newtup: the candidate new tuple to be inserted
459  *      oldtup: the old row version for UPDATE, or NULL for INSERT
460  *      options: options to be passed to heap_insert() for toast rows
461  * Result:
462  *      either newtup if no toasting is needed, or a palloc'd modified tuple
463  *      that is what should actually get stored
464  *
465  * NOTE: neither newtup nor oldtup will be modified.  This is a change
466  * from the pre-8.1 API of this routine.
467  * ----------
468  */
469 HeapTuple
470 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
471                                            int options)
472 {
473         HeapTuple       result_tuple;
474         TupleDesc       tupleDesc;
475         Form_pg_attribute *att;
476         int                     numAttrs;
477         int                     i;
478
479         bool            need_change = false;
480         bool            need_free = false;
481         bool            need_delold = false;
482         bool            has_nulls = false;
483
484         Size            maxDataLen;
485         Size            hoff;
486
487         char            toast_action[MaxHeapAttributeNumber];
488         bool            toast_isnull[MaxHeapAttributeNumber];
489         bool            toast_oldisnull[MaxHeapAttributeNumber];
490         Datum           toast_values[MaxHeapAttributeNumber];
491         Datum           toast_oldvalues[MaxHeapAttributeNumber];
492         struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
493         int32           toast_sizes[MaxHeapAttributeNumber];
494         bool            toast_free[MaxHeapAttributeNumber];
495         bool            toast_delold[MaxHeapAttributeNumber];
496
497         /*
498          * We should only ever be called for tuples of plain relations or
499          * materialized views --- recursing on a toast rel is bad news.
500          */
501         Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
502                    rel->rd_rel->relkind == RELKIND_MATVIEW);
503
504         /*
505          * Get the tuple descriptor and break down the tuple(s) into fields.
506          */
507         tupleDesc = rel->rd_att;
508         att = tupleDesc->attrs;
509         numAttrs = tupleDesc->natts;
510
511         Assert(numAttrs <= MaxHeapAttributeNumber);
512         heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
513         if (oldtup != NULL)
514                 heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
515
516         /* ----------
517          * Then collect information about the values given
518          *
519          * NOTE: toast_action[i] can have these values:
520          *              ' '             default handling
521          *              'p'             already processed --- don't touch it
522          *              'x'             incompressible, but OK to move off
523          *
524          * NOTE: toast_sizes[i] is only made valid for varlena attributes with
525          *              toast_action[i] different from 'p'.
526          * ----------
527          */
528         memset(toast_action, ' ', numAttrs * sizeof(char));
529         memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
530         memset(toast_free, 0, numAttrs * sizeof(bool));
531         memset(toast_delold, 0, numAttrs * sizeof(bool));
532
533         for (i = 0; i < numAttrs; i++)
534         {
535                 struct varlena *old_value;
536                 struct varlena *new_value;
537
538                 if (oldtup != NULL)
539                 {
540                         /*
541                          * For UPDATE get the old and new values of this attribute
542                          */
543                         old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
544                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
545
546                         /*
547                          * If the old value is stored on disk, check if it has changed so
548                          * we have to delete it later.
549                          */
550                         if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
551                                 VARATT_IS_EXTERNAL_ONDISK(old_value))
552                         {
553                                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
554                                         memcmp((char *) old_value, (char *) new_value,
555                                                    VARSIZE_EXTERNAL(old_value)) != 0)
556                                 {
557                                         /*
558                                          * The old external stored value isn't needed any more
559                                          * after the update
560                                          */
561                                         toast_delold[i] = true;
562                                         need_delold = true;
563                                 }
564                                 else
565                                 {
566                                         /*
567                                          * This attribute isn't changed by this update so we reuse
568                                          * the original reference to the old value in the new
569                                          * tuple.
570                                          */
571                                         toast_action[i] = 'p';
572                                         continue;
573                                 }
574                         }
575                 }
576                 else
577                 {
578                         /*
579                          * For INSERT simply get the new value
580                          */
581                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
582                 }
583
584                 /*
585                  * Handle NULL attributes
586                  */
587                 if (toast_isnull[i])
588                 {
589                         toast_action[i] = 'p';
590                         has_nulls = true;
591                         continue;
592                 }
593
594                 /*
595                  * Now look at varlena attributes
596                  */
597                 if (att[i]->attlen == -1)
598                 {
599                         /*
600                          * If the table's attribute says PLAIN always, force it so.
601                          */
602                         if (att[i]->attstorage == 'p')
603                                 toast_action[i] = 'p';
604
605                         /*
606                          * We took care of UPDATE above, so any external value we find
607                          * still in the tuple must be someone else's we cannot reuse.
608                          * Fetch it back (without decompression, unless we are forcing
609                          * PLAIN storage).  If necessary, we'll push it out as a new
610                          * external value below.
611                          */
612                         if (VARATT_IS_EXTERNAL(new_value))
613                         {
614                                 toast_oldexternal[i] = new_value;
615                                 if (att[i]->attstorage == 'p')
616                                         new_value = heap_tuple_untoast_attr(new_value);
617                                 else
618                                         new_value = heap_tuple_fetch_attr(new_value);
619                                 toast_values[i] = PointerGetDatum(new_value);
620                                 toast_free[i] = true;
621                                 need_change = true;
622                                 need_free = true;
623                         }
624
625                         /*
626                          * Remember the size of this attribute
627                          */
628                         toast_sizes[i] = VARSIZE_ANY(new_value);
629                 }
630                 else
631                 {
632                         /*
633                          * Not a varlena attribute, plain storage always
634                          */
635                         toast_action[i] = 'p';
636                 }
637         }
638
639         /* ----------
640          * Compress and/or save external until data fits into target length
641          *
642          *      1: Inline compress attributes with attstorage 'x', and store very
643          *         large attributes with attstorage 'x' or 'e' external immediately
644          *      2: Store attributes with attstorage 'x' or 'e' external
645          *      3: Inline compress attributes with attstorage 'm'
646          *      4: Store attributes with attstorage 'm' external
647          * ----------
648          */
649
650         /* compute header overhead --- this should match heap_form_tuple() */
651         hoff = offsetof(HeapTupleHeaderData, t_bits);
652         if (has_nulls)
653                 hoff += BITMAPLEN(numAttrs);
654         if (newtup->t_data->t_infomask & HEAP_HASOID)
655                 hoff += sizeof(Oid);
656         hoff = MAXALIGN(hoff);
657         /* now convert to a limit on the tuple data size */
658         maxDataLen = TOAST_TUPLE_TARGET - hoff;
659
660         /*
661          * Look for attributes with attstorage 'x' to compress.  Also find large
662          * attributes with attstorage 'x' or 'e', and store them external.
663          */
664         while (heap_compute_data_size(tupleDesc,
665                                                                   toast_values, toast_isnull) > maxDataLen)
666         {
667                 int                     biggest_attno = -1;
668                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
669                 Datum           old_value;
670                 Datum           new_value;
671
672                 /*
673                  * Search for the biggest yet unprocessed internal attribute
674                  */
675                 for (i = 0; i < numAttrs; i++)
676                 {
677                         if (toast_action[i] != ' ')
678                                 continue;
679                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
680                                 continue;               /* can't happen, toast_action would be 'p' */
681                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
682                                 continue;
683                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
684                                 continue;
685                         if (toast_sizes[i] > biggest_size)
686                         {
687                                 biggest_attno = i;
688                                 biggest_size = toast_sizes[i];
689                         }
690                 }
691
692                 if (biggest_attno < 0)
693                         break;
694
695                 /*
696                  * Attempt to compress it inline, if it has attstorage 'x'
697                  */
698                 i = biggest_attno;
699                 if (att[i]->attstorage == 'x')
700                 {
701                         old_value = toast_values[i];
702                         new_value = toast_compress_datum(old_value);
703
704                         if (DatumGetPointer(new_value) != NULL)
705                         {
706                                 /* successful compression */
707                                 if (toast_free[i])
708                                         pfree(DatumGetPointer(old_value));
709                                 toast_values[i] = new_value;
710                                 toast_free[i] = true;
711                                 toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
712                                 need_change = true;
713                                 need_free = true;
714                         }
715                         else
716                         {
717                                 /* incompressible, ignore on subsequent compression passes */
718                                 toast_action[i] = 'x';
719                         }
720                 }
721                 else
722                 {
723                         /* has attstorage 'e', ignore on subsequent compression passes */
724                         toast_action[i] = 'x';
725                 }
726
727                 /*
728                  * If this value is by itself more than maxDataLen (after compression
729                  * if any), push it out to the toast table immediately, if possible.
730                  * This avoids uselessly compressing other fields in the common case
731                  * where we have one long field and several short ones.
732                  *
733                  * XXX maybe the threshold should be less than maxDataLen?
734                  */
735                 if (toast_sizes[i] > maxDataLen &&
736                         rel->rd_rel->reltoastrelid != InvalidOid)
737                 {
738                         old_value = toast_values[i];
739                         toast_action[i] = 'p';
740                         toast_values[i] = toast_save_datum(rel, toast_values[i],
741                                                                                            toast_oldexternal[i], options);
742                         if (toast_free[i])
743                                 pfree(DatumGetPointer(old_value));
744                         toast_free[i] = true;
745                         need_change = true;
746                         need_free = true;
747                 }
748         }
749
750         /*
751          * Second we look for attributes of attstorage 'x' or 'e' that are still
752          * inline.  But skip this if there's no toast table to push them to.
753          */
754         while (heap_compute_data_size(tupleDesc,
755                                                                   toast_values, toast_isnull) > maxDataLen &&
756                    rel->rd_rel->reltoastrelid != InvalidOid)
757         {
758                 int                     biggest_attno = -1;
759                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
760                 Datum           old_value;
761
762                 /*------
763                  * Search for the biggest yet inlined attribute with
764                  * attstorage equals 'x' or 'e'
765                  *------
766                  */
767                 for (i = 0; i < numAttrs; i++)
768                 {
769                         if (toast_action[i] == 'p')
770                                 continue;
771                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
772                                 continue;               /* can't happen, toast_action would be 'p' */
773                         if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
774                                 continue;
775                         if (toast_sizes[i] > biggest_size)
776                         {
777                                 biggest_attno = i;
778                                 biggest_size = toast_sizes[i];
779                         }
780                 }
781
782                 if (biggest_attno < 0)
783                         break;
784
785                 /*
786                  * Store this external
787                  */
788                 i = biggest_attno;
789                 old_value = toast_values[i];
790                 toast_action[i] = 'p';
791                 toast_values[i] = toast_save_datum(rel, toast_values[i],
792                                                                                    toast_oldexternal[i], options);
793                 if (toast_free[i])
794                         pfree(DatumGetPointer(old_value));
795                 toast_free[i] = true;
796
797                 need_change = true;
798                 need_free = true;
799         }
800
801         /*
802          * Round 3 - this time we take attributes with storage 'm' into
803          * compression
804          */
805         while (heap_compute_data_size(tupleDesc,
806                                                                   toast_values, toast_isnull) > maxDataLen)
807         {
808                 int                     biggest_attno = -1;
809                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
810                 Datum           old_value;
811                 Datum           new_value;
812
813                 /*
814                  * Search for the biggest yet uncompressed internal attribute
815                  */
816                 for (i = 0; i < numAttrs; i++)
817                 {
818                         if (toast_action[i] != ' ')
819                                 continue;
820                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
821                                 continue;               /* can't happen, toast_action would be 'p' */
822                         if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
823                                 continue;
824                         if (att[i]->attstorage != 'm')
825                                 continue;
826                         if (toast_sizes[i] > biggest_size)
827                         {
828                                 biggest_attno = i;
829                                 biggest_size = toast_sizes[i];
830                         }
831                 }
832
833                 if (biggest_attno < 0)
834                         break;
835
836                 /*
837                  * Attempt to compress it inline
838                  */
839                 i = biggest_attno;
840                 old_value = toast_values[i];
841                 new_value = toast_compress_datum(old_value);
842
843                 if (DatumGetPointer(new_value) != NULL)
844                 {
845                         /* successful compression */
846                         if (toast_free[i])
847                                 pfree(DatumGetPointer(old_value));
848                         toast_values[i] = new_value;
849                         toast_free[i] = true;
850                         toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
851                         need_change = true;
852                         need_free = true;
853                 }
854                 else
855                 {
856                         /* incompressible, ignore on subsequent compression passes */
857                         toast_action[i] = 'x';
858                 }
859         }
860
861         /*
862          * Finally we store attributes of type 'm' externally.  At this point we
863          * increase the target tuple size, so that 'm' attributes aren't stored
864          * externally unless really necessary.
865          */
866         maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
867
868         while (heap_compute_data_size(tupleDesc,
869                                                                   toast_values, toast_isnull) > maxDataLen &&
870                    rel->rd_rel->reltoastrelid != InvalidOid)
871         {
872                 int                     biggest_attno = -1;
873                 int32           biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
874                 Datum           old_value;
875
876                 /*--------
877                  * Search for the biggest yet inlined attribute with
878                  * attstorage = 'm'
879                  *--------
880                  */
881                 for (i = 0; i < numAttrs; i++)
882                 {
883                         if (toast_action[i] == 'p')
884                                 continue;
885                         if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
886                                 continue;               /* can't happen, toast_action would be 'p' */
887                         if (att[i]->attstorage != 'm')
888                                 continue;
889                         if (toast_sizes[i] > biggest_size)
890                         {
891                                 biggest_attno = i;
892                                 biggest_size = toast_sizes[i];
893                         }
894                 }
895
896                 if (biggest_attno < 0)
897                         break;
898
899                 /*
900                  * Store this external
901                  */
902                 i = biggest_attno;
903                 old_value = toast_values[i];
904                 toast_action[i] = 'p';
905                 toast_values[i] = toast_save_datum(rel, toast_values[i],
906                                                                                    toast_oldexternal[i], options);
907                 if (toast_free[i])
908                         pfree(DatumGetPointer(old_value));
909                 toast_free[i] = true;
910
911                 need_change = true;
912                 need_free = true;
913         }
914
915         /*
916          * In the case we toasted any values, we need to build a new heap tuple
917          * with the changed values.
918          */
919         if (need_change)
920         {
921                 HeapTupleHeader olddata = newtup->t_data;
922                 HeapTupleHeader new_data;
923                 int32           new_header_len;
924                 int32           new_data_len;
925                 int32           new_tuple_len;
926
927                 /*
928                  * Calculate the new size of the tuple.
929                  *
930                  * Note: we used to assume here that the old tuple's t_hoff must equal
931                  * the new_header_len value, but that was incorrect.  The old tuple
932                  * might have a smaller-than-current natts, if there's been an ALTER
933                  * TABLE ADD COLUMN since it was stored; and that would lead to a
934                  * different conclusion about the size of the null bitmap, or even
935                  * whether there needs to be one at all.
936                  */
937                 new_header_len = offsetof(HeapTupleHeaderData, t_bits);
938                 if (has_nulls)
939                         new_header_len += BITMAPLEN(numAttrs);
940                 if (olddata->t_infomask & HEAP_HASOID)
941                         new_header_len += sizeof(Oid);
942                 new_header_len = MAXALIGN(new_header_len);
943                 new_data_len = heap_compute_data_size(tupleDesc,
944                                                                                           toast_values, toast_isnull);
945                 new_tuple_len = new_header_len + new_data_len;
946
947                 /*
948                  * Allocate and zero the space needed, and fill HeapTupleData fields.
949                  */
950                 result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_tuple_len);
951                 result_tuple->t_len = new_tuple_len;
952                 result_tuple->t_self = newtup->t_self;
953                 result_tuple->t_tableOid = newtup->t_tableOid;
954                 new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
955                 result_tuple->t_data = new_data;
956
957                 /*
958                  * Copy the existing tuple header, but adjust natts and t_hoff.
959                  */
960                 memcpy(new_data, olddata, offsetof(HeapTupleHeaderData, t_bits));
961                 HeapTupleHeaderSetNatts(new_data, numAttrs);
962                 new_data->t_hoff = new_header_len;
963                 if (olddata->t_infomask & HEAP_HASOID)
964                         HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(olddata));
965
966                 /* Copy over the data, and fill the null bitmap if needed */
967                 heap_fill_tuple(tupleDesc,
968                                                 toast_values,
969                                                 toast_isnull,
970                                                 (char *) new_data + new_header_len,
971                                                 new_data_len,
972                                                 &(new_data->t_infomask),
973                                                 has_nulls ? new_data->t_bits : NULL);
974         }
975         else
976                 result_tuple = newtup;
977
978         /*
979          * Free allocated temp values
980          */
981         if (need_free)
982                 for (i = 0; i < numAttrs; i++)
983                         if (toast_free[i])
984                                 pfree(DatumGetPointer(toast_values[i]));
985
986         /*
987          * Delete external values from the old tuple
988          */
989         if (need_delold)
990                 for (i = 0; i < numAttrs; i++)
991                         if (toast_delold[i])
992                                 toast_delete_datum(rel, toast_oldvalues[i]);
993
994         return result_tuple;
995 }
996
997
998 /* ----------
999  * toast_flatten_tuple -
1000  *
1001  *      "Flatten" a tuple to contain no out-of-line toasted fields.
1002  *      (This does not eliminate compressed or short-header datums.)
1003  *
1004  *      Note: we expect the caller already checked HeapTupleHasExternal(tup),
1005  *      so there is no need for a short-circuit path.
1006  * ----------
1007  */
1008 HeapTuple
1009 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
1010 {
1011         HeapTuple       new_tuple;
1012         Form_pg_attribute *att = tupleDesc->attrs;
1013         int                     numAttrs = tupleDesc->natts;
1014         int                     i;
1015         Datum           toast_values[MaxTupleAttributeNumber];
1016         bool            toast_isnull[MaxTupleAttributeNumber];
1017         bool            toast_free[MaxTupleAttributeNumber];
1018
1019         /*
1020          * Break down the tuple into fields.
1021          */
1022         Assert(numAttrs <= MaxTupleAttributeNumber);
1023         heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
1024
1025         memset(toast_free, 0, numAttrs * sizeof(bool));
1026
1027         for (i = 0; i < numAttrs; i++)
1028         {
1029                 /*
1030                  * Look at non-null varlena attributes
1031                  */
1032                 if (!toast_isnull[i] && att[i]->attlen == -1)
1033                 {
1034                         struct varlena *new_value;
1035
1036                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1037                         if (VARATT_IS_EXTERNAL(new_value))
1038                         {
1039                                 new_value = toast_fetch_datum(new_value);
1040                                 toast_values[i] = PointerGetDatum(new_value);
1041                                 toast_free[i] = true;
1042                         }
1043                 }
1044         }
1045
1046         /*
1047          * Form the reconfigured tuple.
1048          */
1049         new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
1050
1051         /*
1052          * Be sure to copy the tuple's OID and identity fields.  We also make a
1053          * point of copying visibility info, just in case anybody looks at those
1054          * fields in a syscache entry.
1055          */
1056         if (tupleDesc->tdhasoid)
1057                 HeapTupleSetOid(new_tuple, HeapTupleGetOid(tup));
1058
1059         new_tuple->t_self = tup->t_self;
1060         new_tuple->t_tableOid = tup->t_tableOid;
1061
1062         new_tuple->t_data->t_choice = tup->t_data->t_choice;
1063         new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
1064         new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
1065         new_tuple->t_data->t_infomask |=
1066                 tup->t_data->t_infomask & HEAP_XACT_MASK;
1067         new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
1068         new_tuple->t_data->t_infomask2 |=
1069                 tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
1070
1071         /*
1072          * Free allocated temp values
1073          */
1074         for (i = 0; i < numAttrs; i++)
1075                 if (toast_free[i])
1076                         pfree(DatumGetPointer(toast_values[i]));
1077
1078         return new_tuple;
1079 }
1080
1081
1082 /* ----------
1083  * toast_flatten_tuple_to_datum -
1084  *
1085  *      "Flatten" a tuple containing out-of-line toasted fields into a Datum.
1086  *      The result is always palloc'd in the current memory context.
1087  *
1088  *      We have a general rule that Datums of container types (rows, arrays,
1089  *      ranges, etc) must not contain any external TOAST pointers.  Without
1090  *      this rule, we'd have to look inside each Datum when preparing a tuple
1091  *      for storage, which would be expensive and would fail to extend cleanly
1092  *      to new sorts of container types.
1093  *
1094  *      However, we don't want to say that tuples represented as HeapTuples
1095  *      can't contain toasted fields, so instead this routine should be called
1096  *      when such a HeapTuple is being converted into a Datum.
1097  *
1098  *      While we're at it, we decompress any compressed fields too.  This is not
1099  *      necessary for correctness, but reflects an expectation that compression
1100  *      will be more effective if applied to the whole tuple not individual
1101  *      fields.  We are not so concerned about that that we want to deconstruct
1102  *      and reconstruct tuples just to get rid of compressed fields, however.
1103  *      So callers typically won't call this unless they see that the tuple has
1104  *      at least one external field.
1105  *
1106  *      On the other hand, in-line short-header varlena fields are left alone.
1107  *      If we "untoasted" them here, they'd just get changed back to short-header
1108  *      format anyway within heap_fill_tuple.
1109  * ----------
1110  */
1111 Datum
1112 toast_flatten_tuple_to_datum(HeapTupleHeader tup,
1113                                                          uint32 tup_len,
1114                                                          TupleDesc tupleDesc)
1115 {
1116         HeapTupleHeader new_data;
1117         int32           new_header_len;
1118         int32           new_data_len;
1119         int32           new_tuple_len;
1120         HeapTupleData tmptup;
1121         Form_pg_attribute *att = tupleDesc->attrs;
1122         int                     numAttrs = tupleDesc->natts;
1123         int                     i;
1124         bool            has_nulls = false;
1125         Datum           toast_values[MaxTupleAttributeNumber];
1126         bool            toast_isnull[MaxTupleAttributeNumber];
1127         bool            toast_free[MaxTupleAttributeNumber];
1128
1129         /* Build a temporary HeapTuple control structure */
1130         tmptup.t_len = tup_len;
1131         ItemPointerSetInvalid(&(tmptup.t_self));
1132         tmptup.t_tableOid = InvalidOid;
1133         tmptup.t_data = tup;
1134
1135         /*
1136          * Break down the tuple into fields.
1137          */
1138         Assert(numAttrs <= MaxTupleAttributeNumber);
1139         heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
1140
1141         memset(toast_free, 0, numAttrs * sizeof(bool));
1142
1143         for (i = 0; i < numAttrs; i++)
1144         {
1145                 /*
1146                  * Look at non-null varlena attributes
1147                  */
1148                 if (toast_isnull[i])
1149                         has_nulls = true;
1150                 else if (att[i]->attlen == -1)
1151                 {
1152                         struct varlena *new_value;
1153
1154                         new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
1155                         if (VARATT_IS_EXTERNAL(new_value) ||
1156                                 VARATT_IS_COMPRESSED(new_value))
1157                         {
1158                                 new_value = heap_tuple_untoast_attr(new_value);
1159                                 toast_values[i] = PointerGetDatum(new_value);
1160                                 toast_free[i] = true;
1161                         }
1162                 }
1163         }
1164
1165         /*
1166          * Calculate the new size of the tuple.
1167          *
1168          * This should match the reconstruction code in toast_insert_or_update.
1169          */
1170         new_header_len = offsetof(HeapTupleHeaderData, t_bits);
1171         if (has_nulls)
1172                 new_header_len += BITMAPLEN(numAttrs);
1173         if (tup->t_infomask & HEAP_HASOID)
1174                 new_header_len += sizeof(Oid);
1175         new_header_len = MAXALIGN(new_header_len);
1176         new_data_len = heap_compute_data_size(tupleDesc,
1177                                                                                   toast_values, toast_isnull);
1178         new_tuple_len = new_header_len + new_data_len;
1179
1180         new_data = (HeapTupleHeader) palloc0(new_tuple_len);
1181
1182         /*
1183          * Copy the existing tuple header, but adjust natts and t_hoff.
1184          */
1185         memcpy(new_data, tup, offsetof(HeapTupleHeaderData, t_bits));
1186         HeapTupleHeaderSetNatts(new_data, numAttrs);
1187         new_data->t_hoff = new_header_len;
1188         if (tup->t_infomask & HEAP_HASOID)
1189                 HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(tup));
1190
1191         /* Set the composite-Datum header fields correctly */
1192         HeapTupleHeaderSetDatumLength(new_data, new_tuple_len);
1193         HeapTupleHeaderSetTypeId(new_data, tupleDesc->tdtypeid);
1194         HeapTupleHeaderSetTypMod(new_data, tupleDesc->tdtypmod);
1195
1196         /* Copy over the data, and fill the null bitmap if needed */
1197         heap_fill_tuple(tupleDesc,
1198                                         toast_values,
1199                                         toast_isnull,
1200                                         (char *) new_data + new_header_len,
1201                                         new_data_len,
1202                                         &(new_data->t_infomask),
1203                                         has_nulls ? new_data->t_bits : NULL);
1204
1205         /*
1206          * Free allocated temp values
1207          */
1208         for (i = 0; i < numAttrs; i++)
1209                 if (toast_free[i])
1210                         pfree(DatumGetPointer(toast_values[i]));
1211
1212         return PointerGetDatum(new_data);
1213 }
1214
1215
1216 /* ----------
1217  * toast_compress_datum -
1218  *
1219  *      Create a compressed version of a varlena datum
1220  *
1221  *      If we fail (ie, compressed result is actually bigger than original)
1222  *      then return NULL.  We must not use compressed data if it'd expand
1223  *      the tuple!
1224  *
1225  *      We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
1226  *      copying them.  But we can't handle external or compressed datums.
1227  * ----------
1228  */
1229 Datum
1230 toast_compress_datum(Datum value)
1231 {
1232         struct varlena *tmp;
1233         int32           valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
1234
1235         Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
1236         Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
1237
1238         /*
1239          * No point in wasting a palloc cycle if value size is out of the allowed
1240          * range for compression
1241          */
1242         if (valsize < PGLZ_strategy_default->min_input_size ||
1243                 valsize > PGLZ_strategy_default->max_input_size)
1244                 return PointerGetDatum(NULL);
1245
1246         tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
1247
1248         /*
1249          * We recheck the actual size even if pglz_compress() reports success,
1250          * because it might be satisfied with having saved as little as one byte
1251          * in the compressed data --- which could turn into a net loss once you
1252          * consider header and alignment padding.  Worst case, the compressed
1253          * format might require three padding bytes (plus header, which is
1254          * included in VARSIZE(tmp)), whereas the uncompressed format would take
1255          * only one header byte and no padding if the value is short enough.  So
1256          * we insist on a savings of more than 2 bytes to ensure we have a gain.
1257          */
1258         if (pglz_compress(VARDATA_ANY(DatumGetPointer(value)), valsize,
1259                                           (PGLZ_Header *) tmp, PGLZ_strategy_default) &&
1260                 VARSIZE(tmp) < valsize - 2)
1261         {
1262                 /* successful compression */
1263                 return PointerGetDatum(tmp);
1264         }
1265         else
1266         {
1267                 /* incompressible data */
1268                 pfree(tmp);
1269                 return PointerGetDatum(NULL);
1270         }
1271 }
1272
1273
1274 /* ----------
1275  * toast_get_valid_index
1276  *
1277  *      Get OID of valid index associated to given toast relation. A toast
1278  *      relation can have only one valid index at the same time.
1279  */
1280 Oid
1281 toast_get_valid_index(Oid toastoid, LOCKMODE lock)
1282 {
1283         int                     num_indexes;
1284         int                     validIndex;
1285         Oid                     validIndexOid;
1286         Relation   *toastidxs;
1287         Relation        toastrel;
1288
1289         /* Open the toast relation */
1290         toastrel = heap_open(toastoid, lock);
1291
1292         /* Look for the valid index of the toast relation */
1293         validIndex = toast_open_indexes(toastrel,
1294                                                                         lock,
1295                                                                         &toastidxs,
1296                                                                         &num_indexes);
1297         validIndexOid = RelationGetRelid(toastidxs[validIndex]);
1298
1299         /* Close the toast relation and all its indexes */
1300         toast_close_indexes(toastidxs, num_indexes, lock);
1301         heap_close(toastrel, lock);
1302
1303         return validIndexOid;
1304 }
1305
1306
1307 /* ----------
1308  * toast_save_datum -
1309  *
1310  *      Save one single datum into the secondary relation and return
1311  *      a Datum reference for it.
1312  *
1313  * rel: the main relation we're working with (not the toast rel!)
1314  * value: datum to be pushed to toast storage
1315  * oldexternal: if not NULL, toast pointer previously representing the datum
1316  * options: options to be passed to heap_insert() for toast rows
1317  * ----------
1318  */
1319 static Datum
1320 toast_save_datum(Relation rel, Datum value,
1321                                  struct varlena * oldexternal, int options)
1322 {
1323         Relation        toastrel;
1324         Relation   *toastidxs;
1325         HeapTuple       toasttup;
1326         TupleDesc       toasttupDesc;
1327         Datum           t_values[3];
1328         bool            t_isnull[3];
1329         CommandId       mycid = GetCurrentCommandId(true);
1330         struct varlena *result;
1331         struct varatt_external toast_pointer;
1332         struct
1333         {
1334                 struct varlena hdr;
1335                 char            data[TOAST_MAX_CHUNK_SIZE]; /* make struct big enough */
1336                 int32           align_it;       /* ensure struct is aligned well enough */
1337         }                       chunk_data;
1338         int32           chunk_size;
1339         int32           chunk_seq = 0;
1340         char       *data_p;
1341         int32           data_todo;
1342         Pointer         dval = DatumGetPointer(value);
1343         int                     num_indexes;
1344         int                     validIndex;
1345
1346         Assert(!VARATT_IS_EXTERNAL(value));
1347
1348         /*
1349          * Open the toast relation and its indexes.  We can use the index to check
1350          * uniqueness of the OID we assign to the toasted item, even though it has
1351          * additional columns besides OID.
1352          */
1353         toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
1354         toasttupDesc = toastrel->rd_att;
1355
1356         /* Open all the toast indexes and look for the valid one */
1357         validIndex = toast_open_indexes(toastrel,
1358                                                                         RowExclusiveLock,
1359                                                                         &toastidxs,
1360                                                                         &num_indexes);
1361
1362         /*
1363          * Get the data pointer and length, and compute va_rawsize and va_extsize.
1364          *
1365          * va_rawsize is the size of the equivalent fully uncompressed datum, so
1366          * we have to adjust for short headers.
1367          *
1368          * va_extsize is the actual size of the data payload in the toast records.
1369          */
1370         if (VARATT_IS_SHORT(dval))
1371         {
1372                 data_p = VARDATA_SHORT(dval);
1373                 data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
1374                 toast_pointer.va_rawsize = data_todo + VARHDRSZ;                /* as if not short */
1375                 toast_pointer.va_extsize = data_todo;
1376         }
1377         else if (VARATT_IS_COMPRESSED(dval))
1378         {
1379                 data_p = VARDATA(dval);
1380                 data_todo = VARSIZE(dval) - VARHDRSZ;
1381                 /* rawsize in a compressed datum is just the size of the payload */
1382                 toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
1383                 toast_pointer.va_extsize = data_todo;
1384                 /* Assert that the numbers look like it's compressed */
1385                 Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1386         }
1387         else
1388         {
1389                 data_p = VARDATA(dval);
1390                 data_todo = VARSIZE(dval) - VARHDRSZ;
1391                 toast_pointer.va_rawsize = VARSIZE(dval);
1392                 toast_pointer.va_extsize = data_todo;
1393         }
1394
1395         /*
1396          * Insert the correct table OID into the result TOAST pointer.
1397          *
1398          * Normally this is the actual OID of the target toast table, but during
1399          * table-rewriting operations such as CLUSTER, we have to insert the OID
1400          * of the table's real permanent toast table instead.  rd_toastoid is set
1401          * if we have to substitute such an OID.
1402          */
1403         if (OidIsValid(rel->rd_toastoid))
1404                 toast_pointer.va_toastrelid = rel->rd_toastoid;
1405         else
1406                 toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
1407
1408         /*
1409          * Choose an OID to use as the value ID for this toast value.
1410          *
1411          * Normally we just choose an unused OID within the toast table.  But
1412          * during table-rewriting operations where we are preserving an existing
1413          * toast table OID, we want to preserve toast value OIDs too.  So, if
1414          * rd_toastoid is set and we had a prior external value from that same
1415          * toast table, re-use its value ID.  If we didn't have a prior external
1416          * value (which is a corner case, but possible if the table's attstorage
1417          * options have been changed), we have to pick a value ID that doesn't
1418          * conflict with either new or existing toast value OIDs.
1419          */
1420         if (!OidIsValid(rel->rd_toastoid))
1421         {
1422                 /* normal case: just choose an unused OID */
1423                 toast_pointer.va_valueid =
1424                         GetNewOidWithIndex(toastrel,
1425                                                            RelationGetRelid(toastidxs[validIndex]),
1426                                                            (AttrNumber) 1);
1427         }
1428         else
1429         {
1430                 /* rewrite case: check to see if value was in old toast table */
1431                 toast_pointer.va_valueid = InvalidOid;
1432                 if (oldexternal != NULL)
1433                 {
1434                         struct varatt_external old_toast_pointer;
1435
1436                         Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
1437                         /* Must copy to access aligned fields */
1438                         VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
1439                         if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
1440                         {
1441                                 /* This value came from the old toast table; reuse its OID */
1442                                 toast_pointer.va_valueid = old_toast_pointer.va_valueid;
1443
1444                                 /*
1445                                  * There is a corner case here: the table rewrite might have
1446                                  * to copy both live and recently-dead versions of a row, and
1447                                  * those versions could easily reference the same toast value.
1448                                  * When we copy the second or later version of such a row,
1449                                  * reusing the OID will mean we select an OID that's already
1450                                  * in the new toast table.  Check for that, and if so, just
1451                                  * fall through without writing the data again.
1452                                  *
1453                                  * While annoying and ugly-looking, this is a good thing
1454                                  * because it ensures that we wind up with only one copy of
1455                                  * the toast value when there is only one copy in the old
1456                                  * toast table.  Before we detected this case, we'd have made
1457                                  * multiple copies, wasting space; and what's worse, the
1458                                  * copies belonging to already-deleted heap tuples would not
1459                                  * be reclaimed by VACUUM.
1460                                  */
1461                                 if (toastrel_valueid_exists(toastrel,
1462                                                                                         toast_pointer.va_valueid))
1463                                 {
1464                                         /* Match, so short-circuit the data storage loop below */
1465                                         data_todo = 0;
1466                                 }
1467                         }
1468                 }
1469                 if (toast_pointer.va_valueid == InvalidOid)
1470                 {
1471                         /*
1472                          * new value; must choose an OID that doesn't conflict in either
1473                          * old or new toast table
1474                          */
1475                         do
1476                         {
1477                                 toast_pointer.va_valueid =
1478                                         GetNewOidWithIndex(toastrel,
1479                                                                          RelationGetRelid(toastidxs[validIndex]),
1480                                                                            (AttrNumber) 1);
1481                         } while (toastid_valueid_exists(rel->rd_toastoid,
1482                                                                                         toast_pointer.va_valueid));
1483                 }
1484         }
1485
1486         /*
1487          * Initialize constant parts of the tuple data
1488          */
1489         t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
1490         t_values[2] = PointerGetDatum(&chunk_data);
1491         t_isnull[0] = false;
1492         t_isnull[1] = false;
1493         t_isnull[2] = false;
1494
1495         /*
1496          * Split up the item into chunks
1497          */
1498         while (data_todo > 0)
1499         {
1500                 int                     i;
1501
1502                 CHECK_FOR_INTERRUPTS();
1503
1504                 /*
1505                  * Calculate the size of this chunk
1506                  */
1507                 chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
1508
1509                 /*
1510                  * Build a tuple and store it
1511                  */
1512                 t_values[1] = Int32GetDatum(chunk_seq++);
1513                 SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
1514                 memcpy(VARDATA(&chunk_data), data_p, chunk_size);
1515                 toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
1516
1517                 heap_insert(toastrel, toasttup, mycid, options, NULL);
1518
1519                 /*
1520                  * Create the index entry.  We cheat a little here by not using
1521                  * FormIndexDatum: this relies on the knowledge that the index columns
1522                  * are the same as the initial columns of the table for all the
1523                  * indexes.
1524                  *
1525                  * Note also that there had better not be any user-created index on
1526                  * the TOAST table, since we don't bother to update anything else.
1527                  */
1528                 for (i = 0; i < num_indexes; i++)
1529                 {
1530                         /* Only index relations marked as ready can be updated */
1531                         if (IndexIsReady(toastidxs[i]->rd_index))
1532                                 index_insert(toastidxs[i], t_values, t_isnull,
1533                                                          &(toasttup->t_self),
1534                                                          toastrel,
1535                                                          toastidxs[i]->rd_index->indisunique ?
1536                                                          UNIQUE_CHECK_YES : UNIQUE_CHECK_NO);
1537                 }
1538
1539                 /*
1540                  * Free memory
1541                  */
1542                 heap_freetuple(toasttup);
1543
1544                 /*
1545                  * Move on to next chunk
1546                  */
1547                 data_todo -= chunk_size;
1548                 data_p += chunk_size;
1549         }
1550
1551         /*
1552          * Done - close toast relation and its indexes
1553          */
1554         toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1555         heap_close(toastrel, RowExclusiveLock);
1556
1557         /*
1558          * Create the TOAST pointer value that we'll return
1559          */
1560         result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
1561         SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK);
1562         memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
1563
1564         return PointerGetDatum(result);
1565 }
1566
1567
1568 /* ----------
1569  * toast_delete_datum -
1570  *
1571  *      Delete a single external stored value.
1572  * ----------
1573  */
1574 static void
1575 toast_delete_datum(Relation rel, Datum value)
1576 {
1577         struct varlena *attr = (struct varlena *) DatumGetPointer(value);
1578         struct varatt_external toast_pointer;
1579         Relation        toastrel;
1580         Relation   *toastidxs;
1581         ScanKeyData toastkey;
1582         SysScanDesc toastscan;
1583         HeapTuple       toasttup;
1584         int                     num_indexes;
1585         int                     validIndex;
1586
1587         if (!VARATT_IS_EXTERNAL_ONDISK(attr))
1588                 return;
1589
1590         /* Must copy to access aligned fields */
1591         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1592
1593         /*
1594          * Open the toast relation and its indexes
1595          */
1596         toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
1597
1598         /* Fetch valid relation used for process */
1599         validIndex = toast_open_indexes(toastrel,
1600                                                                         RowExclusiveLock,
1601                                                                         &toastidxs,
1602                                                                         &num_indexes);
1603
1604         /*
1605          * Setup a scan key to find chunks with matching va_valueid
1606          */
1607         ScanKeyInit(&toastkey,
1608                                 (AttrNumber) 1,
1609                                 BTEqualStrategyNumber, F_OIDEQ,
1610                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1611
1612         /*
1613          * Find all the chunks.  (We don't actually care whether we see them in
1614          * sequence or not, but since we've already locked the index we might as
1615          * well use systable_beginscan_ordered.)
1616          */
1617         toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1618                                                                                    SnapshotToast, 1, &toastkey);
1619         while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1620         {
1621                 /*
1622                  * Have a chunk, delete it
1623                  */
1624                 simple_heap_delete(toastrel, &toasttup->t_self);
1625         }
1626
1627         /*
1628          * End scan and close relations
1629          */
1630         systable_endscan_ordered(toastscan);
1631         toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1632         heap_close(toastrel, RowExclusiveLock);
1633 }
1634
1635
1636 /* ----------
1637  * toastrel_valueid_exists -
1638  *
1639  *      Test whether a toast value with the given ID exists in the toast relation
1640  * ----------
1641  */
1642 static bool
1643 toastrel_valueid_exists(Relation toastrel, Oid valueid)
1644 {
1645         bool            result = false;
1646         ScanKeyData toastkey;
1647         SysScanDesc toastscan;
1648         int                     num_indexes;
1649         int                     validIndex;
1650         Relation   *toastidxs;
1651
1652         /* Fetch a valid index relation */
1653         validIndex = toast_open_indexes(toastrel,
1654                                                                         RowExclusiveLock,
1655                                                                         &toastidxs,
1656                                                                         &num_indexes);
1657
1658         /*
1659          * Setup a scan key to find chunks with matching va_valueid
1660          */
1661         ScanKeyInit(&toastkey,
1662                                 (AttrNumber) 1,
1663                                 BTEqualStrategyNumber, F_OIDEQ,
1664                                 ObjectIdGetDatum(valueid));
1665
1666         /*
1667          * Is there any such chunk?
1668          */
1669         toastscan = systable_beginscan(toastrel,
1670                                                                    RelationGetRelid(toastidxs[validIndex]),
1671                                                                    true, SnapshotToast, 1, &toastkey);
1672
1673         if (systable_getnext(toastscan) != NULL)
1674                 result = true;
1675
1676         systable_endscan(toastscan);
1677
1678         /* Clean up */
1679         toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
1680
1681         return result;
1682 }
1683
1684 /* ----------
1685  * toastid_valueid_exists -
1686  *
1687  *      As above, but work from toast rel's OID not an open relation
1688  * ----------
1689  */
1690 static bool
1691 toastid_valueid_exists(Oid toastrelid, Oid valueid)
1692 {
1693         bool            result;
1694         Relation        toastrel;
1695
1696         toastrel = heap_open(toastrelid, AccessShareLock);
1697
1698         result = toastrel_valueid_exists(toastrel, valueid);
1699
1700         heap_close(toastrel, AccessShareLock);
1701
1702         return result;
1703 }
1704
1705
1706 /* ----------
1707  * toast_fetch_datum -
1708  *
1709  *      Reconstruct an in memory Datum from the chunks saved
1710  *      in the toast relation
1711  * ----------
1712  */
1713 static struct varlena *
1714 toast_fetch_datum(struct varlena * attr)
1715 {
1716         Relation        toastrel;
1717         Relation   *toastidxs;
1718         ScanKeyData toastkey;
1719         SysScanDesc toastscan;
1720         HeapTuple       ttup;
1721         TupleDesc       toasttupDesc;
1722         struct varlena *result;
1723         struct varatt_external toast_pointer;
1724         int32           ressize;
1725         int32           residx,
1726                                 nextidx;
1727         int32           numchunks;
1728         Pointer         chunk;
1729         bool            isnull;
1730         char       *chunkdata;
1731         int32           chunksize;
1732         int                     num_indexes;
1733         int                     validIndex;
1734
1735         if (VARATT_IS_EXTERNAL_INDIRECT(attr))
1736                 elog(ERROR, "shouldn't be called for indirect tuples");
1737
1738         /* Must copy to access aligned fields */
1739         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1740
1741         ressize = toast_pointer.va_extsize;
1742         numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1743
1744         result = (struct varlena *) palloc(ressize + VARHDRSZ);
1745
1746         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1747                 SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
1748         else
1749                 SET_VARSIZE(result, ressize + VARHDRSZ);
1750
1751         /*
1752          * Open the toast relation and its indexes
1753          */
1754         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1755         toasttupDesc = toastrel->rd_att;
1756
1757         /* Look for the valid index of the toast relation */
1758         validIndex = toast_open_indexes(toastrel,
1759                                                                         AccessShareLock,
1760                                                                         &toastidxs,
1761                                                                         &num_indexes);
1762
1763         /*
1764          * Setup a scan key to fetch from the index by va_valueid
1765          */
1766         ScanKeyInit(&toastkey,
1767                                 (AttrNumber) 1,
1768                                 BTEqualStrategyNumber, F_OIDEQ,
1769                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1770
1771         /*
1772          * Read the chunks by index
1773          *
1774          * Note that because the index is actually on (valueid, chunkidx) we will
1775          * see the chunks in chunkidx order, even though we didn't explicitly ask
1776          * for it.
1777          */
1778         nextidx = 0;
1779
1780         toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
1781                                                                                    SnapshotToast, 1, &toastkey);
1782         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
1783         {
1784                 /*
1785                  * Have a chunk, extract the sequence number and the data
1786                  */
1787                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
1788                 Assert(!isnull);
1789                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
1790                 Assert(!isnull);
1791                 if (!VARATT_IS_EXTENDED(chunk))
1792                 {
1793                         chunksize = VARSIZE(chunk) - VARHDRSZ;
1794                         chunkdata = VARDATA(chunk);
1795                 }
1796                 else if (VARATT_IS_SHORT(chunk))
1797                 {
1798                         /* could happen due to heap_form_tuple doing its thing */
1799                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
1800                         chunkdata = VARDATA_SHORT(chunk);
1801                 }
1802                 else
1803                 {
1804                         /* should never happen */
1805                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
1806                                  toast_pointer.va_valueid,
1807                                  RelationGetRelationName(toastrel));
1808                         chunksize = 0;          /* keep compiler quiet */
1809                         chunkdata = NULL;
1810                 }
1811
1812                 /*
1813                  * Some checks on the data we've found
1814                  */
1815                 if (residx != nextidx)
1816                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
1817                                  residx, nextidx,
1818                                  toast_pointer.va_valueid,
1819                                  RelationGetRelationName(toastrel));
1820                 if (residx < numchunks - 1)
1821                 {
1822                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
1823                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
1824                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
1825                                          residx, numchunks,
1826                                          toast_pointer.va_valueid,
1827                                          RelationGetRelationName(toastrel));
1828                 }
1829                 else if (residx == numchunks - 1)
1830                 {
1831                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
1832                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
1833                                          chunksize,
1834                                          (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
1835                                          residx,
1836                                          toast_pointer.va_valueid,
1837                                          RelationGetRelationName(toastrel));
1838                 }
1839                 else
1840                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
1841                                  residx,
1842                                  0, numchunks - 1,
1843                                  toast_pointer.va_valueid,
1844                                  RelationGetRelationName(toastrel));
1845
1846                 /*
1847                  * Copy the data into proper place in our result
1848                  */
1849                 memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
1850                            chunkdata,
1851                            chunksize);
1852
1853                 nextidx++;
1854         }
1855
1856         /*
1857          * Final checks that we successfully fetched the datum
1858          */
1859         if (nextidx != numchunks)
1860                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
1861                          nextidx,
1862                          toast_pointer.va_valueid,
1863                          RelationGetRelationName(toastrel));
1864
1865         /*
1866          * End scan and close relations
1867          */
1868         systable_endscan_ordered(toastscan);
1869         toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
1870         heap_close(toastrel, AccessShareLock);
1871
1872         return result;
1873 }
1874
1875 /* ----------
1876  * toast_fetch_datum_slice -
1877  *
1878  *      Reconstruct a segment of a Datum from the chunks saved
1879  *      in the toast relation
1880  * ----------
1881  */
1882 static struct varlena *
1883 toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
1884 {
1885         Relation        toastrel;
1886         Relation   *toastidxs;
1887         ScanKeyData toastkey[3];
1888         int                     nscankeys;
1889         SysScanDesc toastscan;
1890         HeapTuple       ttup;
1891         TupleDesc       toasttupDesc;
1892         struct varlena *result;
1893         struct varatt_external toast_pointer;
1894         int32           attrsize;
1895         int32           residx;
1896         int32           nextidx;
1897         int                     numchunks;
1898         int                     startchunk;
1899         int                     endchunk;
1900         int32           startoffset;
1901         int32           endoffset;
1902         int                     totalchunks;
1903         Pointer         chunk;
1904         bool            isnull;
1905         char       *chunkdata;
1906         int32           chunksize;
1907         int32           chcpystrt;
1908         int32           chcpyend;
1909         int                     num_indexes;
1910         int                     validIndex;
1911
1912         Assert(VARATT_IS_EXTERNAL_ONDISK(attr));
1913
1914         /* Must copy to access aligned fields */
1915         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
1916
1917         /*
1918          * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
1919          * we can't return a compressed datum which is meaningful to toast later
1920          */
1921         Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
1922
1923         attrsize = toast_pointer.va_extsize;
1924         totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
1925
1926         if (sliceoffset >= attrsize)
1927         {
1928                 sliceoffset = 0;
1929                 length = 0;
1930         }
1931
1932         if (((sliceoffset + length) > attrsize) || length < 0)
1933                 length = attrsize - sliceoffset;
1934
1935         result = (struct varlena *) palloc(length + VARHDRSZ);
1936
1937         if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
1938                 SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
1939         else
1940                 SET_VARSIZE(result, length + VARHDRSZ);
1941
1942         if (length == 0)
1943                 return result;                  /* Can save a lot of work at this point! */
1944
1945         startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
1946         endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
1947         numchunks = (endchunk - startchunk) + 1;
1948
1949         startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
1950         endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
1951
1952         /*
1953          * Open the toast relation and its indexes
1954          */
1955         toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
1956         toasttupDesc = toastrel->rd_att;
1957
1958         /* Look for the valid index of toast relation */
1959         validIndex = toast_open_indexes(toastrel,
1960                                                                         AccessShareLock,
1961                                                                         &toastidxs,
1962                                                                         &num_indexes);
1963
1964         /*
1965          * Setup a scan key to fetch from the index. This is either two keys or
1966          * three depending on the number of chunks.
1967          */
1968         ScanKeyInit(&toastkey[0],
1969                                 (AttrNumber) 1,
1970                                 BTEqualStrategyNumber, F_OIDEQ,
1971                                 ObjectIdGetDatum(toast_pointer.va_valueid));
1972
1973         /*
1974          * Use equality condition for one chunk, a range condition otherwise:
1975          */
1976         if (numchunks == 1)
1977         {
1978                 ScanKeyInit(&toastkey[1],
1979                                         (AttrNumber) 2,
1980                                         BTEqualStrategyNumber, F_INT4EQ,
1981                                         Int32GetDatum(startchunk));
1982                 nscankeys = 2;
1983         }
1984         else
1985         {
1986                 ScanKeyInit(&toastkey[1],
1987                                         (AttrNumber) 2,
1988                                         BTGreaterEqualStrategyNumber, F_INT4GE,
1989                                         Int32GetDatum(startchunk));
1990                 ScanKeyInit(&toastkey[2],
1991                                         (AttrNumber) 2,
1992                                         BTLessEqualStrategyNumber, F_INT4LE,
1993                                         Int32GetDatum(endchunk));
1994                 nscankeys = 3;
1995         }
1996
1997         /*
1998          * Read the chunks by index
1999          *
2000          * The index is on (valueid, chunkidx) so they will come in order
2001          */
2002         nextidx = startchunk;
2003         toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
2004                                                                                  SnapshotToast, nscankeys, toastkey);
2005         while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
2006         {
2007                 /*
2008                  * Have a chunk, extract the sequence number and the data
2009                  */
2010                 residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
2011                 Assert(!isnull);
2012                 chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
2013                 Assert(!isnull);
2014                 if (!VARATT_IS_EXTENDED(chunk))
2015                 {
2016                         chunksize = VARSIZE(chunk) - VARHDRSZ;
2017                         chunkdata = VARDATA(chunk);
2018                 }
2019                 else if (VARATT_IS_SHORT(chunk))
2020                 {
2021                         /* could happen due to heap_form_tuple doing its thing */
2022                         chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2023                         chunkdata = VARDATA_SHORT(chunk);
2024                 }
2025                 else
2026                 {
2027                         /* should never happen */
2028                         elog(ERROR, "found toasted toast chunk for toast value %u in %s",
2029                                  toast_pointer.va_valueid,
2030                                  RelationGetRelationName(toastrel));
2031                         chunksize = 0;          /* keep compiler quiet */
2032                         chunkdata = NULL;
2033                 }
2034
2035                 /*
2036                  * Some checks on the data we've found
2037                  */
2038                 if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
2039                         elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
2040                                  residx, nextidx,
2041                                  toast_pointer.va_valueid,
2042                                  RelationGetRelationName(toastrel));
2043                 if (residx < totalchunks - 1)
2044                 {
2045                         if (chunksize != TOAST_MAX_CHUNK_SIZE)
2046                                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
2047                                          chunksize, (int) TOAST_MAX_CHUNK_SIZE,
2048                                          residx, totalchunks,
2049                                          toast_pointer.va_valueid,
2050                                          RelationGetRelationName(toastrel));
2051                 }
2052                 else if (residx == totalchunks - 1)
2053                 {
2054                         if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
2055                                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
2056                                          chunksize,
2057                                          (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
2058                                          residx,
2059                                          toast_pointer.va_valueid,
2060                                          RelationGetRelationName(toastrel));
2061                 }
2062                 else
2063                         elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
2064                                  residx,
2065                                  0, totalchunks - 1,
2066                                  toast_pointer.va_valueid,
2067                                  RelationGetRelationName(toastrel));
2068
2069                 /*
2070                  * Copy the data into proper place in our result
2071                  */
2072                 chcpystrt = 0;
2073                 chcpyend = chunksize - 1;
2074                 if (residx == startchunk)
2075                         chcpystrt = startoffset;
2076                 if (residx == endchunk)
2077                         chcpyend = endoffset;
2078
2079                 memcpy(VARDATA(result) +
2080                            (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
2081                            chunkdata + chcpystrt,
2082                            (chcpyend - chcpystrt) + 1);
2083
2084                 nextidx++;
2085         }
2086
2087         /*
2088          * Final checks that we successfully fetched the datum
2089          */
2090         if (nextidx != (endchunk + 1))
2091                 elog(ERROR, "missing chunk number %d for toast value %u in %s",
2092                          nextidx,
2093                          toast_pointer.va_valueid,
2094                          RelationGetRelationName(toastrel));
2095
2096         /*
2097          * End scan and close relations
2098          */
2099         systable_endscan_ordered(toastscan);
2100         toast_close_indexes(toastidxs, num_indexes, AccessShareLock);
2101         heap_close(toastrel, AccessShareLock);
2102
2103         return result;
2104 }
2105
2106 /* ----------
2107  * toast_open_indexes
2108  *
2109  *      Get an array of the indexes associated to the given toast relation
2110  *      and return as well the position of the valid index used by the toast
2111  *      relation in this array. It is the responsibility of the caller of this
2112  *      function to close the indexes as well as free them.
2113  */
2114 static int
2115 toast_open_indexes(Relation toastrel,
2116                                    LOCKMODE lock,
2117                                    Relation **toastidxs,
2118                                    int *num_indexes)
2119 {
2120         int                     i = 0;
2121         int                     res = 0;
2122         bool            found = false;
2123         List       *indexlist;
2124         ListCell   *lc;
2125
2126         /* Get index list of the toast relation */
2127         indexlist = RelationGetIndexList(toastrel);
2128         Assert(indexlist != NIL);
2129
2130         *num_indexes = list_length(indexlist);
2131
2132         /* Open all the index relations */
2133         *toastidxs = (Relation *) palloc(*num_indexes * sizeof(Relation));
2134         foreach(lc, indexlist)
2135                 (*toastidxs)[i++] = index_open(lfirst_oid(lc), lock);
2136
2137         /* Fetch the first valid index in list */
2138         for (i = 0; i < *num_indexes; i++)
2139         {
2140                 Relation        toastidx = (*toastidxs)[i];
2141
2142                 if (toastidx->rd_index->indisvalid)
2143                 {
2144                         res = i;
2145                         found = true;
2146                         break;
2147                 }
2148         }
2149
2150         /*
2151          * Free index list, not necessary anymore as relations are opened and a
2152          * valid index has been found.
2153          */
2154         list_free(indexlist);
2155
2156         /*
2157          * The toast relation should have one valid index, so something is going
2158          * wrong if there is nothing.
2159          */
2160         if (!found)
2161                 elog(ERROR, "no valid index found for toast relation with Oid %u",
2162                          RelationGetRelid(toastrel));
2163
2164         return res;
2165 }
2166
2167 /* ----------
2168  * toast_close_indexes
2169  *
2170  *      Close an array of indexes for a toast relation and free it. This should
2171  *      be called for a set of indexes opened previously with toast_open_indexes.
2172  */
2173 static void
2174 toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock)
2175 {
2176         int                     i;
2177
2178         /* Close relations and clean up things */
2179         for (i = 0; i < num_indexes; i++)
2180                 index_close(toastidxs[i], lock);
2181         pfree(toastidxs);
2182 }