]> granicus.if.org Git - postgresql/commitdiff
Detoast plpgsql variables if they might live across a transaction boundary.
authorTom Lane <tgl@sss.pgh.pa.us>
Wed, 16 May 2018 18:56:52 +0000 (14:56 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Wed, 16 May 2018 18:56:52 +0000 (14:56 -0400)
Up to now, it's been safe for plpgsql to store TOAST pointers in its
variables because the ActiveSnapshot for whatever query called the plpgsql
function will surely protect such TOAST values from being vacuumed away,
even if the owning table rows are committed dead.  With the introduction of
procedures, that assumption is no longer good in "non atomic" executions
of plpgsql code.  We adopt the slightly brute-force solution of detoasting
all TOAST pointers at the time they are stored into variables, if we're in
a non-atomic context, just in case the owning row goes away.

Some care is needed to avoid long-term memory leaks, since plpgsql tends
to run with CurrentMemoryContext pointing to its call-lifespan context,
but we shouldn't assume that no memory is leaked by heap_tuple_fetch_attr.
In plpgsql proper, we can do the detoasting work in the "eval_mcontext".

Most of the code thrashing here is due to the need to add this capability
to expandedrecord.c as well as plpgsql proper.  In expandedrecord.c,
we can't assume that the caller's context is short-lived, so make use of
the short-term sub-context that was already invented for checking domain
constraints.  In view of this repurposing, it seems good to rename that
variable and associated code from "domain_check_cxt" to "short_term_cxt".

Peter Eisentraut and Tom Lane

Discussion: https://postgr.es/m/5AC06865.9050005@anastigmatix.net

src/backend/utils/adt/expandedrecord.c
src/include/postgres.h
src/include/utils/expandedrecord.h
src/pl/plpgsql/src/pl_exec.c
src/test/isolation/expected/plpgsql-toast.out [new file with mode: 0644]
src/test/isolation/isolation_schedule
src/test/isolation/specs/plpgsql-toast.spec [new file with mode: 0644]

index 0bf5fe8cc7ab519ec1dc1986ec5933a91361b391..b1b6883c19f3fb72e3f54a926507ccf76062bff0 100644 (file)
@@ -19,6 +19,7 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/tuptoaster.h"
 #include "catalog/heap.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
@@ -41,7 +42,7 @@ static const ExpandedObjectMethods ER_methods =
 
 /* Other local functions */
 static void ER_mc_callback(void *arg);
-static MemoryContext get_domain_check_cxt(ExpandedRecordHeader *erh);
+static MemoryContext get_short_term_cxt(ExpandedRecordHeader *erh);
 static void build_dummy_expanded_header(ExpandedRecordHeader *main_erh);
 static pg_noinline void check_domain_for_new_field(ExpandedRecordHeader *erh,
                                                   int fnumber,
@@ -57,8 +58,9 @@ static pg_noinline void check_domain_for_new_tuple(ExpandedRecordHeader *erh,
  *
  * The expanded record is initially "empty", having a state logically
  * equivalent to a NULL composite value (not ROW(NULL, NULL, ...)).
- * Note that this might not be a valid state for a domain type; if the
- * caller needs to check that, call expanded_record_set_tuple(erh, NULL).
+ * Note that this might not be a valid state for a domain type;
+ * if the caller needs to check that, call
+ * expanded_record_set_tuple(erh, NULL, false, false).
  *
  * The expanded object will be a child of parentcontext.
  */
@@ -424,8 +426,11 @@ make_expanded_record_from_exprecord(ExpandedRecordHeader *olderh,
  *
  * The tuple is physically copied into the expanded record's local storage
  * if "copy" is true, otherwise it's caller's responsibility that the tuple
- * will live as long as the expanded record does.  In any case, out-of-line
- * fields in the tuple are not automatically inlined.
+ * will live as long as the expanded record does.
+ *
+ * Out-of-line field values in the tuple are automatically inlined if
+ * "expand_external" is true, otherwise not.  (The combination copy = false,
+ * expand_external = true is not sensible and not supported.)
  *
  * Alternatively, tuple can be NULL, in which case we just set the expanded
  * record to be empty.
@@ -433,7 +438,8 @@ make_expanded_record_from_exprecord(ExpandedRecordHeader *olderh,
 void
 expanded_record_set_tuple(ExpandedRecordHeader *erh,
                                                  HeapTuple tuple,
-                                                 bool copy)
+                                                 bool copy,
+                                                 bool expand_external)
 {
        int                     oldflags;
        HeapTuple       oldtuple;
@@ -452,6 +458,25 @@ expanded_record_set_tuple(ExpandedRecordHeader *erh,
        if (erh->flags & ER_FLAG_IS_DOMAIN)
                check_domain_for_new_tuple(erh, tuple);
 
+       /*
+        * If we need to get rid of out-of-line field values, do so, using the
+        * short-term context to avoid leaking whatever cruft the toast fetch
+        * might generate.
+        */
+       if (expand_external && tuple)
+       {
+               /* Assert caller didn't ask for unsupported case */
+               Assert(copy);
+               if (HeapTupleHasExternal(tuple))
+               {
+                       oldcxt = MemoryContextSwitchTo(get_short_term_cxt(erh));
+                       tuple = toast_flatten_tuple(tuple, erh->er_tupdesc);
+                       MemoryContextSwitchTo(oldcxt);
+               }
+               else
+                       expand_external = false;        /* need not clean up below */
+       }
+
        /*
         * Initialize new flags, keeping only non-data status bits.
         */
@@ -468,6 +493,10 @@ expanded_record_set_tuple(ExpandedRecordHeader *erh,
                newtuple = heap_copytuple(tuple);
                newflags |= ER_FLAG_FVALUE_ALLOCED;
                MemoryContextSwitchTo(oldcxt);
+
+               /* We can now flush anything that detoasting might have leaked. */
+               if (expand_external)
+                       MemoryContextReset(erh->er_short_term_cxt);
        }
        else
                newtuple = tuple;
@@ -676,23 +705,13 @@ ER_get_flat_size(ExpandedObjectHeader *eohptr)
                                VARATT_IS_EXTERNAL(DatumGetPointer(erh->dvalues[i])))
                        {
                                /*
-                                * It's an external toasted value, so we need to dereference
-                                * it so that the flat representation will be self-contained.
-                                * Do this step in the caller's context because the TOAST
-                                * fetch might leak memory.  That means making an extra copy,
-                                * which is a tad annoying, but repetitive leaks in the
-                                * record's context would be worse.
+                                * expanded_record_set_field_internal can do the actual work
+                                * of detoasting.  It needn't recheck domain constraints.
                                 */
-                               Datum           newValue;
-
-                               newValue = PointerGetDatum(PG_DETOAST_DATUM(erh->dvalues[i]));
-                               /* expanded_record_set_field can do the rest */
-                               /* ... and we don't need it to recheck domain constraints */
                                expanded_record_set_field_internal(erh, i + 1,
-                                                                                                  newValue, false,
+                                                                                                  erh->dvalues[i], false,
+                                                                                                  true,
                                                                                                   false);
-                               /* Might as well free the detoasted value */
-                               pfree(DatumGetPointer(newValue));
                        }
                }
 
@@ -1087,12 +1106,16 @@ expanded_record_fetch_field(ExpandedRecordHeader *erh, int fnumber,
  * (without changing the record's state) if the domain's constraints would
  * be violated.
  *
+ * If expand_external is true and newValue is an out-of-line value, we'll
+ * forcibly detoast it so that the record does not depend on external storage.
+ *
  * Internal callers can pass check_constraints = false to skip application
  * of domain constraints.  External callers should never do that.
  */
 void
 expanded_record_set_field_internal(ExpandedRecordHeader *erh, int fnumber,
                                                                   Datum newValue, bool isnull,
+                                                                  bool expand_external,
                                                                   bool check_constraints)
 {
        TupleDesc       tupdesc;
@@ -1124,23 +1147,46 @@ expanded_record_set_field_internal(ExpandedRecordHeader *erh, int fnumber,
                elog(ERROR, "cannot assign to field %d of expanded record", fnumber);
 
        /*
-        * Copy new field value into record's context, if needed.
+        * Copy new field value into record's context, and deal with detoasting,
+        * if needed.
         */
        attr = TupleDescAttr(tupdesc, fnumber - 1);
        if (!isnull && !attr->attbyval)
        {
                MemoryContext oldcxt;
 
+               /* If requested, detoast any external value */
+               if (expand_external)
+               {
+                       if (attr->attlen == -1 &&
+                               VARATT_IS_EXTERNAL(DatumGetPointer(newValue)))
+                       {
+                               /* Detoasting should be done in short-lived context. */
+                               oldcxt = MemoryContextSwitchTo(get_short_term_cxt(erh));
+                               newValue = PointerGetDatum(heap_tuple_fetch_attr((struct varlena *) DatumGetPointer(newValue)));
+                               MemoryContextSwitchTo(oldcxt);
+                       }
+                       else
+                               expand_external = false;        /* need not clean up below */
+               }
+
+               /* Copy value into record's context */
                oldcxt = MemoryContextSwitchTo(erh->hdr.eoh_context);
                newValue = datumCopy(newValue, false, attr->attlen);
                MemoryContextSwitchTo(oldcxt);
 
+               /* We can now flush anything that detoasting might have leaked */
+               if (expand_external)
+                       MemoryContextReset(erh->er_short_term_cxt);
+
                /* Remember that we have field(s) that may need to be pfree'd */
                erh->flags |= ER_FLAG_DVALUES_ALLOCED;
 
                /*
                 * While we're here, note whether it's an external toasted value,
-                * because that could mean we need to inline it later.
+                * because that could mean we need to inline it later.  (Think not to
+                * merge this into the previous expand_external logic: datumCopy could
+                * by itself have made the value non-external.)
                 */
                if (attr->attlen == -1 &&
                        VARATT_IS_EXTERNAL(DatumGetPointer(newValue)))
@@ -1193,14 +1239,20 @@ expanded_record_set_field_internal(ExpandedRecordHeader *erh, int fnumber,
  * Caller must ensure that the provided datums are of the right types
  * to match the record's previously assigned rowtype.
  *
+ * If expand_external is true, we'll forcibly detoast out-of-line field values
+ * so that the record does not depend on external storage.
+ *
  * Unlike repeated application of expanded_record_set_field(), this does not
  * guarantee to leave the expanded record in a non-corrupt state in event
  * of an error.  Typically it would only be used for initializing a new
- * expanded record.
+ * expanded record.  Also, because we expect this to be applied at most once
+ * in the lifespan of an expanded record, we do not worry about any cruft
+ * that detoasting might leak.
  */
 void
 expanded_record_set_fields(ExpandedRecordHeader *erh,
-                                                  const Datum *newValues, const bool *isnulls)
+                                                  const Datum *newValues, const bool *isnulls,
+                                                  bool expand_external)
 {
        TupleDesc       tupdesc;
        Datum      *dvalues;
@@ -1245,22 +1297,37 @@ expanded_record_set_fields(ExpandedRecordHeader *erh,
                if (!attr->attbyval)
                {
                        /*
-                        * Copy new field value into record's context, if needed.
+                        * Copy new field value into record's context, and deal with
+                        * detoasting, if needed.
                         */
                        if (!isnull)
                        {
-                               newValue = datumCopy(newValue, false, attr->attlen);
+                               /* Is it an external toasted value? */
+                               if (attr->attlen == -1 &&
+                                       VARATT_IS_EXTERNAL(DatumGetPointer(newValue)))
+                               {
+                                       if (expand_external)
+                                       {
+                                               /* Detoast as requested while copying the value */
+                                               newValue = PointerGetDatum(heap_tuple_fetch_attr((struct varlena *) DatumGetPointer(newValue)));
+                                       }
+                                       else
+                                       {
+                                               /* Just copy the value */
+                                               newValue = datumCopy(newValue, false, -1);
+                                               /* If it's still external, remember that */
+                                               if (VARATT_IS_EXTERNAL(DatumGetPointer(newValue)))
+                                                       erh->flags |= ER_FLAG_HAVE_EXTERNAL;
+                                       }
+                               }
+                               else
+                               {
+                                       /* Not an external value, just copy it */
+                                       newValue = datumCopy(newValue, false, attr->attlen);
+                               }
 
                                /* Remember that we have field(s) that need to be pfree'd */
                                erh->flags |= ER_FLAG_DVALUES_ALLOCED;
-
-                               /*
-                                * While we're here, note whether it's an external toasted
-                                * value, because that could mean we need to inline it later.
-                                */
-                               if (attr->attlen == -1 &&
-                                       VARATT_IS_EXTERNAL(DatumGetPointer(newValue)))
-                                       erh->flags |= ER_FLAG_HAVE_EXTERNAL;
                        }
 
                        /*
@@ -1291,7 +1358,7 @@ expanded_record_set_fields(ExpandedRecordHeader *erh,
        if (erh->flags & ER_FLAG_IS_DOMAIN)
        {
                /* We run domain_check in a short-lived context to limit cruft */
-               MemoryContextSwitchTo(get_domain_check_cxt(erh));
+               MemoryContextSwitchTo(get_short_term_cxt(erh));
 
                domain_check(ExpandedRecordGetRODatum(erh), false,
                                         erh->er_decltypeid,
@@ -1303,25 +1370,26 @@ expanded_record_set_fields(ExpandedRecordHeader *erh,
 }
 
 /*
- * Construct (or reset) working memory context for domain checks.
+ * Construct (or reset) working memory context for short-term operations.
+ *
+ * This context is used for domain check evaluation and for detoasting.
  *
- * If we don't have a working memory context for domain checking, make one;
- * if we have one, reset it to get rid of any leftover cruft.  (It is a tad
- * annoying to need a whole context for this, since it will often go unused
- * --- but it's hard to avoid memory leaks otherwise.  We can make the
- * context small, at least.)
+ * If we don't have a short-lived memory context, make one; if we have one,
+ * reset it to get rid of any leftover cruft.  (It is a tad annoying to need a
+ * whole context for this, since it will often go unused --- but it's hard to
+ * avoid memory leaks otherwise.  We can make the context small, at least.)
  */
 static MemoryContext
-get_domain_check_cxt(ExpandedRecordHeader *erh)
+get_short_term_cxt(ExpandedRecordHeader *erh)
 {
-       if (erh->er_domain_check_cxt == NULL)
-               erh->er_domain_check_cxt =
+       if (erh->er_short_term_cxt == NULL)
+               erh->er_short_term_cxt =
                        AllocSetContextCreate(erh->hdr.eoh_context,
-                                                                 "expanded record domain checks",
+                                                                 "expanded record short-term context",
                                                                  ALLOCSET_SMALL_SIZES);
        else
-               MemoryContextReset(erh->er_domain_check_cxt);
-       return erh->er_domain_check_cxt;
+               MemoryContextReset(erh->er_short_term_cxt);
+       return erh->er_short_term_cxt;
 }
 
 /*
@@ -1340,8 +1408,8 @@ build_dummy_expanded_header(ExpandedRecordHeader *main_erh)
        ExpandedRecordHeader *erh;
        TupleDesc       tupdesc = expanded_record_get_tupdesc(main_erh);
 
-       /* Ensure we have a domain_check_cxt */
-       (void) get_domain_check_cxt(main_erh);
+       /* Ensure we have a short-lived context */
+       (void) get_short_term_cxt(main_erh);
 
        /*
         * Allocate dummy header on first time through, or in the unlikely event
@@ -1372,7 +1440,7 @@ build_dummy_expanded_header(ExpandedRecordHeader *main_erh)
                 * nothing else is authorized to delete or transfer ownership of the
                 * object's context, so it should be safe enough.
                 */
-               EOH_init_header(&erh->hdr, &ER_methods, main_erh->er_domain_check_cxt);
+               EOH_init_header(&erh->hdr, &ER_methods, main_erh->er_short_term_cxt);
                erh->er_magic = ER_MAGIC;
 
                /* Set up dvalues/dnulls, with no valid contents as yet */
@@ -1488,7 +1556,7 @@ check_domain_for_new_field(ExpandedRecordHeader *erh, int fnumber,
         * We call domain_check in the short-lived context, so that any cruft
         * leaked by expression evaluation can be reclaimed.
         */
-       oldcxt = MemoryContextSwitchTo(erh->er_domain_check_cxt);
+       oldcxt = MemoryContextSwitchTo(erh->er_short_term_cxt);
 
        /*
         * And now we can apply the check.  Note we use main header's domain cache
@@ -1502,7 +1570,7 @@ check_domain_for_new_field(ExpandedRecordHeader *erh, int fnumber,
        MemoryContextSwitchTo(oldcxt);
 
        /* We might as well clean up cruft immediately. */
-       MemoryContextReset(erh->er_domain_check_cxt);
+       MemoryContextReset(erh->er_short_term_cxt);
 }
 
 /*
@@ -1518,7 +1586,7 @@ check_domain_for_new_tuple(ExpandedRecordHeader *erh, HeapTuple tuple)
        if (tuple == NULL)
        {
                /* We run domain_check in a short-lived context to limit cruft */
-               oldcxt = MemoryContextSwitchTo(get_domain_check_cxt(erh));
+               oldcxt = MemoryContextSwitchTo(get_short_term_cxt(erh));
 
                domain_check((Datum) 0, true,
                                         erh->er_decltypeid,
@@ -1528,7 +1596,7 @@ check_domain_for_new_tuple(ExpandedRecordHeader *erh, HeapTuple tuple)
                MemoryContextSwitchTo(oldcxt);
 
                /* We might as well clean up cruft immediately. */
-               MemoryContextReset(erh->er_domain_check_cxt);
+               MemoryContextReset(erh->er_short_term_cxt);
 
                return;
        }
@@ -1551,7 +1619,7 @@ check_domain_for_new_tuple(ExpandedRecordHeader *erh, HeapTuple tuple)
         * We call domain_check in the short-lived context, so that any cruft
         * leaked by expression evaluation can be reclaimed.
         */
-       oldcxt = MemoryContextSwitchTo(erh->er_domain_check_cxt);
+       oldcxt = MemoryContextSwitchTo(erh->er_short_term_cxt);
 
        /*
         * And now we can apply the check.  Note we use main header's domain cache
@@ -1565,5 +1633,5 @@ check_domain_for_new_tuple(ExpandedRecordHeader *erh, HeapTuple tuple)
        MemoryContextSwitchTo(oldcxt);
 
        /* We might as well clean up cruft immediately. */
-       MemoryContextReset(erh->er_domain_check_cxt);
+       MemoryContextReset(erh->er_short_term_cxt);
 }
index bbcb50e41fc0edf54cfec59d9d5394d684e6ebc1..b596fcb513ec3ae1bf429de6830700996e7a6167 100644 (file)
@@ -321,6 +321,8 @@ typedef struct
        (VARATT_IS_EXTERNAL(PTR) && VARTAG_EXTERNAL(PTR) == VARTAG_EXPANDED_RW)
 #define VARATT_IS_EXTERNAL_EXPANDED(PTR) \
        (VARATT_IS_EXTERNAL(PTR) && VARTAG_IS_EXPANDED(VARTAG_EXTERNAL(PTR)))
+#define VARATT_IS_EXTERNAL_NON_EXPANDED(PTR) \
+       (VARATT_IS_EXTERNAL(PTR) && !VARTAG_IS_EXPANDED(VARTAG_EXTERNAL(PTR)))
 #define VARATT_IS_SHORT(PTR)                           VARATT_IS_1B(PTR)
 #define VARATT_IS_EXTENDED(PTR)                                (!VARATT_IS_4B_U(PTR))
 
index a95c9cce22e7f056eace339e5c70327b4a6f7093..c999f44f38f5a1dbc8bb7e1791b46c22e4d709a3 100644 (file)
@@ -127,8 +127,10 @@ typedef struct ExpandedRecordHeader
        char       *fstartptr;          /* start of its data area */
        char       *fendptr;            /* end+1 of its data area */
 
+       /* Some operations on the expanded record need a short-lived context */
+       MemoryContext er_short_term_cxt;        /* short-term memory context */
+
        /* Working state for domain checking, used if ER_FLAG_IS_DOMAIN is set */
-       MemoryContext er_domain_check_cxt;      /* short-term memory context */
        struct ExpandedRecordHeader *er_dummy_header;   /* dummy record header */
        void       *er_domaininfo;      /* cache space for domain_check() */
 
@@ -171,7 +173,7 @@ extern ExpandedRecordHeader *make_expanded_record_from_tupdesc(TupleDesc tupdesc
 extern ExpandedRecordHeader *make_expanded_record_from_exprecord(ExpandedRecordHeader *olderh,
                                                                        MemoryContext parentcontext);
 extern void expanded_record_set_tuple(ExpandedRecordHeader *erh,
-                                                 HeapTuple tuple, bool copy);
+                                                 HeapTuple tuple, bool copy, bool expand_external);
 extern Datum make_expanded_record_from_datum(Datum recorddatum,
                                                                MemoryContext parentcontext);
 extern TupleDesc expanded_record_fetch_tupdesc(ExpandedRecordHeader *erh);
@@ -186,13 +188,15 @@ extern Datum expanded_record_fetch_field(ExpandedRecordHeader *erh, int fnumber,
 extern void expanded_record_set_field_internal(ExpandedRecordHeader *erh,
                                                                   int fnumber,
                                                                   Datum newValue, bool isnull,
+                                                                  bool expand_external,
                                                                   bool check_constraints);
 extern void expanded_record_set_fields(ExpandedRecordHeader *erh,
-                                                  const Datum *newValues, const bool *isnulls);
+                                                  const Datum *newValues, const bool *isnulls,
+                                                  bool expand_external);
 
 /* outside code should never call expanded_record_set_field_internal as such */
-#define expanded_record_set_field(erh, fnumber, newValue, isnull) \
-       expanded_record_set_field_internal(erh, fnumber, newValue, isnull, true)
+#define expanded_record_set_field(erh, fnumber, newValue, isnull, expand_external) \
+       expanded_record_set_field_internal(erh, fnumber, newValue, isnull, expand_external, true)
 
 /*
  * Inline-able fast cases.  The expanded_record_fetch_xxx functions above
index 228d1c0d00cada49f0aedfc63822546d72878b8c..1eb421bcc7066e393fc2c9b0c6ef3ddd6ddad174 100644 (file)
@@ -20,6 +20,7 @@
 #include "access/htup_details.h"
 #include "access/transam.h"
 #include "access/tupconvert.h"
+#include "access/tuptoaster.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
 #include "commands/defrem.h"
@@ -912,16 +913,20 @@ plpgsql_exec_trigger(PLpgSQL_function *func,
        }
        else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
        {
-               expanded_record_set_tuple(rec_new->erh, trigdata->tg_trigtuple, false);
+               expanded_record_set_tuple(rec_new->erh, trigdata->tg_trigtuple,
+                                                                 false, false);
        }
        else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
        {
-               expanded_record_set_tuple(rec_new->erh, trigdata->tg_newtuple, false);
-               expanded_record_set_tuple(rec_old->erh, trigdata->tg_trigtuple, false);
+               expanded_record_set_tuple(rec_new->erh, trigdata->tg_newtuple,
+                                                                 false, false);
+               expanded_record_set_tuple(rec_old->erh, trigdata->tg_trigtuple,
+                                                                 false, false);
        }
        else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
        {
-               expanded_record_set_tuple(rec_old->erh, trigdata->tg_trigtuple, false);
+               expanded_record_set_tuple(rec_old->erh, trigdata->tg_trigtuple,
+                                                                 false, false);
        }
        else
                elog(ERROR, "unrecognized trigger action: not INSERT, DELETE, or UPDATE");
@@ -5061,7 +5066,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
 
                                /* And assign it. */
                                expanded_record_set_field(erh, recfield->finfo.fnumber,
-                                                                                 value, isNull);
+                                                                                 value, isNull, !estate->atomic);
                                break;
                        }
 
@@ -5875,7 +5880,8 @@ exec_for_query(PLpgSQL_execstate *estate, PLpgSQL_stmt_forq *stmt,
                                        tupdescs_match)
                                {
                                        /* Only need to assign a new tuple value */
-                                       expanded_record_set_tuple(rec->erh, tuptab->vals[i], true);
+                                       expanded_record_set_tuple(rec->erh, tuptab->vals[i],
+                                                                                         true, !estate->atomic);
                                }
                                else
                                {
@@ -6647,7 +6653,7 @@ exec_move_row(PLpgSQL_execstate *estate,
                                 */
                                newerh = make_expanded_record_for_rec(estate, rec,
                                                                                                          NULL, rec->erh);
-                               expanded_record_set_tuple(newerh, NULL, false);
+                               expanded_record_set_tuple(newerh, NULL, false, false);
                                assign_record_var(estate, rec, newerh);
                        }
                        else
@@ -6689,7 +6695,7 @@ exec_move_row(PLpgSQL_execstate *estate,
                        else
                        {
                                /* No coercion is needed, so just assign the row value */
-                               expanded_record_set_tuple(newerh, tup, true);
+                               expanded_record_set_tuple(newerh, tup, true, !estate->atomic);
                        }
 
                        /* Complete the assignment */
@@ -6927,7 +6933,7 @@ exec_move_row_from_fields(PLpgSQL_execstate *estate,
                }
 
                /* Insert the coerced field values into the new expanded record */
-               expanded_record_set_fields(newerh, values, nulls);
+               expanded_record_set_fields(newerh, values, nulls, !estate->atomic);
 
                /* Complete the assignment */
                assign_record_var(estate, rec, newerh);
@@ -7194,7 +7200,8 @@ exec_move_row_from_datum(PLpgSQL_execstate *estate,
                                 (erh->er_typmod == rec->erh->er_typmod &&
                                  erh->er_typmod >= 0)))
                        {
-                               expanded_record_set_tuple(rec->erh, erh->fvalue, true);
+                               expanded_record_set_tuple(rec->erh, erh->fvalue,
+                                                                                 true, !estate->atomic);
                                return;
                        }
 
@@ -7216,7 +7223,8 @@ exec_move_row_from_datum(PLpgSQL_execstate *estate,
                                (rec->rectypeid == RECORDOID ||
                                 rec->rectypeid == erh->er_typeid))
                        {
-                               expanded_record_set_tuple(newerh, erh->fvalue, true);
+                               expanded_record_set_tuple(newerh, erh->fvalue,
+                                                                                 true, !estate->atomic);
                                assign_record_var(estate, rec, newerh);
                                return;
                        }
@@ -7306,7 +7314,8 @@ exec_move_row_from_datum(PLpgSQL_execstate *estate,
                                 (tupTypmod == rec->erh->er_typmod &&
                                  tupTypmod >= 0)))
                        {
-                               expanded_record_set_tuple(rec->erh, &tmptup, true);
+                               expanded_record_set_tuple(rec->erh, &tmptup,
+                                                                                 true, !estate->atomic);
                                return;
                        }
 
@@ -7323,7 +7332,8 @@ exec_move_row_from_datum(PLpgSQL_execstate *estate,
 
                                newerh = make_expanded_record_from_typeid(tupType, tupTypmod,
                                                                                                                  mcontext);
-                               expanded_record_set_tuple(newerh, &tmptup, true);
+                               expanded_record_set_tuple(newerh, &tmptup,
+                                                                                 true, !estate->atomic);
                                assign_record_var(estate, rec, newerh);
                                return;
                        }
@@ -8051,7 +8061,8 @@ plpgsql_subxact_cb(SubXactEvent event, SubTransactionId mySubid,
  * assign_simple_var --- assign a new value to any VAR datum.
  *
  * This should be the only mechanism for assignment to simple variables,
- * lest we do the release of the old value incorrectly.
+ * lest we do the release of the old value incorrectly (not to mention
+ * the detoasting business).
  */
 static void
 assign_simple_var(PLpgSQL_execstate *estate, PLpgSQL_var *var,
@@ -8059,6 +8070,41 @@ assign_simple_var(PLpgSQL_execstate *estate, PLpgSQL_var *var,
 {
        Assert(var->dtype == PLPGSQL_DTYPE_VAR ||
                   var->dtype == PLPGSQL_DTYPE_PROMISE);
+
+       /*
+        * In non-atomic contexts, we do not want to store TOAST pointers in
+        * variables, because such pointers might become stale after a commit.
+        * Forcibly detoast in such cases.  We don't want to detoast (flatten)
+        * expanded objects, however; those should be OK across a transaction
+        * boundary since they're just memory-resident objects.  (Elsewhere in
+        * this module, operations on expanded records likewise need to request
+        * detoasting of record fields when !estate->atomic.  Expanded arrays are
+        * not a problem since all array entries are always detoasted.)
+        */
+       if (!estate->atomic && !isnull && var->datatype->typlen == -1 &&
+               VARATT_IS_EXTERNAL_NON_EXPANDED(DatumGetPointer(newvalue)))
+       {
+               MemoryContext oldcxt;
+               Datum           detoasted;
+
+               /*
+                * Do the detoasting in the eval_mcontext to avoid long-term leakage
+                * of whatever memory toast fetching might leak.  Then we have to copy
+                * the detoasted datum to the function's main context, which is a
+                * pain, but there's little choice.
+                */
+               oldcxt = MemoryContextSwitchTo(get_eval_mcontext(estate));
+               detoasted = PointerGetDatum(heap_tuple_fetch_attr((struct varlena *) DatumGetPointer(newvalue)));
+               MemoryContextSwitchTo(oldcxt);
+               /* Now's a good time to not leak the input value if it's freeable */
+               if (freeable)
+                       pfree(DatumGetPointer(newvalue));
+               /* Once we copy the value, it's definitely freeable */
+               newvalue = datumCopy(detoasted, false, -1);
+               freeable = true;
+               /* Can't clean up eval_mcontext here, but it'll happen before long */
+       }
+
        /* Free the old value if needed */
        if (var->freeval)
        {
diff --git a/src/test/isolation/expected/plpgsql-toast.out b/src/test/isolation/expected/plpgsql-toast.out
new file mode 100644 (file)
index 0000000..4341153
--- /dev/null
@@ -0,0 +1,189 @@
+Parsed test spec with 2 sessions
+
+starting permutation: lock assign1 vacuum unlock
+pg_advisory_unlock_all
+
+               
+pg_advisory_unlock_all
+
+               
+step lock: 
+    SELECT pg_advisory_lock(1);
+
+pg_advisory_lock
+
+               
+step assign1: 
+do $$
+  declare
+    x text;
+  begin
+    select test1.b into x from test1;
+    delete from test1;
+    commit;
+    perform pg_advisory_lock(1);
+    raise notice 'x = %', x;
+  end;
+$$;
+ <waiting ...>
+step vacuum: 
+    VACUUM test1;
+
+step unlock: 
+    SELECT pg_advisory_unlock(1);
+
+pg_advisory_unlock
+
+t              
+step assign1: <... completed>
+
+starting permutation: lock assign2 vacuum unlock
+pg_advisory_unlock_all
+
+               
+pg_advisory_unlock_all
+
+               
+step lock: 
+    SELECT pg_advisory_lock(1);
+
+pg_advisory_lock
+
+               
+step assign2: 
+do $$
+  declare
+    x text;
+  begin
+    x := (select test1.b from test1);
+    delete from test1;
+    commit;
+    perform pg_advisory_lock(1);
+    raise notice 'x = %', x;
+  end;
+$$;
+ <waiting ...>
+step vacuum: 
+    VACUUM test1;
+
+step unlock: 
+    SELECT pg_advisory_unlock(1);
+
+pg_advisory_unlock
+
+t              
+step assign2: <... completed>
+
+starting permutation: lock assign3 vacuum unlock
+pg_advisory_unlock_all
+
+               
+pg_advisory_unlock_all
+
+               
+step lock: 
+    SELECT pg_advisory_lock(1);
+
+pg_advisory_lock
+
+               
+step assign3: 
+do $$
+  declare
+    r record;
+  begin
+    select * into r from test1;
+    r.b := (select test1.b from test1);
+    delete from test1;
+    commit;
+    perform pg_advisory_lock(1);
+    raise notice 'r = %', r;
+  end;
+$$;
+ <waiting ...>
+step vacuum: 
+    VACUUM test1;
+
+step unlock: 
+    SELECT pg_advisory_unlock(1);
+
+pg_advisory_unlock
+
+t              
+step assign3: <... completed>
+
+starting permutation: lock assign4 vacuum unlock
+pg_advisory_unlock_all
+
+               
+pg_advisory_unlock_all
+
+               
+step lock: 
+    SELECT pg_advisory_lock(1);
+
+pg_advisory_lock
+
+               
+step assign4: 
+do $$
+  declare
+    r test2;
+  begin
+    select * into r from test1;
+    delete from test1;
+    commit;
+    perform pg_advisory_lock(1);
+    raise notice 'r = %', r;
+  end;
+$$;
+ <waiting ...>
+step vacuum: 
+    VACUUM test1;
+
+step unlock: 
+    SELECT pg_advisory_unlock(1);
+
+pg_advisory_unlock
+
+t              
+step assign4: <... completed>
+
+starting permutation: lock assign5 vacuum unlock
+pg_advisory_unlock_all
+
+               
+pg_advisory_unlock_all
+
+               
+step lock: 
+    SELECT pg_advisory_lock(1);
+
+pg_advisory_lock
+
+               
+step assign5: 
+do $$
+  declare
+    r record;
+  begin
+    for r in select test1.b from test1 loop
+      null;
+    end loop;
+    delete from test1;
+    commit;
+    perform pg_advisory_lock(1);
+    raise notice 'r = %', r;
+  end;
+$$;
+ <waiting ...>
+step vacuum: 
+    VACUUM test1;
+
+step unlock: 
+    SELECT pg_advisory_unlock(1);
+
+pg_advisory_unlock
+
+t              
+step assign5: <... completed>
index b650e467a6372d8341c3d8c5e038bd3aaf03f91e..0e997215a8046ec052d6db688c116f34e5b66e4c 100644 (file)
@@ -74,3 +74,4 @@ test: predicate-gin-nomatch
 test: partition-key-update-1
 test: partition-key-update-2
 test: partition-key-update-3
+test: plpgsql-toast
diff --git a/src/test/isolation/specs/plpgsql-toast.spec b/src/test/isolation/specs/plpgsql-toast.spec
new file mode 100644 (file)
index 0000000..e6228c9
--- /dev/null
@@ -0,0 +1,137 @@
+# Test TOAST behavior in PL/pgSQL procedures with transaction control.
+#
+# We need to ensure that values stored in PL/pgSQL variables are free
+# of external TOAST references, because those could disappear after a
+# transaction is committed (leading to errors "missing chunk number
+# ... for toast value ...").  The tests here do this by running VACUUM
+# in a second session.  Advisory locks are used to have the VACUUM
+# kick in at the right time.  The different "assign" steps test
+# different code paths for variable assignments in PL/pgSQL.
+
+setup
+{
+    CREATE TABLE test1 (a int, b text);
+    ALTER TABLE test1 ALTER COLUMN b SET STORAGE EXTERNAL;
+    INSERT INTO test1 VALUES (1, repeat('foo', 2000));
+    CREATE TYPE test2 AS (a bigint, b text);
+}
+
+teardown
+{
+    DROP TABLE test1;
+    DROP TYPE test2;
+}
+
+session "s1"
+
+setup
+{
+    SELECT pg_advisory_unlock_all();
+}
+
+# assign_simple_var()
+step "assign1"
+{
+do $$
+  declare
+    x text;
+  begin
+    select test1.b into x from test1;
+    delete from test1;
+    commit;
+    perform pg_advisory_lock(1);
+    raise notice 'x = %', x;
+  end;
+$$;
+}
+
+# assign_simple_var()
+step "assign2"
+{
+do $$
+  declare
+    x text;
+  begin
+    x := (select test1.b from test1);
+    delete from test1;
+    commit;
+    perform pg_advisory_lock(1);
+    raise notice 'x = %', x;
+  end;
+$$;
+}
+
+# expanded_record_set_field()
+step "assign3"
+{
+do $$
+  declare
+    r record;
+  begin
+    select * into r from test1;
+    r.b := (select test1.b from test1);
+    delete from test1;
+    commit;
+    perform pg_advisory_lock(1);
+    raise notice 'r = %', r;
+  end;
+$$;
+}
+
+# expanded_record_set_fields()
+step "assign4"
+{
+do $$
+  declare
+    r test2;
+  begin
+    select * into r from test1;
+    delete from test1;
+    commit;
+    perform pg_advisory_lock(1);
+    raise notice 'r = %', r;
+  end;
+$$;
+}
+
+# expanded_record_set_tuple()
+step "assign5"
+{
+do $$
+  declare
+    r record;
+  begin
+    for r in select test1.b from test1 loop
+      null;
+    end loop;
+    delete from test1;
+    commit;
+    perform pg_advisory_lock(1);
+    raise notice 'r = %', r;
+  end;
+$$;
+}
+
+session "s2"
+setup
+{
+    SELECT pg_advisory_unlock_all();
+}
+step "lock"
+{
+    SELECT pg_advisory_lock(1);
+}
+step "vacuum"
+{
+    VACUUM test1;
+}
+step "unlock"
+{
+    SELECT pg_advisory_unlock(1);
+}
+
+permutation "lock" "assign1" "vacuum" "unlock"
+permutation "lock" "assign2" "vacuum" "unlock"
+permutation "lock" "assign3" "vacuum" "unlock"
+permutation "lock" "assign4" "vacuum" "unlock"
+permutation "lock" "assign5" "vacuum" "unlock"