]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/dbcommands.c
This patch implement the TODO [ALTER DATABASE foo OWNER TO bar].
[postgresql] / src / backend / commands / dbcommands.c
index f5bacf6b3c7289664bc0910fa168a528c553e3d7..02c1bf8e2042a9441486593ba0229765eab049fc 100644 (file)
@@ -4,12 +4,12 @@
  *             Database management commands (create/drop database).
  *
  *
- * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.104 2002/09/03 22:17:34 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.134 2004/05/26 13:56:45 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -20,6 +20,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
+#include "access/genam.h"
 #include "access/heapam.h"
 #include "catalog/catname.h"
 #include "catalog/catalog.h"
 #include "commands/comment.h"
 #include "commands/dbcommands.h"
 #include "miscadmin.h"
+#include "storage/fd.h"
 #include "storage/freespace.h"
 #include "storage/sinval.h"
+#include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -61,9 +64,11 @@ createdb(const CreatedbStmt *stmt)
        char       *alt_loc;
        char       *target_dir;
        char            src_loc[MAXPGPATH];
+#ifndef WIN32
        char            buf[2 * MAXPGPATH + 100];
+#endif
        Oid                     src_dboid;
-       int4            src_owner;
+       AclId           src_owner;
        int                     src_encoding;
        bool            src_istemplate;
        Oid                     src_lastsysoid;
@@ -76,17 +81,17 @@ createdb(const CreatedbStmt *stmt)
        Datum           new_record[Natts_pg_database];
        char            new_record_nulls[Natts_pg_database];
        Oid                     dboid;
-       int32           datdba;
-       List       *option;
+       AclId           datdba;
+       ListCell   *option;
        DefElem    *downer = NULL;
-       DefElem    *dpath = NULL;
-       DefElem    *dtemplate = NULL;
-       DefElem    *dencoding = NULL;
+       DefElem    *dpath = NULL;
+       DefElem    *dtemplate = NULL;
+       DefElem    *dencoding = NULL;
        char       *dbname = stmt->dbname;
        char       *dbowner = NULL;
        char       *dbpath = NULL;
        char       *dbtemplate = NULL;
-       int                 encoding = -1;
+       int                     encoding = -1;
 
        /* Extract options from the statement node tree */
        foreach(option, stmt->options)
@@ -96,64 +101,110 @@ createdb(const CreatedbStmt *stmt)
                if (strcmp(defel->defname, "owner") == 0)
                {
                        if (downer)
-                               elog(ERROR, "CREATE DATABASE: conflicting options");
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
                        downer = defel;
                }
                else if (strcmp(defel->defname, "location") == 0)
                {
                        if (dpath)
-                               elog(ERROR, "CREATE DATABASE: conflicting options");
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
                        dpath = defel;
                }
                else if (strcmp(defel->defname, "template") == 0)
                {
                        if (dtemplate)
-                               elog(ERROR, "CREATE DATABASE: conflicting options");
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
                        dtemplate = defel;
                }
                else if (strcmp(defel->defname, "encoding") == 0)
                {
                        if (dencoding)
-                               elog(ERROR, "CREATE DATABASE: conflicting options");
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
                        dencoding = defel;
                }
                else
-                       elog(ERROR, "CREATE DATABASE: option \"%s\" not recognized",
+                       elog(ERROR, "option \"%s\" not recognized",
                                 defel->defname);
        }
 
-       if (downer)
+       if (downer && downer->arg)
                dbowner = strVal(downer->arg);
-       if (dpath)
+       if (dpath && dpath->arg)
                dbpath = strVal(dpath->arg);
-       if (dtemplate)
+       if (dtemplate && dtemplate->arg)
                dbtemplate = strVal(dtemplate->arg);
