Allow CREATE/ALTER DATABASE to manipulate datistemplate and datallowconn.
authorTom Lane <tgl@sss.pgh.pa.us>
Wed, 2 Jul 2014 00:10:38 +0000 (20:10 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Wed, 2 Jul 2014 00:10:38 +0000 (20:10 -0400)
Historically these database properties could be manipulated only by
manually updating pg_database, which is error-prone and only possible for
superusers.  But there seems no good reason not to allow database owners to
set them for their databases, so invent CREATE/ALTER DATABASE options to do
that.  Adjust a couple of places that were doing it the hard way to use the
commands instead.

Vik Fearing, reviewed by Pavel Stehule

contrib/pg_upgrade/pg_upgrade.c
doc/src/sgml/ref/alter_database.sgml
doc/src/sgml/ref/create_database.sgml
src/backend/commands/dbcommands.c
src/bin/initdb/initdb.c
src/bin/pg_dump/pg_dumpall.c
src/bin/psql/tab-complete.c

index ea1f9f663e40db1db25d91d6a79db7b5215ea7df..b32d81efe711535699d66f7a621e5a07de313b50 100644 (file)
@@ -540,9 +540,8 @@ set_frozenxids(void)
                 */
                if (strcmp(datallowconn, "f") == 0)
                        PQclear(executeQueryOrDie(conn_template1,
-                                                                         "UPDATE pg_catalog.pg_database "
-                                                                         "SET  datallowconn = true "
-                                                                         "WHERE datname = '%s'", datname));
+                                                               "ALTER DATABASE %s ALLOW_CONNECTIONS = true",
+                                                                         quote_identifier(datname)));
 
                conn = connectToServer(&new_cluster, datname);
 
@@ -558,9 +557,8 @@ set_frozenxids(void)
                /* Reset datallowconn flag */
                if (strcmp(datallowconn, "f") == 0)
                        PQclear(executeQueryOrDie(conn_template1,
-                                                                         "UPDATE pg_catalog.pg_database "
-                                                                         "SET  datallowconn = false "
-                                                                         "WHERE datname = '%s'", datname));
+                                                          "ALTER DATABASE %s ALLOW_CONNECTIONS = false",
+                                                                         quote_identifier(datname)));
        }
 
        PQclear(dbres);
index 23ef75512f14f4e4f90018ebfd009fb86e6c94e5..3724c05e2c0cfd6c1128094d9545747f8a75970c 100644 (file)
@@ -25,6 +25,8 @@ ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> [ [ WITH ] <rep
 
 <phrase>where <replaceable class="PARAMETER">option</replaceable> can be:</phrase>
 
+    IS_TEMPLATE <replaceable class="PARAMETER">istemplate</replaceable>
+    ALLOW_CONNECTIONS <replaceable class="PARAMETER">allowconn</replaceable>
     CONNECTION LIMIT <replaceable class="PARAMETER">connlimit</replaceable>
 
 ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -107,6 +109,26 @@ ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> RESET ALL
      </varlistentry>
 
      <varlistentry>
+       <term><replaceable class="parameter">istemplate</replaceable></term>
+       <listitem>
+        <para>
+         If true, then this database can be cloned by any user with CREATEDB
+         privileges; if false, then only superusers or the owner of the
+         database can clone it.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term><replaceable class="parameter">allowconn</replaceable></term>
+       <listitem>
+        <para>
+         If false then no one can connect to this database.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
       <term><replaceable class="parameter">connlimit</replaceable></term>
       <listitem>
        <para>
index 5af898152079e9e621c3a916ed6c214e03a316d0..9711b1f98e3ebb524b918843099e2e698a112f54 100644 (file)
@@ -28,6 +28,8 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
            [ LC_COLLATE [=] <replaceable class="parameter">lc_collate</replaceable> ]
            [ LC_CTYPE [=] <replaceable class="parameter">lc_ctype</replaceable> ]
            [ TABLESPACE [=] <replaceable class="parameter">tablespace_name</replaceable> ]
+           [ IS_TEMPLATE [=] <replaceable class="parameter">istemplate</replaceable> ]
+           [ ALLOW_CONNECTIONS [=] <replaceable class="parameter">allowconn</replaceable> ]
            [ CONNECTION LIMIT [=] <replaceable class="parameter">connlimit</replaceable> ] ]
 </synopsis>
  </refsynopsisdiv>
@@ -148,6 +150,28 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
      </varlistentry>
 
      <varlistentry>
