Allow for pg_upgrade of attributes with missing values
authorAndrew Dunstan <andrew@dunslane.net>
Fri, 22 Jun 2018 12:42:36 +0000 (08:42 -0400)
committerAndrew Dunstan <andrew@dunslane.net>
Fri, 22 Jun 2018 12:42:36 +0000 (08:42 -0400)
Commit 16828d5c02 neglected to do this, so upgraded databases would
silently get null instead of the specified default in rows without the
attribute defined.

A new binary upgrade function is provided to perform this and pg_dump is
adjusted to output a call to the function if required in binary upgrade
mode.

Also included is code to drop missing attribute values for dropped
columns. That way if the type is later dropped the missing value won't
have a dangling reference to the type.

Finally the regression tests are adjusted to ensure that there is a row
with a missing value so that this code is exercised in upgrade testing.

Catalog version unfortunately bumped.

Regression test changes from Tom Lane.
Remainder from me, reviewed by Tom Lane, Andres Freund, Alvaro Herrera

Discussion: https://postgr.es/m/19987.1529420110@sss.pgh.pa.us

src/backend/catalog/heap.c
src/backend/utils/adt/pg_upgrade_support.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/include/catalog/catversion.h
src/include/catalog/heap.h
src/include/catalog/pg_proc.dat
src/test/regress/expected/fast_default.out
src/test/regress/sql/fast_default.sql

index d59bd5bb00f5417473ff8706035c4f663a1ae5f6..d223ba8537b77a3a4a6faea595d2cde30ff79320 100644 (file)
@@ -1613,6 +1613,29 @@ RemoveAttributeById(Oid relid, AttrNumber attnum)
                                 "........pg.dropped.%d........", attnum);
                namestrcpy(&(attStruct->attname), newattname);
 
+               /* clear the missing value if any */
+               if (attStruct->atthasmissing)
+               {
+                       Datum           valuesAtt[Natts_pg_attribute];
+                       bool            nullsAtt[Natts_pg_attribute];
+                       bool            replacesAtt[Natts_pg_attribute];
+
+                       /* update the tuple - set atthasmissing and attmissingval */
+                       MemSet(valuesAtt, 0, sizeof(valuesAtt));
+                       MemSet(nullsAtt, false, sizeof(nullsAtt));
+                       MemSet(replacesAtt, false, sizeof(replacesAtt));
+
+                       valuesAtt[Anum_pg_attribute_atthasmissing - 1] =
+                               BoolGetDatum(false);
+                       replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
+                       valuesAtt[Anum_pg_attribute_attmissingval - 1] = (Datum) 0;
+                       nullsAtt[Anum_pg_attribute_attmissingval - 1] = true;
+                       replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
+
+                       tuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
+                                                                         valuesAtt, nullsAtt, replacesAtt);
+               }
+
                CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
        }
 
@@ -2001,6 +2024,63 @@ RelationClearMissing(Relation rel)
        heap_close(attr_rel, RowExclusiveLock);
 }
 