-       if (dencoding)
-               encoding = intVal(dencoding->arg);
+       if (dencoding && dencoding->arg)
+       {
+               const char *encoding_name;
+
+               if (IsA(dencoding->arg, Integer))
+               {
+                       encoding = intVal(dencoding->arg);
+                       encoding_name = pg_encoding_to_char(encoding);
+                       if (strcmp(encoding_name, "") == 0 ||
+                               pg_valid_server_encoding(encoding_name) < 0)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                                errmsg("%d is not a valid encoding code",
+                                                               encoding)));
+               }
+               else if (IsA(dencoding->arg, String))
+               {
+                       encoding_name = strVal(dencoding->arg);
+                       if (pg_valid_server_encoding(encoding_name) < 0)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                                errmsg("%s is not a valid encoding name",
+                                                               encoding_name)));
+                       encoding = pg_char_to_encoding(encoding_name);
+               }
+               else
+                       elog(ERROR, "unrecognized node type: %d",
+                                nodeTag(dencoding->arg));
+       }
 
        /* obtain sysid of proposed owner */
        if (dbowner)
-               datdba = get_usesysid(dbowner); /* will elog if no such user */
+               datdba = get_usesysid(dbowner); /* will ereport if no such user */
        else
                datdba = GetUserId();
 
-       if (datdba == (int32) GetUserId())
+       if (datdba == GetUserId())
        {
                /* creating database for self: can be superuser or createdb */
                if (!superuser() && !have_createdb_privilege())
-                       elog(ERROR, "CREATE DATABASE: permission denied");
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("permission denied to create database")));
        }
        else
        {
                /* creating database for someone else: must be superuser */
                /* note that the someone else need not have any permissions */
                if (!superuser())
-                       elog(ERROR, "CREATE DATABASE: permission denied");
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("must be superuser to create database for another user")));
        }
 
        /* don't call this in a transaction block */
-       if (IsTransactionBlock())
-               elog(ERROR, "CREATE DATABASE: may not be called in a transaction block");
+       PreventTransactionChain((void *) stmt, "CREATE DATABASE");
+
+       /* alternate location requires symlinks */
+#ifndef HAVE_SYMLINK
+       if (dbpath != NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                  errmsg("cannot use an alternative location on this platform")));
+#endif
 
        /*
         * Check for db name conflict.  There is a race condition here, since
@@ -164,7 +215,9 @@ createdb(const CreatedbStmt *stmt)
         * after we grab the exclusive lock.
         */
        if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
-               elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname);
+               ereport(ERROR,
+                               (errcode(ERRCODE_DUPLICATE_DATABASE),
+                                errmsg("database \"%s\" already exists", dbname)));
 
        /*
         * Lookup database (template) to be cloned.
@@ -176,8 +229,9 @@ createdb(const CreatedbStmt *stmt)
                                         &src_istemplate, &src_lastsysoid,
                                         &src_vacuumxid, &src_frozenxid,
                                         src_dbpath))
-               elog(ERROR, "CREATE DATABASE: template \"%s\" does not exist",
-                        dbtemplate);
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("template database \"%s\" does not exist", dbtemplate)));
 
        /*
         * Permission check: to copy a DB that's not marked datistemplate, you
@@ -185,9 +239,11 @@ createdb(const CreatedbStmt *stmt)
         */
        if (!src_istemplate)
        {
-               if (!superuser() && GetUserId() != src_owner )
-                       elog(ERROR, "CREATE DATABASE: permission to copy \"%s\" denied",
-                                dbtemplate);
+               if (!superuser() && GetUserId() != src_owner)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("permission denied to copy database \"%s\"",
+                                                       dbtemplate)));
        }
 
        /*
@@ -205,7 +261,10 @@ createdb(const CreatedbStmt *stmt)
         * bulletproof, since someone might connect while we are copying...
         */
        if (DatabaseHasActiveBackends(src_dboid, true))
-               elog(ERROR, "CREATE DATABASE: source database \"%s\" is being accessed by other users", dbtemplate);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+               errmsg("source database \"%s\" is being accessed by other users",
+                          dbtemplate)));
 
        /* If encoding is defaulted, use source's encoding */
        if (encoding < 0)
@@ -213,7 +272,9 @@ createdb(const CreatedbStmt *stmt)
 
        /* Some encodings are client only */
        if (!PG_VALID_BE_ENCODING(encoding))