+       <term><replaceable class="parameter">istemplate</replaceable></term>
+       <listitem>
+        <para>
+         If true, then this database can be cloned by any user with CREATEDB
+         privileges; if false (the default), then only superusers or the owner
+         of the database can clone it.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term><replaceable class="parameter">allowconn</replaceable></term>
+       <listitem>
+        <para>
+         If false then no one can connect to this database.  The default is
+         true, allowing connections (except as restricted by other mechanisms,
+         such as <literal>GRANT</>/<literal>REVOKE CONNECT</>).
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
       <term><replaceable class="parameter">connlimit</replaceable></term>
       <listitem>
        <para>
index dd92aff89dc7848aeca8a06142f46d8372304630..f480be884506efe602ce3b78d8f95c4a4e17021d 100644 (file)
@@ -123,6 +123,8 @@ createdb(const CreatedbStmt *stmt)
        DefElem    *dencoding = NULL;
        DefElem    *dcollate = NULL;
        DefElem    *dctype = NULL;
+       DefElem    *distemplate = NULL;
+       DefElem    *dallowconnections = NULL;
        DefElem    *dconnlimit = NULL;
        char       *dbname = stmt->dbname;
        char       *dbowner = NULL;
@@ -131,6 +133,8 @@ createdb(const CreatedbStmt *stmt)
        char       *dbctype = NULL;
        char       *canonname;
        int                     encoding = -1;
+       bool            dbistemplate = false;
+       bool            dballowconnections = true;
        int                     dbconnlimit = -1;
        int                     notherbackends;
        int                     npreparedxacts;
@@ -189,6 +193,22 @@ createdb(const CreatedbStmt *stmt)
                                                 errmsg("conflicting or redundant options")));
                        dctype = defel;
                }
+               else if (strcmp(defel->defname, "is_template") == 0)
+               {
+                       if (distemplate)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       distemplate = defel;
+               }
+               else if (strcmp(defel->defname, "allow_connections") == 0)
+               {
+                       if (dallowconnections)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       dallowconnections = defel;
+               }
                else if (strcmp(defel->defname, "connection_limit") == 0)
                {
                        if (dconnlimit)
@@ -244,7 +264,10 @@ createdb(const CreatedbStmt *stmt)
                dbcollate = defGetString(dcollate);
        if (dctype && dctype->arg)
                dbctype = defGetString(dctype);
-
+       if (distemplate && distemplate->arg)
+               dbistemplate = defGetBoolean(distemplate);
+       if (dallowconnections && dallowconnections->arg)
+               dballowconnections = defGetBoolean(dallowconnections);
        if (dconnlimit && dconnlimit->arg)
        {
                dbconnlimit = defGetInt32(dconnlimit);
@@ -487,8 +510,8 @@ createdb(const CreatedbStmt *stmt)
                DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
        new_record[Anum_pg_database_datctype - 1] =
                DirectFunctionCall1(namein, CStringGetDatum(dbctype));
-       new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
-       new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
+       new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
+       new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
        new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
        new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
        new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
@@ -1328,7 +1351,11 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
        ScanKeyData scankey;
        SysScanDesc scan;
        ListCell   *option;
-       int                     connlimit = -1;
+       bool            dbistemplate = false;
+       bool            dballowconnections = true;
+       int                     dbconnlimit = -1;
+       DefElem    *distemplate = NULL;
+       DefElem    *dallowconnections = NULL;
        DefElem    *dconnlimit = NULL;
        DefElem    *dtablespace = NULL;
        Datum           new_record[Natts_pg_database];
@@ -1340,7 +1367,23 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
        {
                DefElem    *defel = (DefElem *) lfirst(option);
 
-               if (strcmp(defel->defname, "connection_limit") == 0)
+               if (strcmp(defel->defname, "is_template") == 0)
+               {
+                       if (distemplate)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       distemplate = defel;
+               }
+               else if (strcmp(defel->defname, "allow_connections") == 0)
+               {
+                       if (dallowconnections)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       dallowconnections = defel;
+               }
+               else if (strcmp(defel->defname, "connection_limit") == 0)
                {
                        if (dconnlimit)
                                ereport(ERROR,
@@ -1380,13 +1423,17 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
                return InvalidOid;
        }
 
+       if (distemplate && distemplate->arg)
+               dbistemplate = defGetBoolean(distemplate);
+       if (dallowconnections && dallowconnections->arg)
+               dballowconnections = defGetBoolean(dallowconnections);
        if (dconnlimit && dconnlimit->arg)
        {
-               connlimit = defGetInt32(dconnlimit);
-               if (connlimit < -1)
+               dbconnlimit = defGetInt32(dconnlimit);
+               if (dbconnlimit < -1)
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                                        errmsg("invalid connection limit: %d", connlimit)));
+                                        errmsg("invalid connection limit: %d", dbconnlimit)));
        }
 
        /*
@@ -1413,6 +1460,17 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
                                           stmt->dbname);
 
+       /*
+        * In order to avoid getting locked out and having to go through
+        * standalone mode, we refuse to disallow connections to the database
+        * we're currently connected to.  Lockout can still happen with concurrent
+        * sessions but the likeliness of that is not high enough to worry about.
+        */
+       if (!dballowconnections && dboid == MyDatabaseId)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("cannot disallow connections for current database")));
+
        /*
         * Build an updated tuple, perusing the information just obtained
         */
