]> granicus.if.org Git - postgresql/blobdiff - src/bin/pg_upgrade/version.c
Check for tables with sql_identifier during pg_upgrade
[postgresql] / src / bin / pg_upgrade / version.c
index 10cb362e0929dd7e7a4fa47b25e9bb3de838bf42..60a1025c3ec292c3e1bc342e82477adfeeebe60d 100644 (file)
@@ -399,3 +399,122 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
        else
                check_ok();
 }
+
+/*
+ * old_11_check_for_sql_identifier_data_type_usage()
+ *     11 -> 12
+ *     In 12, the sql_identifier data type was switched from name to varchar,
+ *     which does affect the storage (name is by-ref, but not varlena). This
+ *     means user tables using sql_identifier for columns are broken because
+ *     the on-disk format is different.
+ *
+ *     We need to check all objects that might store sql_identifier on disk,
+ *     i.e. tables, matviews and indexes. Also check composite types in case
+ *     they are used in this context.
+ */
+void
+old_11_check_for_sql_identifier_data_type_usage(ClusterInfo *cluster)
+{
+       int                     dbnum;
+       FILE       *script = NULL;
+       bool            found = false;
+       char            output_path[MAXPGPATH];
+
+       prep_status("Checking for invalid \"sql_identifier\" user columns");
+
+       snprintf(output_path, sizeof(output_path), "tables_using_sql_identifier.txt");
+
+       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       {
+               PGresult   *res;
+               bool            db_used = false;
+               int                     ntups;
+               int                     rowno;
+               int                     i_nspname,
+                                       i_relname,
+                                       i_attname;
+               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
+               PGconn     *conn = connectToServer(cluster, active_db->db_name);
+
+               /*
+                * We need the recursive CTE because the sql_identifier may be wrapped
+                * either in a domain or composite type, or both (in arbitrary order).
+                */
+               res = executeQueryOrDie(conn,
+                                                               "WITH RECURSIVE oids AS ( "
+               /* the sql_identifier type itself */
+                                                               "       SELECT 'information_schema.sql_identifier'::regtype AS oid "
+                                                               "       UNION ALL "
+                                                               "       SELECT * FROM ( "
+               /* domains on the type */
+                                                               "               WITH x AS (SELECT oid FROM oids) "
+                                                               "                       SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
+                                                               "                       UNION "
+               /* composite types containing the type */
+                                                               "                       SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
+                                                               "                       WHERE t.typtype = 'c' AND "
+                                                               "                                 t.oid = c.reltype AND "
+                                                               "                                 c.oid = a.attrelid AND "
+                                                               "                                 NOT a.attisdropped AND "
+                                                               "                                 a.atttypid = x.oid "
+                                                               "       ) foo "
+                                                               ") "
+                                                               "SELECT n.nspname, c.relname, a.attname "
+                                                               "FROM   pg_catalog.pg_class c, "
+                                                               "               pg_catalog.pg_namespace n, "
+                                                               "               pg_catalog.pg_attribute a "
+                                                               "WHERE  c.oid = a.attrelid AND "
+                                                               "               NOT a.attisdropped AND "
+                                                               "               a.atttypid IN (SELECT oid FROM oids) AND "
+                                                               "               c.relkind IN ("
+                                                               CppAsString2(RELKIND_RELATION) ", "
+                                                               CppAsString2(RELKIND_MATVIEW) ", "
+                                                               CppAsString2(RELKIND_INDEX) ") AND "
+                                                               "               c.relnamespace = n.oid AND "
+               /* exclude possible orphaned temp tables */
+                                                               "               n.nspname !~ '^pg_temp_' AND "
+                                                               "               n.nspname !~ '^pg_toast_temp_' AND "
+                                                               "               n.nspname NOT IN ('pg_catalog', 'information_schema')");
+
+               ntups = PQntuples(res);
+               i_nspname = PQfnumber(res, "nspname");
+               i_relname = PQfnumber(res, "relname");
+               i_attname = PQfnumber(res, "attname");
+               for (rowno = 0; rowno < ntups; rowno++)
+               {
+                       found = true;
+                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
+                               pg_fatal("could not open file \"%s\": %s\n", output_path,
+                                                strerror(errno));
+                       if (!db_used)
+                       {
+                               fprintf(script, "Database: %s\n", active_db->db_name);
+                               db_used = true;
+                       }
+                       fprintf(script, "  %s.%s.%s\n",
+                                       PQgetvalue(res, rowno, i_nspname),
+                                       PQgetvalue(res, rowno, i_relname),
+                                       PQgetvalue(res, rowno, i_attname));
+               }
+
+               PQclear(res);
+
+               PQfinish(conn);
+       }
+
+       if (script)
+               fclose(script);
+
+       if (found)
+       {
+               pg_log(PG_REPORT, "fatal\n");
+               pg_fatal("Your installation contains the \"sql_identifier\" data type in user tables\n"
+                                "and/or indexes.  The on-disk format for this data type has changed, so this\n"
+                                "cluster cannot currently be upgraded.  You can remove the problem tables or\n"
+                                "change the data type to \"name\" and restart the upgrade.\n"
+                                "A list of the problem columns is in the file:\n"
+                                "    %s\n\n", output_path);
+       }
+       else
+               check_ok();
+}