-               elog(ERROR, "CREATE DATABASE: invalid backend encoding");
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("invalid server encoding %d", encoding)));
 
        /*
         * Preassign OID for pg_database tuple, so that we can compute db
@@ -226,10 +287,10 @@ createdb(const CreatedbStmt *stmt)
         * database), and resolve alternate physical location if one is
         * specified.
         *
-        * If an alternate location is specified but is the same as the
-        * normal path, just drop the alternate-location spec (this seems
-        * friendlier than erroring out).  We must test this case to avoid
-        * creating a circular symlink below.
+        * If an alternate location is specified but is the same as the normal
+        * path, just drop the alternate-location spec (this seems friendlier
+        * than erroring out).  We must test this case to avoid creating a
+        * circular symlink below.
         */
        nominal_loc = GetDatabasePath(dboid);
        alt_loc = resolve_alt_dbpath(dbpath, dboid);
@@ -241,11 +302,17 @@ createdb(const CreatedbStmt *stmt)
        }
 
        if (strchr(nominal_loc, '\''))
-               elog(ERROR, "database path may not contain single quotes");
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_NAME),
+                                errmsg("database path may not contain single quotes")));
        if (alt_loc && strchr(alt_loc, '\''))
-               elog(ERROR, "database path may not contain single quotes");
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_NAME),
+                                errmsg("database path may not contain single quotes")));
        if (strchr(src_loc, '\''))
-               elog(ERROR, "database path may not contain single quotes");
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_NAME),
+                                errmsg("database path may not contain single quotes")));
        /* ... otherwise we'd be open to shell exploits below */
 
        /*
@@ -253,7 +320,7 @@ createdb(const CreatedbStmt *stmt)
         * up-to-date for the copy.  (We really only need to flush buffers for
         * the source database...)
         */
-       BufferSync();
+       BufferSync(-1, -1);
 
        /*
         * Close virtual file descriptors so the kernel has more available for
@@ -268,30 +335,63 @@ createdb(const CreatedbStmt *stmt)
        target_dir = alt_loc ? alt_loc : nominal_loc;
 
        if (mkdir(target_dir, S_IRWXU) != 0)
-               elog(ERROR, "CREATE DATABASE: unable to create database directory '%s': %m",
-                        target_dir);
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not create database directory \"%s\": %m",
+                                               target_dir)));
        if (rmdir(target_dir) != 0)
-               elog(ERROR, "CREATE DATABASE: unable to remove temp directory '%s': %m",
-                        target_dir);
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not remove temporary directory \"%s\": %m",
+                                               target_dir)));
 
        /* Make the symlink, if needed */
        if (alt_loc)
        {
+#ifdef HAVE_SYMLINK                            /* already throws error above */
                if (symlink(alt_loc, nominal_loc) != 0)
-                       elog(ERROR, "CREATE DATABASE: could not link '%s' to '%s': %m",
-                                nominal_loc, alt_loc);
+#endif
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not link file \"%s\" to \"%s\": %m",
+                                                       nominal_loc, alt_loc)));
        }
 
-       /* Copy the template database to the new location */
+       /*
+        * Copy the template database to the new location
+        *
+        * XXX use of cp really makes this code pretty grotty, particularly
+        * with respect to lack of ability to report errors well.  Someday
+        * rewrite to do it for ourselves.
+        */
+#ifndef WIN32
+       /* We might need to use cp -R one day for portability */
        snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", src_loc, target_dir);
-
        if (system(buf) != 0)
        {
                if (remove_dbdirs(nominal_loc, alt_loc))
-                       elog(ERROR, "CREATE DATABASE: could not initialize database directory");
+                       ereport(ERROR,
+                                       (errmsg("could not initialize database directory"),
+                                        errdetail("Failing system command was: %s", buf),
+                                        errhint("Look in the postmaster's stderr log for more information.")));
                else
-                       elog(ERROR, "CREATE DATABASE: could not initialize database directory; delete failed as well");
+                       ereport(ERROR,
+                                       (errmsg("could not initialize database directory; delete failed as well"),
+                                        errdetail("Failing system command was: %s", buf),
+                                        errhint("Look in the postmaster's stderr log for more information.")));
        }