@@ -1420,9 +1478,19 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
        MemSet(new_record_nulls, false, sizeof(new_record_nulls));
        MemSet(new_record_repl, false, sizeof(new_record_repl));
 
+       if (distemplate)
+       {
+               new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
+               new_record_repl[Anum_pg_database_datistemplate - 1] = true;
+       }
+       if (dallowconnections)
+       {
+               new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
+               new_record_repl[Anum_pg_database_datallowconn - 1] = true;
+       }
        if (dconnlimit)
        {
-               new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
+               new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
                new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
        }
 
index 5228f1342224efe830e309ec60633ec2c4b1c57f..a25965ce4c9c6338dd8e03895cb71e6e8e1f66cd 100644 (file)
@@ -2288,11 +2288,7 @@ make_template0(void)
        PG_CMD_DECL;
        const char **line;
        static const char *template0_setup[] = {
-               "CREATE DATABASE template0;\n",
-               "UPDATE pg_database SET "
-               "       datistemplate = 't', "
-               "       datallowconn = 'f' "
-               "    WHERE datname = 'template0';\n",
+               "CREATE DATABASE template0 IS_TEMPLATE = true ALLOW_CONNECTIONS = false;\n",
 
                /*
                 * We use the OID of template0 to determine lastsysoid
index 0cc4329b1a1add56042198355c6d1ab1a8f96311..9dec6f3b141fbafd18299e6ca76b487759e8e5ca 100644 (file)
@@ -1374,19 +1374,15 @@ dumpCreateDB(PGconn *conn)
                                appendPQExpBuffer(buf, " TABLESPACE = %s",
                                                                  fmtId(dbtablespace));
 
+                       if (strcmp(dbistemplate, "t") == 0)
+                               appendPQExpBuffer(buf, " IS_TEMPLATE = true");
+
                        if (strcmp(dbconnlimit, "-1") != 0)
                                appendPQExpBuffer(buf, " CONNECTION LIMIT = %s",
                                                                  dbconnlimit);
 
                        appendPQExpBufferStr(buf, ";\n");
 
-                       if (strcmp(dbistemplate, "t") == 0)
-                       {
-                               appendPQExpBufferStr(buf, "UPDATE pg_catalog.pg_database SET datistemplate = 't' WHERE datname = ");
-                               appendStringLiteralConn(buf, dbname, conn);
-                               appendPQExpBufferStr(buf, ";\n");
-                       }
-
                        if (binary_upgrade)
                        {
                                appendPQExpBufferStr(buf, "-- For binary upgrade, set datfrozenxid.\n");
index be5c3c5f4500bc97d1158a60326021b4c347b2cc..bab03572352d1e5ba9b2b78a298de95065d929c4 100644 (file)
@@ -1021,7 +1021,8 @@ psql_completion(const char *text, int start, int end)
                         pg_strcasecmp(prev2_wd, "DATABASE") == 0)
        {
                static const char *const list_ALTERDATABASE[] =
-               {"RESET", "SET", "OWNER TO", "RENAME TO", "CONNECTION LIMIT", NULL};
+               {"RESET", "SET", "OWNER TO", "RENAME TO", "IS_TEMPLATE",
+               "ALLOW_CONNECTIONS", "CONNECTION LIMIT", NULL};
 
                COMPLETE_WITH_LIST(list_ALTERDATABASE);
        }
@@ -2111,8 +2112,8 @@ psql_completion(const char *text, int start, int end)
                         pg_strcasecmp(prev2_wd, "DATABASE") == 0)
        {
                static const char *const list_DATABASE[] =
-               {"OWNER", "TEMPLATE", "ENCODING", "TABLESPACE", "CONNECTION LIMIT",
-               NULL};
+               {"OWNER", "TEMPLATE", "ENCODING", "TABLESPACE", "IS_TEMPLATE",
+               "ALLOW_CONNECTIONS", "CONNECTION LIMIT", NULL};
 
                COMPLETE_WITH_LIST(list_DATABASE);
        }