+/*
+ * SetAttrMissing
+ *
+ * Set the missing value of a single attribute. This should only be used by
+ * binary upgrade. Takes an AccessExclusive lock on the relation owning the
+ * attribute.
+ */
+void
+SetAttrMissing(Oid relid, char *attname, char *value)
+{
+       Datum           valuesAtt[Natts_pg_attribute];
+       bool            nullsAtt[Natts_pg_attribute];
+       bool            replacesAtt[Natts_pg_attribute];
+       Datum           missingval;
+       Form_pg_attribute attStruct;
+       Relation        attrrel,
+                               tablerel;
+       HeapTuple       atttup,
+                               newtup;
+
+       /* lock the table the attribute belongs to */
+       tablerel = heap_open(relid, AccessExclusiveLock);
+
+       /* Lock the attribute row and get the data */
+       attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
+       atttup = SearchSysCacheAttName(relid, attname);
+       if (!HeapTupleIsValid(atttup))
+               elog(ERROR, "cache lookup failed for attribute %s of relation %u",
+                        attname, relid);
+       attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
+
+       /* get an array value from the value string */
+       missingval = OidFunctionCall3(F_ARRAY_IN,
+                                                                 CStringGetDatum(value),
+                                                                 ObjectIdGetDatum(attStruct->atttypid),
+                                                                 Int32GetDatum(attStruct->atttypmod));
+
+       /* update the tuple - set atthasmissing and attmissingval */
+       MemSet(valuesAtt, 0, sizeof(valuesAtt));
+       MemSet(nullsAtt, false, sizeof(nullsAtt));
+       MemSet(replacesAtt, false, sizeof(replacesAtt));
+
+       valuesAtt[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(true);
+       replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
+       valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
+       replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
+
+       newtup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
+                                                          valuesAtt, nullsAtt, replacesAtt);
+       CatalogTupleUpdate(attrrel, &newtup->t_self, newtup);
+
+       /* clean up */
+       ReleaseSysCache(atttup);
+       heap_close(attrrel, RowExclusiveLock);
+       heap_close(tablerel, AccessExclusiveLock);
+}
+
 /*
  * Store a default expression for column attnum of relation rel.
  *
index 0c54b02542f86c4602119cc09380f42060b2ef4e..b8b7777c31ba9f6c831b928270e3c9edcdba54d5 100644 (file)
@@ -12,6 +12,7 @@
 #include "postgres.h"
 
 #include "catalog/binary_upgrade.h"
+#include "catalog/heap.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
@@ -192,3 +193,18 @@ binary_upgrade_set_record_init_privs(PG_FUNCTION_ARGS)
 
        PG_RETURN_VOID();
 }
+
+Datum
+binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
+{
+       Oid                     table_id = PG_GETARG_OID(0);
+       text       *attname = PG_GETARG_TEXT_P(1);
+       text       *value = PG_GETARG_TEXT_P(2);
+       char       *cattname = text_to_cstring(attname);
+       char       *cvalue = text_to_cstring(value);
+
+       CHECK_IS_BINARY_UPGRADE;
+       SetAttrMissing(table_id, cattname, cvalue);
+
+       PG_RETURN_VOID();
+}
index ea2f022eeebdd8a3e19fb9102c4d12ccfd084eed..463639208d6723bbe22cc5f32818bfe119fabe0d 100644 (file)
@@ -8103,6 +8103,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
        int                     i_attoptions;
        int                     i_attcollation;
        int                     i_attfdwoptions;
+       int                     i_attmissingval;
        PGresult   *res;
        int                     ntups;
        bool            hasdefaults;
@@ -8132,7 +8133,34 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
 
                resetPQExpBuffer(q);
 
-               if (fout->remoteVersion >= 100000)
+               if (fout->remoteVersion >= 110000)
+               {
+                       /* atthasmissing and attmissingval are new in 11 */
+                       appendPQExpBuffer(q, "SELECT a.attnum, a.attname, a.atttypmod, "
+                                                         "a.attstattarget, a.attstorage, t.typstorage, "
+                                                         "a.attnotnull, a.atthasdef, a.attisdropped, "
+                                                         "a.attlen, a.attalign, a.attislocal, "
+                                                         "pg_catalog.format_type(t.oid,a.atttypmod) AS atttypname, "
+                                                         "array_to_string(a.attoptions, ', ') AS attoptions, "
+                                                         "CASE WHEN a.attcollation <> t.typcollation "
+                                                         "THEN a.attcollation ELSE 0 END AS attcollation, "
+                                                         "a.attidentity, "
+                                                         "pg_catalog.array_to_string(ARRAY("
+                                                         "SELECT pg_catalog.quote_ident(option_name) || "
+                                                         "' ' || pg_catalog.quote_literal(option_value) "
+                                                         "FROM pg_catalog.pg_options_to_table(attfdwoptions) "
+                                                         "ORDER BY option_name"
+                                                         "), E',\n    ') AS attfdwoptions ,"
+                                                         "CASE WHEN a.atthasmissing AND NOT a.attisdropped "
+                                                         "THEN a.attmissingval ELSE null END AS attmissingval "
+                                                         "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
+                                                         "ON a.atttypid = t.oid "
+                                                         "WHERE a.attrelid = '%u'::pg_catalog.oid "
+                                                         "AND a.attnum > 0::pg_catalog.int2 "
+                                                         "ORDER BY a.attnum",
+                                                         tbinfo->dobj.catId.oid);
+               }
+               else if (fout->remoteVersion >= 100000)
                {
                        /*
                         * attidentity is new in version 10.
@@ -8151,7 +8179,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
                                                          "' ' || pg_catalog.quote_literal(option_value) "
                                                          "FROM pg_catalog.pg_options_to_table(attfdwoptions) "
                                                          "ORDER BY option_name"
-                                                         "), E',\n    ') AS attfdwoptions "
+                                                         "), E',\n    ') AS attfdwoptions ,"
+                                                         "NULL as attmissingval "
                                                          "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
                                                          "ON a.atttypid = t.oid "
                                                          "WHERE a.attrelid = '%u'::pg_catalog.oid "
@@ -8177,7 +8206,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
                                                          "' ' || pg_catalog.quote_literal(option_value) "
                                                          "FROM pg_catalog.pg_options_to_table(attfdwoptions) "
                                                          "ORDER BY option_name"
-                                                         "), E',\n    ') AS attfdwoptions "
+                                                         "), E',\n    ') AS attfdwoptions, "
+                                                         "NULL as attmissingval "
                                                          "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
                                                          "ON a.atttypid = t.oid "
                                                          "WHERE a.attrelid = '%u'::pg_catalog.oid "
@@ -8201,7 +8231,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
                                                          "array_to_string(a.attoptions, ', ') AS attoptions, "
                                                          "CASE WHEN a.attcollation <> t.typcollation "
                                                          "THEN a.attcollation ELSE 0 END AS attcollation, "
-                                                         "NULL AS attfdwoptions "
+                                                         "NULL AS attfdwoptions, "
+                                                         "NULL as attmissingval "
                                                          "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
                                                          "ON a.atttypid = t.oid "
                                                          "WHERE a.attrelid = '%u'::pg_catalog.oid "
@@ -8219,7 +8250,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
                                                          "pg_catalog.format_type(t.oid,a.atttypmod) AS atttypname, "
                                                          "array_to_string(a.attoptions, ', ') AS attoptions, "
                                                          "0 AS attcollation, "
-                                                         "NULL AS attfdwoptions "
+                                                         "NULL AS attfdwoptions, "
+                                                         "NULL as attmissingval "
                                                          "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
                                                          "ON a.atttypid = t.oid "
                                                          "WHERE a.attrelid = '%u'::pg_catalog.oid "
@@ -8236,7 +8268,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
                                                          "a.attlen, a.attalign, a.attislocal, "
                                                          "pg_catalog.format_type(t.oid,a.atttypmod) AS atttypname, "
                                                          "'' AS attoptions, 0 AS attcollation, "
-                                                         "NULL AS attfdwoptions "
+                                                         "NULL AS attfdwoptions, "
+                                                         "NULL as attmissingval "
                                                          "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t "
                                                          "ON a.atttypid = t.oid "
                                                          "WHERE a.attrelid = '%u'::pg_catalog.oid "
@@ -8266,6 +8299,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
                i_attoptions = PQfnumber(res, "attoptions");
                i_attcollation = PQfnumber(res, "attcollation");
                i_attfdwoptions = PQfnumber(res, "attfdwoptions");
+               i_attmissingval = PQfnumber(res, "attmissingval");
 
                tbinfo->numatts = ntups;
                tbinfo->attnames = (char **) pg_malloc(ntups * sizeof(char *));
@@ -8282,6 +8316,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
                tbinfo->attoptions = (char **) pg_malloc(ntups * sizeof(char *));
                tbinfo->attcollation = (Oid *) pg_malloc(ntups * sizeof(Oid));
                tbinfo->attfdwoptions = (char **) pg_malloc(ntups * sizeof(char *));
+               tbinfo->attmissingval = (char **) pg_malloc(ntups * sizeof(char *));
                tbinfo->notnull = (bool *) pg_malloc(ntups * sizeof(bool));
                tbinfo->inhNotNull = (bool *) pg_malloc(ntups * sizeof(bool));
                tbinfo->attrdefs = (AttrDefInfo **) pg_malloc(ntups * sizeof(AttrDefInfo *));
@@ -8309,6 +8344,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables)
                        tbinfo->attoptions[j] = pg_strdup(PQgetvalue(res, j, i_attoptions));
                        tbinfo->attcollation[j] = atooid(PQgetvalue(res, j, i_attcollation));
                        tbinfo->attfdwoptions[j] = pg_strdup(PQgetvalue(res, j, i_attfdwoptions));
+                       tbinfo->attmissingval[j] = pg_strdup(PQgetvalue(res, j, i_attmissingval));
                        tbinfo->attrdefs[j] = NULL; /* fix below */
                        if (PQgetvalue(res, j, i_atthasdef)[0] == 't')
                                hasdefaults = true;
@@ -15658,6 +15694,29 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
                else
                        appendPQExpBufferStr(q, ";\n");
 
+               /*
+                * in binary upgrade mode, update the catalog with any missing values
+                * that might be present.
+                */
+               if (dopt->binary_upgrade)
+               {
+                       for (j = 0; j < tbinfo->numatts; j++)
+                       {
+                               if (tbinfo->attmissingval[j][0] != '\0')
+                               {
+                                       appendPQExpBufferStr(q, "\n-- set missing value.\n");
+                                       appendPQExpBufferStr(q,
+                                                                                "SELECT pg_catalog.binary_upgrade_set_missing_value(");
+                                       appendStringLiteralAH(q, qualrelname, fout);
+                                       appendPQExpBufferStr(q, "::pg_catalog.regclass,");
+                                       appendStringLiteralAH(q, tbinfo->attnames[j], fout);
+                                       appendPQExpBufferStr(q, ",");
+                                       appendStringLiteralAH(q, tbinfo->attmissingval[j], fout);
+                                       appendPQExpBufferStr(q, ");\n\n");
+                               }
+                       }
+               }
+
                /*
                 * To create binary-compatible heap files, we have to ensure the same
                 * physical column order, including dropped columns, as in the
index e96c662b1e9017099fe0ff43989ef90e32919860..b9043799c7fb5019490249d6a3b444c76bc25934 100644 (file)
@@ -316,6 +316,7 @@ typedef struct _tableInfo
        char      **attoptions;         /* per-attribute options */
        Oid                *attcollation;       /* per-attribute collation selection */
        char      **attfdwoptions;      /* per-attribute fdw options */
+       char      **attmissingval;  /* per attribute missing value */
        bool       *notnull;            /* NOT NULL constraints on attributes */
        bool       *inhNotNull;         /* true if NOT NULL is inherited */
        struct _attrDefInfo **attrdefs; /* DEFAULT expressions */
index 2954cba6b32134d7286a8cce9d9c53625c3b2f01..6ab4f56d08974df1705273e868010169d8d60831 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201804191
+#define CATALOG_VERSION_NO     201806221
 
 #endif
index 59fc0524947f5eb3feaeed0859b1ded917a2b7a1..c5e40ff017ddc2bce1cbc76f9973d207fc09b42b 100644 (file)
@@ -105,6 +105,7 @@ extern List *AddRelationNewConstraints(Relation rel,
                                                  bool is_internal);
 
 extern void RelationClearMissing(Relation rel);
+extern void SetAttrMissing(Oid relid, char *attname, char *value);
 
 extern Oid StoreAttrDefault(Relation rel, AttrNumber attnum,
                                 Node *expr, bool is_internal,
index 66c6c224a8be7a8e8ad78fcb816408b894adf172..9834e389295b593e7ae912158f92e44252c3380b 100644 (file)
   proname => 'binary_upgrade_set_record_init_privs', provolatile => 'v',
   proparallel => 'r', prorettype => 'void', proargtypes => 'bool',
   prosrc => 'binary_upgrade_set_record_init_privs' },
+{ oid => '4101', descr => 'for use by pg_upgrade',
+  proname => 'binary_upgrade_set_missing_value', provolatile => 'v',
+  proparallel => 'r', prorettype => 'void', proargtypes => 'oid text text',
+  prosrc => 'binary_upgrade_set_missing_value' },
 
 # replication/origin.h
 { oid => '6003', descr => 'create a replication origin',
index ef8d04f59c8e8fcc9f168cacf39e74e32b176a96..f3d783c28131f42be71e0fbd12d8da5a34260532 100644 (file)
@@ -548,3 +548,14 @@ DROP TABLE has_volatile;
 DROP EVENT TRIGGER has_volatile_rewrite;
 DROP FUNCTION log_rewrite;
 DROP SCHEMA fast_default;
+-- Leave a table with an active fast default in place, for pg_upgrade testing
+set search_path = public;
+create table has_fast_default(f1 int);
+insert into has_fast_default values(1);
+alter table has_fast_default add column f2 int default 42;
+table has_fast_default;
+ f1 | f2 
+----+----
+  1 | 42
+(1 row)
+
index 0e660330f77c8a9017961a9a2deca5cf929ef1da..7b9cc47cef5e385ca9e7292f7b414814fcd150f2 100644 (file)
@@ -369,3 +369,10 @@ DROP TABLE has_volatile;
 DROP EVENT TRIGGER has_volatile_rewrite;
 DROP FUNCTION log_rewrite;
 DROP SCHEMA fast_default;
+
+-- Leave a table with an active fast default in place, for pg_upgrade testing
+set search_path = public;
+create table has_fast_default(f1 int);
+insert into has_fast_default values(1);
+alter table has_fast_default add column f2 int default 42;
+table has_fast_default;