+#else  /* WIN32 */
+       if (copydir(src_loc, target_dir) != 0)
+       {
+               /* copydir should already have given details of its troubles */
+               if (remove_dbdirs(nominal_loc, alt_loc))
+                       ereport(ERROR,
+                                       (errmsg("could not initialize database directory")));
+               else
+                       ereport(ERROR,
+                                       (errmsg("could not initialize database directory; delete failed as well")));
+       }
+#endif /* WIN32 */
 
        /*
         * Now OK to grab exclusive lock on pg_database.
@@ -304,7 +404,9 @@ createdb(const CreatedbStmt *stmt)
                /* Don't hold lock while doing recursive remove */
                heap_close(pg_database_rel, AccessExclusiveLock);
                remove_dbdirs(nominal_loc, alt_loc);
-               elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname);
+               ereport(ERROR,
+                               (errcode(ERRCODE_DUPLICATE_DATABASE),
+                                errmsg("database \"%s\" already exists", dbname)));
        }
 
        /*
@@ -328,11 +430,12 @@ createdb(const CreatedbStmt *stmt)
        /* do not set datpath to null, GetRawDatabaseInfo won't cope */
        new_record[Anum_pg_database_datpath - 1] =
                DirectFunctionCall1(textin, CStringGetDatum(dbpath ? dbpath : ""));
+
        /*
         * We deliberately set datconfig and datacl to defaults (NULL), rather
         * than copying them from the template database.  Copying datacl would
-        * be a bad idea when the owner is not the same as the template's owner.
-        * It's more debatable whether datconfig should be copied.
+        * be a bad idea when the owner is not the same as the template's
+        * owner. It's more debatable whether datconfig should be copied.
         */
        new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
        new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
@@ -355,7 +458,7 @@ createdb(const CreatedbStmt *stmt)
         * will see the new database in pg_database right away.  (They'll see
         * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
         */
-       BufferSync();
+       BufferSync(-1, -1);
 }
 
 
@@ -372,17 +475,18 @@ dropdb(const char *dbname)
        char       *nominal_loc;
        char            dbpath[MAXPGPATH];
        Relation        pgdbrel;
-       HeapScanDesc pgdbscan;
+       SysScanDesc pgdbscan;
        ScanKeyData key;
        HeapTuple       tup;
 
        AssertArg(dbname);
 
-       if (strcmp(dbname, DatabaseName) == 0)
-               elog(ERROR, "DROP DATABASE: cannot be executed on the currently open database");
+       if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                                errmsg("cannot drop the currently open database")));
 
-       if (IsTransactionBlock())
-               elog(ERROR, "DROP DATABASE: may not be called in a transaction block");
+       PreventTransactionChain((void *) dbname, "DROP DATABASE");
 
        /*
         * Obtain exclusive lock on pg_database.  We need this to ensure that
@@ -397,10 +501,13 @@ dropdb(const char *dbname)
 
        if (!get_db_info(dbname, &db_id, &db_owner, NULL,
                                         &db_istemplate, NULL, NULL, NULL, dbpath))
-               elog(ERROR, "DROP DATABASE: database \"%s\" does not exist", dbname);
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", dbname)));
 
        if (GetUserId() != db_owner && !superuser())
-               elog(ERROR, "DROP DATABASE: permission denied");
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                          dbname);
 
        /*
         * Disallow dropping a DB that is marked istemplate.  This is just to
@@ -408,7 +515,9 @@ dropdb(const char *dbname)
         * they can do so if they're really determined ...
         */
        if (db_istemplate)
-               elog(ERROR, "DROP DATABASE: database is marked as a template");
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("cannot drop a template database")));
 
        nominal_loc = GetDatabasePath(db_id);
        alt_loc = resolve_alt_dbpath(dbpath, db_id);
@@ -417,31 +526,37 @@ dropdb(const char *dbname)
         * Check for active backends in the target database.
         */
        if (DatabaseHasActiveBackends(db_id, false))
-               elog(ERROR, "DROP DATABASE: database \"%s\" is being accessed by other users", dbname);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                          errmsg("database \"%s\" is being accessed by other users",
+                                         dbname)));
 
        /*
         * Find the database's tuple by OID (should be unique).
         */
-       ScanKeyEntryInitialize(&key, 0, ObjectIdAttributeNumber,
-                                                  F_OIDEQ, ObjectIdGetDatum(db_id));
+       ScanKeyInit(&key,
+                               ObjectIdAttributeNumber,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(db_id));
 
-       pgdbscan = heap_beginscan(pgdbrel, SnapshotNow, 1, &key);
+       pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true,
+                                                                 SnapshotNow, 1, &key);
 
-       tup = heap_getnext(pgdbscan, ForwardScanDirection);
+       tup = systable_getnext(pgdbscan);
        if (!HeapTupleIsValid(tup))
        {
                /*
                 * This error should never come up since the existence of the
                 * database is checked earlier
                 */
-               elog(ERROR, "DROP DATABASE: Database \"%s\" doesn't exist despite earlier reports to the contrary",
+               elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
                         dbname);
        }
 
        /* Remove the database's tuple from pg_database */
        simple_heap_delete(pgdbrel, &tup->t_self);
 
-       heap_endscan(pgdbscan);
+       systable_endscan(pgdbscan);
 
        /*
         * Delete any comments associated with the database
@@ -480,10 +595,106 @@ dropdb(const char *dbname)
         * (They'll see an uncommitted deletion, but they don't care; see
         * GetRawDatabaseInfo.)
         */
-       BufferSync();
+       BufferSync(-1, -1);
 }
 
 
+/*
+ * Rename database
+ */
+void
+RenameDatabase(const char *oldname, const char *newname)
+{
+       HeapTuple       tup,
+                               newtup;
+       Relation        rel;
+       SysScanDesc scan,
+                               scan2;
+       ScanKeyData key,
+                               key2;
+
+       /*
+        * Obtain AccessExclusiveLock so that no new session gets started
+        * while the rename is in progress.
+        */
+       rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
+
+       ScanKeyInit(&key,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(oldname));
+       scan = systable_beginscan(rel, DatabaseNameIndex, true,
+                                                         SnapshotNow, 1, &key);
+
+       tup = systable_getnext(scan);
+       if (!HeapTupleIsValid(tup))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", oldname)));
+
+       /*
+        * XXX Client applications probably store the current database
+        * somewhere, so renaming it could cause confusion.  On the other
+        * hand, there may not be an actual problem besides a little
+        * confusion, so think about this and decide.
+        */
+       if (HeapTupleGetOid(tup) == MyDatabaseId)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("current database may not be renamed")));
+
+       /*
+        * Make sure the database does not have active sessions.  Might not be
+        * necessary, but it's consistent with other database operations.
+        */
+       if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                          errmsg("database \"%s\" is being accessed by other users",
+                                         oldname)));
+
+       /* make sure the new name doesn't exist */
+       ScanKeyInit(&key2,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(newname));
+       scan2 = systable_beginscan(rel, DatabaseNameIndex, true,
+                                                          SnapshotNow, 1, &key2);
+       if (HeapTupleIsValid(systable_getnext(scan2)))
+               ereport(ERROR,
+                               (errcode(ERRCODE_DUPLICATE_DATABASE),
+                                errmsg("database \"%s\" already exists", newname)));
+       systable_endscan(scan2);
+
+       /* must be owner */
+       if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                          oldname);
+
+       /* must have createdb */
+       if (!have_createdb_privilege())
+               ereport(ERROR,
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                errmsg("permission denied to rename database")));
+
+       /* rename */
+       newtup = heap_copytuple(tup);
+       namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
+       simple_heap_update(rel, &tup->t_self, newtup);
+       CatalogUpdateIndexes(rel, newtup);
+
+       systable_endscan(scan);
+       heap_close(rel, NoLock);
+
+       /*
+        * Force dirty buffers out to disk, so that newly-connecting backends
+        * will see the renamed database in pg_database right away.  (They'll
+        * see an uncommitted tuple, but they don't care; see
+        * GetRawDatabaseInfo.)
+        */
+       BufferSync(-1, -1);
+}
+
 
 /*
  * ALTER DATABASE name SET ...
@@ -495,8 +706,8 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
        HeapTuple       tuple,
                                newtuple;
        Relation        rel;
-       ScanKeyData     scankey;
-       HeapScanDesc scan;
+       ScanKeyData scankey;
+       SysScanDesc scan;
        Datum           repl_val[Natts_pg_database];
        char            repl_null[Natts_pg_database];
        char            repl_repl[Natts_pg_database];
@@ -504,45 +715,54 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
        valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
 
        rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
-       ScanKeyEntryInitialize(&scankey, 0, Anum_pg_database_datname,
-                                                  F_NAMEEQ, NameGetDatum(stmt->dbname));
-       scan = heap_beginscan(rel, SnapshotNow, 1, &scankey);
-       tuple = heap_getnext(scan, ForwardScanDirection);
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(stmt->dbname));
+       scan = systable_beginscan(rel, DatabaseNameIndex, true,
+                                                         SnapshotNow, 1, &scankey);
+       tuple = systable_getnext(scan);
        if (!HeapTupleIsValid(tuple))
-               elog(ERROR, "database \"%s\" does not exist", stmt->dbname);
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", stmt->dbname)));
 
        if (!(superuser()
-                 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
-               elog(ERROR, "permission denied");
+               || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                          stmt->dbname);
 
        MemSet(repl_repl, ' ', sizeof(repl_repl));
-       repl_repl[Anum_pg_database_datconfig-1] = 'r';
+       repl_repl[Anum_pg_database_datconfig - 1] = 'r';
 
-       if (strcmp(stmt->variable, "all")==0 && valuestr == NULL)
+       if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
        {
                /* RESET ALL */
-               repl_null[Anum_pg_database_datconfig-1] = 'n';
-               repl_val[Anum_pg_database_datconfig-1] = (Datum) 0;
+               repl_null[Anum_pg_database_datconfig - 1] = 'n';
+               repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
        }
        else
        {
-               Datum datum;
-               bool isnull;
-               ArrayType *a;
+               Datum           datum;
+               bool            isnull;
+               ArrayType  *a;
 
-               repl_null[Anum_pg_database_datconfig-1] = ' ';
+               repl_null[Anum_pg_database_datconfig - 1] = ' ';
 
                datum = heap_getattr(tuple, Anum_pg_database_datconfig,
                                                         RelationGetDescr(rel), &isnull);
 
-               a = isnull ? ((ArrayType *) NULL) : DatumGetArrayTypeP(datum);
+               a = isnull ? NULL : DatumGetArrayTypeP(datum);
 
                if (valuestr)
                        a = GUCArrayAdd(a, stmt->variable, valuestr);
                else
                        a = GUCArrayDelete(a, stmt->variable);
 
-               repl_val[Anum_pg_database_datconfig-1] = PointerGetDatum(a);
+               if (a)
+                       repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
+               else
+                       repl_null[Anum_pg_database_datconfig - 1] = 'n';
        }
 
        newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
@@ -551,11 +771,57 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
        /* Update indexes */
        CatalogUpdateIndexes(rel, newtuple);
 
-       heap_endscan(scan);
+       systable_endscan(scan);
        heap_close(rel, RowExclusiveLock);
 }
 
 
+/*
+ * ALTER DATABASE name OWNER TO newowner
+ */
+void
+AlterDatabaseOwner(const char *dbname, const char *newowner)
+{
+       AclId           newdatdba;
+       HeapTuple       tuple,
+                               newtuple;
+       Relation        rel;
+       ScanKeyData scankey;
+       SysScanDesc scan;
+
+       rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(dbname));
+       scan = systable_beginscan(rel, DatabaseNameIndex, true,
+                                                         SnapshotNow, 1, &scankey);
+       tuple = systable_getnext(scan);
+       if (!HeapTupleIsValid(tuple))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", dbname)));
+
+       /* obtain sysid of proposed owner */
+       newdatdba = get_usesysid(newowner); /* will ereport if no such user */
+
+       /* changing owner's database for someone else: must be superuser */
+       /* note that the someone else need not have any permissions */
+       if (!superuser())
+               ereport(ERROR,
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                errmsg("must be superuser to change owner's database for another user")));
+
+       /* change owner */
+       newtuple = heap_copytuple(tuple);
+       ((Form_pg_database) GETSTRUCT(newtuple))->datdba = newdatdba;
+       simple_heap_update(rel, &tuple->t_self, newtuple);
+       CatalogUpdateIndexes(rel, newtuple);
+
+       systable_endscan(scan);
+       heap_close(rel, NoLock);
+}
+
 
 /*
  * Helper functions
@@ -569,7 +835,7 @@ get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
 {
        Relation        relation;
        ScanKeyData scanKey;
-       HeapScanDesc scan;
+       SysScanDesc scan;
        HeapTuple       tuple;
        bool            gottuple;
 
@@ -578,12 +844,15 @@ get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
        /* Caller may wish to grab a better lock on pg_database beforehand... */
        relation = heap_openr(DatabaseRelationName, AccessShareLock);
 
-       ScanKeyEntryInitialize(&scanKey, 0, Anum_pg_database_datname,
-                                                  F_NAMEEQ, NameGetDatum(name));
+       ScanKeyInit(&scanKey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(name));
 
-       scan = heap_beginscan(relation, SnapshotNow, 1, &scanKey);
+       scan = systable_beginscan(relation, DatabaseNameIndex, true,
+                                                         SnapshotNow, 1, &scanKey);
 
-       tuple = heap_getnext(scan, ForwardScanDirection);
+       tuple = systable_getnext(scan);
 
        gottuple = HeapTupleIsValid(tuple);
        if (gottuple)
@@ -635,7 +904,7 @@ get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
                }
        }
 
-       heap_endscan(scan);
+       systable_endscan(scan);
        heap_close(relation, AccessShareLock);
 
        return gottuple;
@@ -648,7 +917,7 @@ have_createdb_privilege(void)
        bool            retval;
 
        utup = SearchSysCache(SHADOWSYSID,
-                                                 ObjectIdGetDatum(GetUserId()),
+                                                 Int32GetDatum(GetUserId()),
                                                  0, 0, 0);
 
        if (!HeapTupleIsValid(utup))
@@ -672,12 +941,16 @@ resolve_alt_dbpath(const char *dbpath, Oid dboid)
        if (dbpath == NULL || dbpath[0] == '\0')
                return NULL;
 
-       if (strchr(dbpath, '/'))
+       if (first_path_separator(dbpath))
        {
-               if (dbpath[0] != '/')
-                       elog(ERROR, "Relative paths are not allowed as database locations");
+               if (!is_absolute_path(dbpath))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("relative paths are not allowed as database locations")));
 #ifndef ALLOW_ABSOLUTE_DBPATHS
-               elog(ERROR, "Absolute paths are not allowed as database locations");
+               ereport(ERROR,
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+               errmsg("absolute paths are not allowed as database locations")));
 #endif
                prefix = dbpath;
        }
@@ -687,15 +960,23 @@ resolve_alt_dbpath(const char *dbpath, Oid dboid)
                char       *var = getenv(dbpath);
 
                if (!var)
-                       elog(ERROR, "Postmaster environment variable '%s' not set", dbpath);
-               if (var[0] != '/')
-                       elog(ERROR, "Postmaster environment variable '%s' must be absolute path", dbpath);
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_OBJECT),
+                          errmsg("postmaster environment variable \"%s\" not found",
+                                         dbpath)));
+               if (!is_absolute_path(var))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_NAME),
+                                        errmsg("postmaster environment variable \"%s\" must be absolute path",
+                                                       dbpath)));
                prefix = var;
        }
 
        len = strlen(prefix) + 6 + sizeof(Oid) * 8 + 1;
        if (len >= MAXPGPATH - 100)
-               elog(ERROR, "Alternate path is too long");
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_NAME),
+                                errmsg("alternative path is too long")));
 
        ret = palloc(len);
        snprintf(ret, len, "%s/base/%u", prefix, dboid);
@@ -724,17 +1005,26 @@ remove_dbdirs(const char *nominal_loc, const char *alt_loc)
                /* remove symlink */
                if (unlink(nominal_loc) != 0)
                {
-                       elog(WARNING, "could not remove '%s': %m", nominal_loc);
+                       ereport(WARNING,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not remove file \"%s\": %m", nominal_loc)));
                        success = false;
                }
        }
 
+#ifndef WIN32
        snprintf(buf, sizeof(buf), "rm -rf '%s'", target_dir);
+#else
+       snprintf(buf, sizeof(buf), "rmdir /s /q \"%s\"", target_dir);
+#endif
 
        if (system(buf) != 0)
        {
-               elog(WARNING, "database directory '%s' could not be removed",
-                        target_dir);
+               ereport(WARNING,
+                               (errmsg("could not remove database directory \"%s\"",
+                                               target_dir),
+                                errdetail("Failing system command was: %s", buf),
+                                errhint("Look in the postmaster's stderr log for more information.")));
                success = false;
        }
 
@@ -754,18 +1044,20 @@ get_database_oid(const char *dbname)
 {
        Relation        pg_database;
        ScanKeyData entry[1];
-       HeapScanDesc scan;
+       SysScanDesc scan;
        HeapTuple       dbtuple;
        Oid                     oid;
 
        /* There's no syscache for pg_database, so must look the hard way */
        pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
-       ScanKeyEntryInitialize(&entry[0], 0x0,
-                                                  Anum_pg_database_datname, F_NAMEEQ,
-                                                  CStringGetDatum(dbname));
-       scan = heap_beginscan(pg_database, SnapshotNow, 1, entry);
+       ScanKeyInit(&entry[0],
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               CStringGetDatum(dbname));
+       scan = systable_beginscan(pg_database, DatabaseNameIndex, true,
+                                                         SnapshotNow, 1, entry);
 
-       dbtuple = heap_getnext(scan, ForwardScanDirection);
+       dbtuple = systable_getnext(scan);
 
        /* We assume that there can be at most one matching tuple */
        if (HeapTupleIsValid(dbtuple))
@@ -773,45 +1065,48 @@ get_database_oid(const char *dbname)
        else
                oid = InvalidOid;
 
-       heap_endscan(scan);
+       systable_endscan(scan);
        heap_close(pg_database, AccessShareLock);
 
        return oid;
 }
 
+
 /*
- * get_database_owner - given a database OID, fetch the owner's usesysid.
+ * get_database_name - given a database OID, look up the name
  *
- * Errors out if database not found.
+ * Returns InvalidOid if database name not found.
  *
  * This is not actually used in this file, but is exported for use elsewhere.
  */
-Oid
-get_database_owner(Oid dbid)
+char *
+get_database_name(Oid dbid)
 {
        Relation        pg_database;
        ScanKeyData entry[1];
-       HeapScanDesc scan;
+       SysScanDesc scan;
        HeapTuple       dbtuple;
-       int32           dba;
+       char       *result;
 
        /* There's no syscache for pg_database, so must look the hard way */
        pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
-       ScanKeyEntryInitialize(&entry[0], 0x0,
-                                                  ObjectIdAttributeNumber, F_OIDEQ,
-                                                  ObjectIdGetDatum(dbid));
-       scan = heap_beginscan(pg_database, SnapshotNow, 1, entry);
+       ScanKeyInit(&entry[0],
+                               ObjectIdAttributeNumber,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(dbid));
+       scan = systable_beginscan(pg_database, DatabaseOidIndex, true,
+                                                         SnapshotNow, 1, entry);
 
-       dbtuple = heap_getnext(scan, ForwardScanDirection);
+       dbtuple = systable_getnext(scan);
 
-       if (!HeapTupleIsValid(dbtuple))
-               elog(ERROR, "database %u does not exist", dbid);
-
-       dba = ((Form_pg_database) GETSTRUCT(dbtuple))->datdba;
+       /* We assume that there can be at most one matching tuple */
+       if (HeapTupleIsValid(dbtuple))
+               result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
+       else
+               result = NULL;
 
-       heap_endscan(scan);
+       systable_endscan(scan);
        heap_close(pg_database, AccessShareLock);
 
-       /* XXX some confusion about whether userids are OID or int4 ... */
-       return (Oid) dba;
+       return result;
 }