]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/dbcommands.c
Improve vacuum code to track minimum Xids per table instead of per database.
[postgresql] / src / backend / commands / dbcommands.c
index 4710cec67a9da1ff0b93b41ed447c82b1517c38c..6d744a5bad32f524d19219e83bde704af5b32079 100644 (file)
  * dbcommands.c
  *             Database management commands (create/drop database).
  *
+ * Note: database creation/destruction commands use exclusive locks on
+ * the database objects (as expressed by LockSharedObject()) to avoid
+ * stepping on each other's toes.  Formerly we used table-level locks
+ * on pg_database, but that's too coarse-grained.
  *
- * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.123 2003/09/25 06:57:58 petere Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.182 2006/07/10 16:20:50 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include <errno.h>
 #include <fcntl.h>
 #include <unistd.h>
 #include <sys/stat.h>
 
 #include "access/genam.h"
 #include "access/heapam.h"
-#include "catalog/catname.h"
 #include "catalog/catalog.h"
-#include "catalog/pg_database.h"
-#include "catalog/pg_shadow.h"
+#include "catalog/dependency.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_authid.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_tablespace.h"
 #include "commands/comment.h"
 #include "commands/dbcommands.h"
+#include "commands/tablespace.h"
+#include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "postmaster/bgwriter.h"
+#include "storage/fd.h"
 #include "storage/freespace.h"
-#include "storage/sinval.h"
+#include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
+#include "utils/flatfiles.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
-#include "mb/pg_wchar.h"               /* encoding check */
-
 
 /* non-export function prototypes */
-static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
-                       int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
-                       TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
-                       char *dbpath);
+static bool get_db_info(const char *name, LOCKMODE lockmode,
+                       Oid *dbIdP, Oid *ownerIdP,
+                       int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
+                       Oid *dbLastSysOidP,
+                       TransactionId *dbVacuumXidP, TransactionId *dbMinXidP,
+                       Oid *dbTablespace);
 static bool have_createdb_privilege(void);
-static char *resolve_alt_dbpath(const char *dbpath, Oid dboid);
-static bool remove_dbdirs(const char *real_loc, const char *altloc);
+static void remove_dbtablespaces(Oid db_id);
+
 
 /*
  * CREATE DATABASE
  */
-
 void
 createdb(const CreatedbStmt *stmt)
 {
-       char       *nominal_loc;
-       char       *alt_loc;
-       char       *target_dir;
-       char            src_loc[MAXPGPATH];
-       char            buf[2 * MAXPGPATH + 100];
+       HeapScanDesc scan;
+       Relation        rel;
        Oid                     src_dboid;
-       AclId           src_owner;
+       Oid                     src_owner;
        int                     src_encoding;
        bool            src_istemplate;
+       bool            src_allowconn;
        Oid                     src_lastsysoid;
        TransactionId src_vacuumxid;
-       TransactionId src_frozenxid;
-       char            src_dbpath[MAXPGPATH];
+       TransactionId src_minxid;
+       Oid                     src_deftablespace;
+       volatile Oid dst_deftablespace;
        Relation        pg_database_rel;
        HeapTuple       tuple;
-       TupleDesc       pg_database_dsc;
        Datum           new_record[Natts_pg_database];
        char            new_record_nulls[Natts_pg_database];
        Oid                     dboid;
-       AclId           datdba;
-       List       *option;
+       Oid                     datdba;
+       ListCell   *option;
+       DefElem    *dtablespacename = NULL;
        DefElem    *downer = NULL;
-       DefElem    *dpath = NULL;
        DefElem    *dtemplate = NULL;
        DefElem    *dencoding = NULL;
+       DefElem    *dconnlimit = NULL;
        char       *dbname = stmt->dbname;
        char       *dbowner = NULL;
-       char       *dbpath = NULL;
-       char       *dbtemplate = NULL;
+       const char *dbtemplate = NULL;
        int                     encoding = -1;
+       int                     dbconnlimit = -1;
+
+       /* don't call this in a transaction block */
+       PreventTransactionChain((void *) stmt, "CREATE DATABASE");
 
        /* Extract options from the statement node tree */
        foreach(option, stmt->options)
        {
                DefElem    *defel = (DefElem *) lfirst(option);
 
-               if (strcmp(defel->defname, "owner") == 0)
+               if (strcmp(defel->defname, "tablespace") == 0)
                {
-                       if (downer)
+                       if (dtablespacename)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
-                       downer = defel;
+                       dtablespacename = defel;
                }
-               else if (strcmp(defel->defname, "location") == 0)
+               else if (strcmp(defel->defname, "owner") == 0)
                {
-                       if (dpath)
+                       if (downer)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
-                       dpath = defel;
+                       downer = defel;
                }
                else if (strcmp(defel->defname, "template") == 0)
                {
@@ -127,6 +137,21 @@ createdb(const CreatedbStmt *stmt)
                                                 errmsg("conflicting or redundant options")));
                        dencoding = defel;
                }
+               else if (strcmp(defel->defname, "connectionlimit") == 0)
+               {
+                       if (dconnlimit)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       dconnlimit = defel;
+               }
+               else if (strcmp(defel->defname, "location") == 0)
+               {
+                       ereport(WARNING,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("LOCATION is not supported anymore"),
+                                        errhint("Consider using tablespaces instead.")));
+               }
                else
                        elog(ERROR, "option \"%s\" not recognized",
                                 defel->defname);
@@ -134,8 +159,6 @@ createdb(const CreatedbStmt *stmt)
 
        if (downer && downer->arg)
                dbowner = strVal(downer->arg);
-       if (dpath && dpath->arg)
-               dbpath = strVal(dpath->arg);
        if (dtemplate && dtemplate->arg)
                dbtemplate = strVal(dtemplate->arg);
        if (dencoding && dencoding->arg)
@@ -167,68 +190,49 @@ createdb(const CreatedbStmt *stmt)
                        elog(ERROR, "unrecognized node type: %d",
                                 nodeTag(dencoding->arg));
        }
+       if (dconnlimit && dconnlimit->arg)
+               dbconnlimit = intVal(dconnlimit->arg);
 
-       /* obtain sysid of proposed owner */
+       /* obtain OID of proposed owner */
        if (dbowner)
-               datdba = get_usesysid(dbowner); /* will ereport if no such user */
+               datdba = get_roleid_checked(dbowner);
        else
                datdba = GetUserId();
 
-       if (datdba == GetUserId())
-       {
-               /* creating database for self: can be superuser or createdb */
-               if (!superuser() && !have_createdb_privilege())
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                        errmsg("permission denied to create database")));
-       }
-       else
-       {
-               /* creating database for someone else: must be superuser */
-               /* note that the someone else need not have any permissions */
-               if (!superuser())
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                        errmsg("must be superuser to create database for another user")));
-       }
-
-       /* don't call this in a transaction block */
-       PreventTransactionChain((void *) stmt, "CREATE DATABASE");
-
-       /* alternate location requires symlinks */
-#ifndef HAVE_SYMLINK
-       if (dbpath != NULL)
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                  errmsg("cannot use an alternate location on this platform")));
-#endif
-
        /*
-        * Check for db name conflict.  There is a race condition here, since
-        * another backend could create the same DB name before we commit.
-        * However, holding an exclusive lock on pg_database for the whole
-        * time we are copying the source database doesn't seem like a good
-        * idea, so accept possibility of race to create.  We will check again
-        * after we grab the exclusive lock.
+        * To create a database, must have createdb privilege and must be able to
+        * become the target role (this does not imply that the target role itself
+        * must have createdb privilege).  The latter provision guards against
+        * "giveaway" attacks.  Note that a superuser will always have both of
+        * these privileges a fortiori.
         */
-       if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
+       if (!have_createdb_privilege())
                ereport(ERROR,
-                               (errcode(ERRCODE_DUPLICATE_DATABASE),
-                                errmsg("database \"%s\" already exists", dbname)));
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                errmsg("permission denied to create database")));
+
+       check_is_member_of_role(GetUserId(), datdba);
 
        /*
-        * Lookup database (template) to be cloned.
+        * Lookup database (template) to be cloned, and obtain share lock on it.
+        * ShareLock allows two CREATE DATABASEs to work from the same template
+        * concurrently, while ensuring no one is busy dropping it in parallel
+        * (which would be Very Bad since we'd likely get an incomplete copy
+        * without knowing it).  This also prevents any new connections from being
+        * made to the source until we finish copying it, so we can be sure it
+        * won't change underneath us.
         */
        if (!dbtemplate)
                dbtemplate = "template1";               /* Default template database name */
 
-       if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
-                                        &src_istemplate, &src_lastsysoid,
-                                        &src_vacuumxid, &src_frozenxid,
-                                        src_dbpath))
+       if (!get_db_info(dbtemplate, ShareLock,
+                                        &src_dboid, &src_owner, &src_encoding,
+                                        &src_istemplate, &src_allowconn, &src_lastsysoid,
+                                        &src_vacuumxid, &src_minxid, &src_deftablespace))
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
-                                errmsg("template database \"%s\" does not exist", dbtemplate)));
+                                errmsg("template database \"%s\" does not exist",
+                                               dbtemplate)));
 
        /*
         * Permission check: to copy a DB that's not marked datistemplate, you
@@ -236,32 +240,23 @@ createdb(const CreatedbStmt *stmt)
         */
        if (!src_istemplate)
        {
-               if (!superuser() && GetUserId() != src_owner)
+               if (!pg_database_ownercheck(src_dboid, GetUserId()))
                        ereport(ERROR,
                                        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                         errmsg("permission denied to copy database \"%s\"",
                                                        dbtemplate)));
        }
 
-       /*
-        * Determine physical path of source database
-        */
-       alt_loc = resolve_alt_dbpath(src_dbpath, src_dboid);
-       if (!alt_loc)
-               alt_loc = GetDatabasePath(src_dboid);
-       strcpy(src_loc, alt_loc);
-
        /*
         * The source DB can't have any active backends, except this one
         * (exception is to allow CREATE DB while connected to template1).
-        * Otherwise we might copy inconsistent data.  This check is not
-        * bulletproof, since someone might connect while we are copying...
+        * Otherwise we might copy inconsistent data.
         */
        if (DatabaseHasActiveBackends(src_dboid, true))
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_IN_USE),
-               errmsg("source database \"%s\" is being accessed by other users",
-                          dbtemplate)));
+                       errmsg("source database \"%s\" is being accessed by other users",
+                                  dbtemplate)));
 
        /* If encoding is defaulted, use source's encoding */
        if (encoding < 0)
@@ -273,142 +268,81 @@ createdb(const CreatedbStmt *stmt)
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                 errmsg("invalid server encoding %d", encoding)));
 
-       /*
-        * Preassign OID for pg_database tuple, so that we can compute db
-        * path.
-        */
-       dboid = newoid();
-
-       /*
-        * Compute nominal location (where we will try to access the
-        * database), and resolve alternate physical location if one is
-        * specified.
-        *
-        * If an alternate location is specified but is the same as the normal
-        * path, just drop the alternate-location spec (this seems friendlier
-        * than erroring out).  We must test this case to avoid creating a
-        * circular symlink below.
-        */
-       nominal_loc = GetDatabasePath(dboid);
-       alt_loc = resolve_alt_dbpath(dbpath, dboid);
-
-       if (alt_loc && strcmp(alt_loc, nominal_loc) == 0)
+       /* Resolve default tablespace for new database */
+       if (dtablespacename && dtablespacename->arg)
        {
-               alt_loc = NULL;
-               dbpath = NULL;
-       }
-
-       if (strchr(nominal_loc, '\''))
-               ereport(ERROR,
-                               (errcode(ERRCODE_INVALID_NAME),
-                                errmsg("database path may not contain single quotes")));
-       if (alt_loc && strchr(alt_loc, '\''))
-               ereport(ERROR,
-                               (errcode(ERRCODE_INVALID_NAME),
-                                errmsg("database path may not contain single quotes")));
-       if (strchr(src_loc, '\''))
-               ereport(ERROR,
-                               (errcode(ERRCODE_INVALID_NAME),
-                                errmsg("database path may not contain single quotes")));
-       /* ... otherwise we'd be open to shell exploits below */
-
-       /*
-        * Force dirty buffers out to disk, to ensure source database is
-        * up-to-date for the copy.  (We really only need to flush buffers for
-        * the source database...)
-        */
-       BufferSync();
-
-       /*
-        * Close virtual file descriptors so the kernel has more available for
-        * the mkdir() and system() calls below.
-        */
-       closeAllVfds();
+               char       *tablespacename;
+               AclResult       aclresult;
 
-       /*
-        * Check we can create the target directory --- but then remove it
-        * because we rely on cp(1) to create it for real.
-        */
-       target_dir = alt_loc ? alt_loc : nominal_loc;
+               tablespacename = strVal(dtablespacename->arg);
+               dst_deftablespace = get_tablespace_oid(tablespacename);
+               if (!OidIsValid(dst_deftablespace))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                        errmsg("tablespace \"%s\" does not exist",
+                                                       tablespacename)));
+               /* check permissions */
+               aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
+                                                                                  ACL_CREATE);
+               if (aclresult != ACLCHECK_OK)
+                       aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
+                                                  tablespacename);
 
-       if (mkdir(target_dir, S_IRWXU) != 0)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not create database directory \"%s\": %m",
-                                               target_dir)));
-       if (rmdir(target_dir) != 0)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not remove temporary directory \"%s\": %m",
-                                               target_dir)));
+               /*
+                * If we are trying to change the default tablespace of the template,
+                * we require that the template not have any files in the new default
+                * tablespace.  This is necessary because otherwise the copied
+                * database would contain pg_class rows that refer to its default
+                * tablespace both explicitly (by OID) and implicitly (as zero), which
+                * would cause problems.  For example another CREATE DATABASE using
+                * the copied database as template, and trying to change its default
+                * tablespace again, would yield outright incorrect results (it would
+                * improperly move tables to the new default tablespace that should
+                * stay in the same tablespace).
+                */
+               if (dst_deftablespace != src_deftablespace)
+               {
+                       char       *srcpath;
+                       struct stat st;
 
-       /* Make the symlink, if needed */
-       if (alt_loc)
-       {
-#ifdef HAVE_SYMLINK                            /* already throws error above */
-               if (symlink(alt_loc, nominal_loc) != 0)
-#endif
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not link file \"%s\" to \"%s\": %m",
-                                                       nominal_loc, alt_loc)));
-       }
+                       srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
 
-       /*
-        * Copy the template database to the new location
-        *
-        * XXX use of cp really makes this code pretty grotty, particularly
-        * with respect to lack of ability to report errors well.  Someday
-        * rewrite to do it for ourselves.
-        */
-#ifndef WIN32
-       snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", src_loc, target_dir);
-       if (system(buf) != 0)
-       {
-               if (remove_dbdirs(nominal_loc, alt_loc))
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory"),
-                                        errdetail("Failing system command was: %s", buf),
-                                        errhint("Look in the postmaster's stderr log for more information.")));
-               else
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory; delete failed as well"),
-                                        errdetail("Failing system command was: %s", buf),
-                                        errhint("Look in the postmaster's stderr log for more information.")));
+                       if (stat(srcpath, &st) == 0 &&
+                               S_ISDIR(st.st_mode) &&
+                               !directory_is_empty(srcpath))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("cannot assign new default tablespace \"%s\"",
+                                                               tablespacename),
+                                                errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
+                                                                  dbtemplate)));
+                       pfree(srcpath);
+               }
        }
-#else  /* WIN32 */
-       if (copydir(src_loc, target_dir) != 0)
+       else
        {
-               /* copydir should already have given details of its troubles */
-               if (remove_dbdirs(nominal_loc, alt_loc))
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory")));
-               else
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory; delete failed as well")));
+               /* Use template database's default tablespace */
+               dst_deftablespace = src_deftablespace;
+               /* Note there is no additional permission check in this path */
        }
-#endif /* WIN32 */
 
        /*
-        * Now OK to grab exclusive lock on pg_database.
+        * Check for db name conflict.  This is just to give a more friendly
+        * error message than "unique index violation".  There's a race condition
+        * but we're willing to accept the less friendly message in that case.
         */
-       pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
-
-       /* Check to see if someone else created same DB name meanwhile. */
-       if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
-       {
-               /* Don't hold lock while doing recursive remove */
-               heap_close(pg_database_rel, AccessExclusiveLock);
-               remove_dbdirs(nominal_loc, alt_loc);
+       if (OidIsValid(get_database_oid(dbname)))
                ereport(ERROR,
                                (errcode(ERRCODE_DUPLICATE_DATABASE),
                                 errmsg("database \"%s\" already exists", dbname)));
-       }
 
        /*
-        * Insert a new tuple into pg_database
+        * Insert a new tuple into pg_database.  This establishes our ownership
+        * of the new database name (anyone else trying to insert the same name
+        * will block on the unique index, and fail after we commit).  It also
+        * assigns the OID that the new database will have.
         */
-       pg_database_dsc = RelationGetDescr(pg_database_rel);
+       pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
 
        /* Form tuple */
        MemSet(new_record, 0, sizeof(new_record));
@@ -416,16 +350,15 @@ createdb(const CreatedbStmt *stmt)
 
        new_record[Anum_pg_database_datname - 1] =
                DirectFunctionCall1(namein, CStringGetDatum(dbname));
-       new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
+       new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
        new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
        new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
        new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
+       new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
        new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
        new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
-       new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
-       /* do not set datpath to null, GetRawDatabaseInfo won't cope */
-       new_record[Anum_pg_database_datpath - 1] =
-               DirectFunctionCall1(textin, CStringGetDatum(dbpath ? dbpath : ""));
+       new_record[Anum_pg_database_datminxid - 1] = TransactionIdGetDatum(src_minxid);
+       new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
 
        /*
         * We deliberately set datconfig and datacl to defaults (NULL), rather
@@ -436,25 +369,157 @@ createdb(const CreatedbStmt *stmt)
        new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
        new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
 
-       tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
-
-       HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
-                                                                                * selection */
+       tuple = heap_formtuple(RelationGetDescr(pg_database_rel),
+                                                  new_record, new_record_nulls);
 
-       simple_heap_insert(pg_database_rel, tuple);
+       dboid = simple_heap_insert(pg_database_rel, tuple);
 
        /* Update indexes */
        CatalogUpdateIndexes(pg_database_rel, tuple);
 
-       /* Close pg_database, but keep lock till commit */
-       heap_close(pg_database_rel, NoLock);
+       /*
+        * Now generate additional catalog entries associated with the new DB
+        */
+
+       /* Register owner dependency */
+       recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
+
+       /* Create pg_shdepend entries for objects within database */
+       copyTemplateDependencies(src_dboid, dboid);
 
        /*
-        * Force dirty buffers out to disk, so that newly-connecting backends
-        * will see the new database in pg_database right away.  (They'll see
-        * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
+        * Force dirty buffers out to disk, to ensure source database is
+        * up-to-date for the copy.  (We really only need to flush buffers for the
+        * source database, but bufmgr.c provides no API for that.)
         */
        BufferSync();
+
+       /*
+        * Once we start copying subdirectories, we need to be able to clean 'em
+        * up if we fail.  Establish a TRY block to make sure this happens. (This
+        * is not a 100% solution, because of the possibility of failure during
+        * transaction commit after we leave this routine, but it should handle
+        * most scenarios.)
+        */
+       PG_TRY();
+       {
+               /*
+                * Iterate through all tablespaces of the template database, and copy
+                * each one to the new database.
+                */
+               rel = heap_open(TableSpaceRelationId, AccessShareLock);
+               scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+               while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+               {
+                       Oid                     srctablespace = HeapTupleGetOid(tuple);
+                       Oid                     dsttablespace;
+                       char       *srcpath;
+                       char       *dstpath;
+                       struct stat st;
+
+                       /* No need to copy global tablespace */
+                       if (srctablespace == GLOBALTABLESPACE_OID)
+                               continue;
+
+                       srcpath = GetDatabasePath(src_dboid, srctablespace);
+
+                       if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
+                               directory_is_empty(srcpath))
+                       {
+                               /* Assume we can ignore it */
+                               pfree(srcpath);
+                               continue;
+                       }
+
+                       if (srctablespace == src_deftablespace)
+                               dsttablespace = dst_deftablespace;
+                       else
+                               dsttablespace = srctablespace;
+
+                       dstpath = GetDatabasePath(dboid, dsttablespace);
+
+                       /*
+                        * Copy this subdirectory to the new location
+                        *
+                        * We don't need to copy subdirectories
+                        */
+                       copydir(srcpath, dstpath, false);
+
+                       /* Record the filesystem change in XLOG */
+                       {
+                               xl_dbase_create_rec xlrec;
+                               XLogRecData rdata[1];
+
+                               xlrec.db_id = dboid;
+                               xlrec.tablespace_id = dsttablespace;
+                               xlrec.src_db_id = src_dboid;
+                               xlrec.src_tablespace_id = srctablespace;
+
+                               rdata[0].data = (char *) &xlrec;
+                               rdata[0].len = sizeof(xl_dbase_create_rec);
+                               rdata[0].buffer = InvalidBuffer;
+                               rdata[0].next = NULL;
+
+                               (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
+                       }
+               }
+               heap_endscan(scan);
+               heap_close(rel, AccessShareLock);
+
+               /*
+                * We force a checkpoint before committing.  This effectively means
+                * that committed XLOG_DBASE_CREATE operations will never need to be
+                * replayed (at least not in ordinary crash recovery; we still have to
+                * make the XLOG entry for the benefit of PITR operations). This
+                * avoids two nasty scenarios:
+                *
+                * #1: When PITR is off, we don't XLOG the contents of newly created
+                * indexes; therefore the drop-and-recreate-whole-directory behavior
+                * of DBASE_CREATE replay would lose such indexes.
+                *
+                * #2: Since we have to recopy the source database during DBASE_CREATE
+                * replay, we run the risk of copying changes in it that were
+                * committed after the original CREATE DATABASE command but before the
+                * system crash that led to the replay.  This is at least unexpected
+                * and at worst could lead to inconsistencies, eg duplicate table
+                * names.
+                *
+                * (Both of these were real bugs in releases 8.0 through 8.0.3.)
+                *
+                * In PITR replay, the first of these isn't an issue, and the second
+                * is only a risk if the CREATE DATABASE and subsequent template
+                * database change both occur while a base backup is being taken.
+                * There doesn't seem to be much we can do about that except document
+                * it as a limitation.
+                *
+                * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
+                * we can avoid this.
+                */
+               RequestCheckpoint(true, false);
+
+               /*
+                * Close pg_database, but keep lock till commit (this is important
+                * to prevent any risk of deadlock failure while updating flat file)
+                */
+               heap_close(pg_database_rel, NoLock);
+
+               /*
+                * Set flag to update flat database file at commit.
+                */
+               database_file_update_needed();
+       }
+       PG_CATCH();
+       {
+               /* Release lock on source database before doing recursive remove */
+               UnlockSharedObject(DatabaseRelationId, src_dboid, 0,
+                                                  ShareLock);
+
+               /* Throw away any successfully copied subdirectories */
+               remove_dbtablespaces(dboid);
+
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
 }
 
 
@@ -462,19 +527,15 @@ createdb(const CreatedbStmt *stmt)
  * DROP DATABASE
  */
 void
-dropdb(const char *dbname)
+dropdb(const char *dbname, bool missing_ok)
 {
-       int4            db_owner;
-       bool            db_istemplate;
        Oid                     db_id;
-       char       *alt_loc;
-       char       *nominal_loc;
-       char            dbpath[MAXPGPATH];
+       bool            db_istemplate;
        Relation        pgdbrel;
-       SysScanDesc pgdbscan;
-       ScanKeyData key;
        HeapTuple       tup;
 
+       PreventTransactionChain((void *) dbname, "DROP DATABASE");
+
        AssertArg(dbname);
 
        if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
@@ -482,95 +543,91 @@ dropdb(const char *dbname)
                                (errcode(ERRCODE_OBJECT_IN_USE),
                                 errmsg("cannot drop the currently open database")));
 
-       PreventTransactionChain((void *) dbname, "DROP DATABASE");
-
        /*
-        * Obtain exclusive lock on pg_database.  We need this to ensure that
-        * no new backend starts up in the target database while we are
-        * deleting it.  (Actually, a new backend might still manage to start
-        * up, because it will read pg_database without any locking to
-        * discover the database's OID.  But it will detect its error in
-        * ReverifyMyDatabase and shut down before any serious damage is done.
-        * See postinit.c.)
+        * Look up the target database's OID, and get exclusive lock on it.
+        * We need this to ensure that no new backend starts up in the target
+        * database while we are deleting it (see postinit.c), and that no one is
+        * using it as a CREATE DATABASE template or trying to delete it for
+        * themselves.
+        *
+        * If no database of that name exists, we raise an error unless
+        * missing_ok is set, in which case we close pg_database, emit a
+        * NOTICE and return without changing anything.
         */
-       pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
+       pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
 
-       if (!get_db_info(dbname, &db_id, &db_owner, NULL,
-                                        &db_istemplate, NULL, NULL, NULL, dbpath))
-               ereport(ERROR,
-                               (errcode(ERRCODE_UNDEFINED_DATABASE),
-                                errmsg("database \"%s\" does not exist", dbname)));
+       if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
+                                        &db_istemplate, NULL, NULL, NULL, NULL, NULL))
+       {
+               if (!missing_ok)
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                        errmsg("database \"%s\" does not exist", dbname)));
+               }
+               else
+               {
+                       /* Close pg_database, release the lock, since we changed nothing */
+                       heap_close(pgdbrel, RowExclusiveLock);
+                       ereport(NOTICE,
+                                       (errmsg("database \"%s\" does not exist, skipping",
+                                                       dbname)));
+                       return;
+               }
+       }
 
-       if (GetUserId() != db_owner && !superuser())
+       /*
+        * Permission checks: the current user must pass
+        * pg_database_ownercheck() for the target database.
+        */
+       if (!pg_database_ownercheck(db_id, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
                                           dbname);
 
        /*
         * Disallow dropping a DB that is marked istemplate.  This is just to
-        * prevent people from accidentally dropping template0 or template1;
-        * they can do so if they're really determined ...
+        * prevent people from accidentally dropping template0 or template1; they
+        * can do so if they're really determined ...
         */
        if (db_istemplate)
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                 errmsg("cannot drop a template database")));
 
-       nominal_loc = GetDatabasePath(db_id);
-       alt_loc = resolve_alt_dbpath(dbpath, db_id);
-
        /*
-        * Check for active backends in the target database.
+        * Check for active backends in the target database.  (Because we hold
+        * the database lock, no new ones can start after this.)
         */
        if (DatabaseHasActiveBackends(db_id, false))
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_IN_USE),
-                          errmsg("database \"%s\" is being accessed by other users",
-                                         dbname)));
+                                errmsg("database \"%s\" is being accessed by other users",
+                                               dbname)));
 
        /*
-        * Find the database's tuple by OID (should be unique).
+        * Remove the database's tuple from pg_database.
         */
-       ScanKeyEntryInitialize(&key, 0, ObjectIdAttributeNumber,
-                                                  F_OIDEQ, ObjectIdGetDatum(db_id));
-
-       pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true, SnapshotNow, 1, &key);
-
-       tup = systable_getnext(pgdbscan);
+       tup = SearchSysCache(DATABASEOID,
+                                                ObjectIdGetDatum(db_id),
+                                                0, 0, 0);
        if (!HeapTupleIsValid(tup))
-       {
-               /*
-                * This error should never come up since the existence of the
-                * database is checked earlier
-                */
-               elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
-                        dbname);
-       }
+               elog(ERROR, "cache lookup failed for database %u", db_id);
 
-       /* Remove the database's tuple from pg_database */
        simple_heap_delete(pgdbrel, &tup->t_self);
 
-       systable_endscan(pgdbscan);
+       ReleaseSysCache(tup);
 
        /*
-        * Delete any comments associated with the database
-        *
-        * NOTE: this is probably dead code since any such comments should have
-        * been in that database, not mine.
+        * Delete any comments associated with the database.
         */
-       DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
+       DeleteSharedComments(db_id, DatabaseRelationId);
 
        /*
-        * Close pg_database, but keep exclusive lock till commit to ensure
-        * that any new backend scanning pg_database will see the tuple dead.
+        * Remove shared dependency references for the database.
         */
-       heap_close(pgdbrel, NoLock);
+       dropDatabaseDependencies(db_id);
 
        /*
-        * Drop pages for this database that are in the shared buffer cache.
-        * This is important to ensure that no remaining backend tries to
-        * write out a dirty buffer to the dead database later...
+        * Drop pages for this database that are in the shared buffer cache. This
+        * is important to ensure that no remaining backend tries to write out a
+        * dirty buffer to the dead database later...
         */
-       DropBuffers(db_id);
+       DropDatabaseBuffers(db_id);
 
        /*
         * Also, clean out any entries in the shared free space map.
@@ -578,17 +635,28 @@ dropdb(const char *dbname)
        FreeSpaceMapForgetDatabase(db_id);
 
        /*
-        * Remove the database's subdirectory and everything in it.
+        * On Windows, force a checkpoint so that the bgwriter doesn't hold any
+        * open files, which would cause rmdir() to fail.
         */
-       remove_dbdirs(nominal_loc, alt_loc);
+#ifdef WIN32
+       RequestCheckpoint(true, false);
+#endif
 
        /*
-        * Force dirty buffers out to disk, so that newly-connecting backends
-        * will see the database tuple marked dead in pg_database right away.
-        * (They'll see an uncommitted deletion, but they don't care; see
-        * GetRawDatabaseInfo.)
+        * Remove all tablespace subdirs belonging to the database.
         */
-       BufferSync();
+       remove_dbtablespaces(db_id);
+
+       /*
+        * Close pg_database, but keep lock till commit (this is important
+        * to prevent any risk of deadlock failure while updating flat file)
+        */
+       heap_close(pgdbrel, NoLock);
+
+       /*
+        * Set flag to update flat database file at commit.
+        */
+       database_file_update_needed();
 }
 
 
@@ -598,88 +666,173 @@ dropdb(const char *dbname)
 void
 RenameDatabase(const char *oldname, const char *newname)
 {
-       HeapTuple       tup,
-                               newtup;
+       Oid                     db_id;
+       HeapTuple       newtup;
        Relation        rel;
-       SysScanDesc scan,
-                               scan2;
-       ScanKeyData key,
-                               key2;
 
        /*
-        * Obtain AccessExclusiveLock so that no new session gets started
-        * while the rename is in progress.
+        * Look up the target database's OID, and get exclusive lock on it.
+        * We need this for the same reasons as DROP DATABASE.
+        *
+        * The rename is refused with an error if the old name is not found,
+        * if the target is the current database, if it has active backends,
+        * if the new name is already in use, or if the caller is not the
+        * owner or lacks createdb rights.
         */
-       rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
-
-       ScanKeyEntryInitialize(&key, 0, Anum_pg_database_datname,
-                                                  F_NAMEEQ, NameGetDatum(oldname));
-       scan = systable_beginscan(rel, DatabaseNameIndex, true, SnapshotNow, 1, &key);
+       rel = heap_open(DatabaseRelationId, RowExclusiveLock);
 
-       tup = systable_getnext(scan);
-       if (!HeapTupleIsValid(tup))
+       if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
+                                        NULL, NULL, NULL, NULL, NULL, NULL))
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", oldname)));
 
        /*
-        * XXX Client applications probably store the current database
-        * somewhere, so renaming it could cause confusion.  On the other
-        * hand, there may not be an actual problem besides a little
-        * confusion, so think about this and decide.
+        * XXX Client applications probably store the current database somewhere,
+        * so renaming it could cause confusion.  On the other hand, there may not
+        * be an actual problem besides a little confusion, so think about this
+        * and decide.
         */
-       if (HeapTupleGetOid(tup) == MyDatabaseId)
+       if (db_id == MyDatabaseId)
                ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("current database may not be renamed")));
 
        /*
-        * Make sure the database does not have active sessions.  Might not be
-        * necessary, but it's consistent with other database operations.
+        * Make sure the database does not have active sessions.  This is the
+        * same concern as above, but applied to other sessions.
         */
-       if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
+       if (DatabaseHasActiveBackends(db_id, false))
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_IN_USE),
-                          errmsg("database \"%s\" is being accessed by other users",
-                                         oldname)));
+                                errmsg("database \"%s\" is being accessed by other users",
+                                               oldname)));
 
        /* make sure the new name doesn't exist */
-       ScanKeyEntryInitialize(&key2, 0, Anum_pg_database_datname,
-                                                  F_NAMEEQ, NameGetDatum(newname));
-       scan2 = systable_beginscan(rel, DatabaseNameIndex, true, SnapshotNow, 1, &key2);
-       if (HeapTupleIsValid(systable_getnext(scan2)))
+       if (OidIsValid(get_database_oid(newname)))
                ereport(ERROR,
                                (errcode(ERRCODE_DUPLICATE_DATABASE),
                                 errmsg("database \"%s\" already exists", newname)));
-       systable_endscan(scan2);
 
        /* must be owner */
-       if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+       if (!pg_database_ownercheck(db_id, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
                                           oldname);
 
-       /* must have createdb */
+       /* must have createdb rights */
        if (!have_createdb_privilege())
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                 errmsg("permission denied to rename database")));
 
        /* rename */
-       newtup = heap_copytuple(tup);
+       newtup = SearchSysCacheCopy(DATABASEOID,
+                                                               ObjectIdGetDatum(db_id),
+                                                               0, 0, 0);
+       if (!HeapTupleIsValid(newtup))
+               elog(ERROR, "cache lookup failed for database %u", db_id);
        namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
-       simple_heap_update(rel, &tup->t_self, newtup);
+       simple_heap_update(rel, &newtup->t_self, newtup);
        CatalogUpdateIndexes(rel, newtup);
 
+       /*
+        * Close pg_database, but keep lock till commit (this is important
+        * to prevent any risk of deadlock failure while updating flat file)
+        */
+       heap_close(rel, NoLock);
+
+       /*
+        * Set flag to update flat database file at commit.
+        */
+       database_file_update_needed();
+}
+
+
+/*
+ * ALTER DATABASE name ...
+ *
+ * The only option recognized here is "connectionlimit", whose value is
+ * stored into the datconnlimit column of the database's pg_database row.
+ * Giving it more than once is reported as a syntax error, and any other
+ * option name raises an error.  The database must exist and the caller
+ * must pass pg_database_ownercheck() for it.
+ */
+void
+AlterDatabase(AlterDatabaseStmt *stmt)
+{
+       Relation        rel;
+       HeapTuple       tuple,
+                               newtuple;
+       ScanKeyData scankey;
+       SysScanDesc scan;
+       ListCell   *option;
+       int                     connlimit = -1;
+       DefElem    *dconnlimit = NULL;
+       Datum           new_record[Natts_pg_database];
+       char            new_record_nulls[Natts_pg_database];
+       char            new_record_repl[Natts_pg_database];
+
+       /* Extract options from the statement node tree */
+       foreach(option, stmt->options)
+       {
+               DefElem    *defel = (DefElem *) lfirst(option);
+
+               if (strcmp(defel->defname, "connectionlimit") == 0)
+               {
+                       if (dconnlimit)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       dconnlimit = defel;
+               }
+               else
+                       elog(ERROR, "option \"%s\" not recognized",
+                                defel->defname);
+       }
+
+       if (dconnlimit)
+               connlimit = intVal(dconnlimit->arg);
+
+       /*
+        * Get the old tuple.  We don't need a lock on the database per se,
+        * because we're not going to do anything that would mess up incoming
+        * connections.
+        */
+       rel = heap_open(DatabaseRelationId, RowExclusiveLock);
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(stmt->dbname));
+       scan = systable_beginscan(rel, DatabaseNameIndexId, true,
+                                                         SnapshotNow, 1, &scankey);
+       tuple = systable_getnext(scan);
+       if (!HeapTupleIsValid(tuple))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", stmt->dbname)));
+
+       if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                          stmt->dbname);
+
+       /*
+        * Build an updated tuple, perusing the information just obtained
+        *
+        * The nulls and repl arrays start out filled with ' '; only the
+        * datconnlimit slot is flagged 'r', and only when the option was
+        * given.  (Presumably heap_modifytuple() leaves unflagged columns
+        * as they were -- its definition is not in this file.)  Note that
+        * the tuple is rewritten even when no option was specified.
+        */
+       MemSet(new_record, 0, sizeof(new_record));
+       MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
+       MemSet(new_record_repl, ' ', sizeof(new_record_repl));
+
+       if (dconnlimit)
+       {
+               new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
+               new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r';
+       }
+
+       newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record,
+                                                               new_record_nulls, new_record_repl);
+       simple_heap_update(rel, &tuple->t_self, newtuple);
+
+       /* Update indexes */
+       CatalogUpdateIndexes(rel, newtuple);
+
        systable_endscan(scan);
+
+       /* Close pg_database, but keep lock till commit */
        heap_close(rel, NoLock);
 
        /*
-        * Force dirty buffers out to disk, so that newly-connecting backends
-        * will see the renamed database in pg_database right away.  (They'll
-        * see an uncommitted tuple, but they don't care; see
-        * GetRawDatabaseInfo.)
+        * We don't bother updating the flat file since the existing options for
+        * ALTER DATABASE don't affect it.
         */
-       BufferSync();
 }
 
 
@@ -701,18 +854,25 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
 
        valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
 
-       rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
-       ScanKeyEntryInitialize(&scankey, 0, Anum_pg_database_datname,
-                                                  F_NAMEEQ, NameGetDatum(stmt->dbname));
-       scan = systable_beginscan(rel, DatabaseNameIndex, true, SnapshotNow, 1, &scankey);
+       /*
+        * Get the old tuple.  We don't need a lock on the database per se,
+        * because we're not going to do anything that would mess up incoming
+        * connections.
+        */
+       rel = heap_open(DatabaseRelationId, RowExclusiveLock);
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(stmt->dbname));
+       scan = systable_beginscan(rel, DatabaseNameIndexId, true,
+                                                         SnapshotNow, 1, &scankey);
        tuple = systable_getnext(scan);
        if (!HeapTupleIsValid(tuple))
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", stmt->dbname)));
 
-       if (!(superuser()
-               || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
+       if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
                                           stmt->dbname);
 
@@ -736,7 +896,7 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
                datum = heap_getattr(tuple, Anum_pg_database_datconfig,
                                                         RelationGetDescr(rel), &isnull);
 
-               a = isnull ? ((ArrayType *) NULL) : DatumGetArrayTypeP(datum);
+               a = isnull ? NULL : DatumGetArrayTypeP(datum);
 
                if (valuestr)
                        a = GUCArrayAdd(a, stmt->variable, valuestr);
@@ -749,221 +909,347 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
                        repl_null[Anum_pg_database_datconfig - 1] = 'n';
        }
 
-       newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
+       newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
        simple_heap_update(rel, &tuple->t_self, newtuple);
 
        /* Update indexes */
        CatalogUpdateIndexes(rel, newtuple);
 
        systable_endscan(scan);
-       heap_close(rel, RowExclusiveLock);
+
+       /* Close pg_database, but keep lock till commit */
+       heap_close(rel, NoLock);
+
+       /*
+        * We don't bother updating the flat file since ALTER DATABASE SET doesn't
+        * affect it.
+        */
 }
 
 
+/*
+ * ALTER DATABASE name OWNER TO newowner
+ *
+ * Raises an error if no database called dbname exists.  If newOwnerId
+ * already owns the database, nothing further is checked or changed.
+ * Otherwise the caller must own the database, must pass
+ * check_is_member_of_role() for the new owner, and must have createdb
+ * privilege.  We then replace datdba, rewrite datacl via aclnewowner()
+ * when it is non-null, and update the owner dependency reference.
+ */
+void
+AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
+{
+       HeapTuple       tuple;
+       Relation        rel;
+       ScanKeyData scankey;
+       SysScanDesc scan;
+       Form_pg_database datForm;
+
+       /*
+        * Get the old tuple.  We don't need a lock on the database per se,
+        * because we're not going to do anything that would mess up incoming
+        * connections.
+        */
+       rel = heap_open(DatabaseRelationId, RowExclusiveLock);
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(dbname));
+       scan = systable_beginscan(rel, DatabaseNameIndexId, true,
+                                                         SnapshotNow, 1, &scankey);
+       tuple = systable_getnext(scan);
+       if (!HeapTupleIsValid(tuple))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", dbname)));
+
+       datForm = (Form_pg_database) GETSTRUCT(tuple);
+
+       /*
+        * If the new owner is the same as the existing owner, consider the
+        * command to have succeeded.  This is to be consistent with other
+        * objects.
+        */
+       if (datForm->datdba != newOwnerId)
+       {
+               Datum           repl_val[Natts_pg_database];
+               char            repl_null[Natts_pg_database];
+               char            repl_repl[Natts_pg_database];
+               Acl                *newAcl;
+               Datum           aclDatum;
+               bool            isNull;
+               HeapTuple       newtuple;
+
+               /* Otherwise, must be owner of the existing object */
+               if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
+                       aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                                  dbname);
+
+               /* Must be able to become new owner */
+               check_is_member_of_role(GetUserId(), newOwnerId);
+
+               /*
+                * must have createdb rights
+                *
+                * NOTE: This is different from other alter-owner checks in that the
+                * current user is checked for createdb privileges instead of the
+                * destination owner.  This is consistent with the CREATE case for
+                * databases.  Because superusers will always have this right, we need
+                * no special case for them.
+                */
+               if (!have_createdb_privilege())
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                  errmsg("permission denied to change owner of database")));
+
+               memset(repl_null, ' ', sizeof(repl_null));
+               memset(repl_repl, ' ', sizeof(repl_repl));
+
+               repl_repl[Anum_pg_database_datdba - 1] = 'r';
+               repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
+
+               /*
+                * Determine the modified ACL for the new owner.  This is only
+                * necessary when the ACL is non-null.
+                *
+                * Note that repl_val is never cleared as a whole; only the
+                * slots flagged 'r' in repl_repl are assigned here.
+                */
+               aclDatum = heap_getattr(tuple,
+                                                               Anum_pg_database_datacl,
+                                                               RelationGetDescr(rel),
+                                                               &isNull);
+               if (!isNull)
+               {
+                       newAcl = aclnewowner(DatumGetAclP(aclDatum),
+                                                                datForm->datdba, newOwnerId);
+                       repl_repl[Anum_pg_database_datacl - 1] = 'r';
+                       repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
+               }
+
+               newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
+               simple_heap_update(rel, &newtuple->t_self, newtuple);
+               CatalogUpdateIndexes(rel, newtuple);
+
+               heap_freetuple(newtuple);
+
+               /* Update owner dependency reference */
+               changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
+                                                               newOwnerId);
+       }
+
+       systable_endscan(scan);
+
+       /* Close pg_database, but keep lock till commit */
+       heap_close(rel, NoLock);
+
+       /*
+        * We don't bother updating the flat file since ALTER DATABASE OWNER
+        * doesn't affect it.
+        */
+}
+
 
 /*
  * Helper functions
  */
 
+/*
+ * Look up info about the database named "name".  If the database exists,
+ * obtain the specified lock type on it, fill in any of the remaining
+ * parameters that aren't NULL, and return TRUE.  If no such database,
+ * return FALSE.
+ *
+ * Passing NoLock as lockmode skips the locking step.  Otherwise the lock
+ * is taken with LockSharedObject() on the database's OID and is still held
+ * when we return TRUE; it is released here only when the re-fetched tuple
+ * is gone or carries a different name, after which the lookup is retried.
+ * No output parameter is written on the FALSE path.
+ */
 static bool
-get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
-                       int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
-                       TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
-                       char *dbpath)
+get_db_info(const char *name, LOCKMODE lockmode,
+                       Oid *dbIdP, Oid *ownerIdP,
+                       int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
+                       Oid *dbLastSysOidP,
+                       TransactionId *dbVacuumXidP, TransactionId *dbMinXidP,
+                       Oid *dbTablespace)
 {
+       bool            result = false;
        Relation        relation;
-       ScanKeyData scanKey;
-       SysScanDesc scan;
-       HeapTuple       tuple;
-       bool            gottuple;
 
        AssertArg(name);
 
        /* Caller may wish to grab a better lock on pg_database beforehand... */
-       relation = heap_openr(DatabaseRelationName, AccessShareLock);
+       relation = heap_open(DatabaseRelationId, AccessShareLock);
+
+       /*
+        * Loop covers the rare case where the database is renamed before we
+        * can lock it.  We try again just in case we can find a new one of
+        * the same name.
+        */
+       for (;;)
+       {
+               ScanKeyData scanKey;
+               SysScanDesc scan;
+               HeapTuple       tuple;
+               Oid                     dbOid;
 
-       ScanKeyEntryInitialize(&scanKey, 0, Anum_pg_database_datname,
-                                                  F_NAMEEQ, NameGetDatum(name));
+               /*
+                * there's no syscache for database-indexed-by-name,
+                * so must do it the hard way
+                */
+               ScanKeyInit(&scanKey,
+                                       Anum_pg_database_datname,
+                                       BTEqualStrategyNumber, F_NAMEEQ,
+                                       NameGetDatum(name));
 
-       scan = systable_beginscan(relation, DatabaseNameIndex, true, SnapshotNow, 1, &scanKey);
+               scan = systable_beginscan(relation, DatabaseNameIndexId, true,
+                                                                 SnapshotNow, 1, &scanKey);
 
-       tuple = systable_getnext(scan);
+               tuple = systable_getnext(scan);
 
-       gottuple = HeapTupleIsValid(tuple);
-       if (gottuple)
-       {
-               Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
-
-               /* oid of the database */
-               if (dbIdP)
-                       *dbIdP = HeapTupleGetOid(tuple);
-               /* sysid of the owner */
-               if (ownerIdP)
-                       *ownerIdP = dbform->datdba;
-               /* character encoding */
-               if (encodingP)
-                       *encodingP = dbform->encoding;
-               /* allowed as template? */
-               if (dbIsTemplateP)
-                       *dbIsTemplateP = dbform->datistemplate;
-               /* last system OID used in database */
-               if (dbLastSysOidP)
-                       *dbLastSysOidP = dbform->datlastsysoid;
-               /* limit of vacuumed XIDs */
-               if (dbVacuumXidP)
-                       *dbVacuumXidP = dbform->datvacuumxid;
-               /* limit of frozen XIDs */
-               if (dbFrozenXidP)
-                       *dbFrozenXidP = dbform->datfrozenxid;
-               /* database path (as registered in pg_database) */
-               if (dbpath)
+               if (!HeapTupleIsValid(tuple))
                {
-                       Datum           datum;
-                       bool            isnull;
-
-                       datum = heap_getattr(tuple,
-                                                                Anum_pg_database_datpath,
-                                                                RelationGetDescr(relation),
-                                                                &isnull);
-                       if (!isnull)
-                       {
-                               text       *pathtext = DatumGetTextP(datum);
-                               int                     pathlen = VARSIZE(pathtext) - VARHDRSZ;
+                       /* definitely no database of that name */
+                       systable_endscan(scan);
+                       break;
+               }
+
+               dbOid = HeapTupleGetOid(tuple);
+
+               systable_endscan(scan);
+
+               /*
+                * Now that we have a database OID, we can try to lock the DB.
+                */
+               if (lockmode != NoLock)
+                       LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
 
-                               Assert(pathlen >= 0 && pathlen < MAXPGPATH);
-                               strncpy(dbpath, VARDATA(pathtext), pathlen);
-                               *(dbpath + pathlen) = '\0';
+               /*
+                * And now, re-fetch the tuple by OID.  If it's still there and
+                * still the same name, we win; else, drop the lock and loop
+                * back to try again.
+                */
+               tuple = SearchSysCache(DATABASEOID,
+                                                          ObjectIdGetDatum(dbOid),
+                                                          0, 0, 0);
+               if (HeapTupleIsValid(tuple))
+               {
+                       Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
+
+                       if (strcmp(name, NameStr(dbform->datname)) == 0)
+                       {
+                               /* oid of the database */
+                               if (dbIdP)
+                                       *dbIdP = dbOid;
+                               /* oid of the owner */
+                               if (ownerIdP)
+                                       *ownerIdP = dbform->datdba;
+                               /* character encoding */
+                               if (encodingP)
+                                       *encodingP = dbform->encoding;
+                               /* allowed as template? */
+                               if (dbIsTemplateP)
+                                       *dbIsTemplateP = dbform->datistemplate;
+                               /* allowing connections? */
+                               if (dbAllowConnP)
+                                       *dbAllowConnP = dbform->datallowconn;
+                               /* last system OID used in database */
+                               if (dbLastSysOidP)
+                                       *dbLastSysOidP = dbform->datlastsysoid;
+                               /* limit of vacuumed XIDs */
+                               if (dbVacuumXidP)
+                                       *dbVacuumXidP = dbform->datvacuumxid;
+                               /* limit of min XIDs */
+                               if (dbMinXidP)
+                                       *dbMinXidP = dbform->datminxid;
+                               /* default tablespace for this database */
+                               if (dbTablespace)
+                                       *dbTablespace = dbform->dattablespace;
+                               ReleaseSysCache(tuple);
+                               result = true;
+                               break;
                        }
-                       else
-                               strcpy(dbpath, "");
+                       /* can only get here if it was just renamed */
+                       ReleaseSysCache(tuple);
                }
+
+               if (lockmode != NoLock)
+                       UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
        }
 
-       systable_endscan(scan);
        heap_close(relation, AccessShareLock);
 
-       return gottuple;
+       return result;
 }
 
+/* Does the current user have createdb privilege?  True for superusers, else the role's rolcreatedb flag. */
 static bool
 have_createdb_privilege(void)
 {
+       bool            result = false; /* stays false if no tuple is found for the current user */
        HeapTuple       utup;
-       bool            retval;
-
-       utup = SearchSysCache(SHADOWSYSID,
-                                                 Int32GetDatum(GetUserId()),
-                                                 0, 0, 0);
-
-       if (!HeapTupleIsValid(utup))
-               retval = false;
-       else
-               retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
 
-       ReleaseSysCache(utup);
+       /* Superusers can always do everything, so skip the syscache lookup */
+       if (superuser())
+               return true;
 
-       return retval;
+       utup = SearchSysCache(AUTHOID,
+                                                 ObjectIdGetDatum(GetUserId()),
+                                                 0, 0, 0);
+       if (HeapTupleIsValid(utup))
+       {
+               result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
+               ReleaseSysCache(utup);
+       }
+       return result;
 }
 
-
-static char *
-resolve_alt_dbpath(const char *dbpath, Oid dboid)
+/*
+ * remove_dbtablespaces - remove db_id's subdirectory from each non-global tablespace
+ *
+ * We don't know what tablespaces db_id is using, so iterate through all
+ * tablespaces removing <tablespace>/db_id and writing an XLOG_DBASE_DROP record for each one found
+ */
+static void
+remove_dbtablespaces(Oid db_id)
 {
-       const char *prefix;
-       char       *ret;
-       size_t          len;
-
-       if (dbpath == NULL || dbpath[0] == '\0')
-               return NULL;
+       Relation        rel;
+       HeapScanDesc scan;
+       HeapTuple       tuple;
 
-       if (first_path_separator(dbpath))
+       rel = heap_open(TableSpaceRelationId, AccessShareLock);
+       scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
-               if (!is_absolute_path(dbpath))
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                        errmsg("relative paths are not allowed as database locations")));
-#ifndef ALLOW_ABSOLUTE_DBPATHS
-               ereport(ERROR,
-                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-               errmsg("absolute paths are not allowed as database locations")));
-#endif
-               prefix = dbpath;
-       }
-       else
-       {
-               /* must be environment variable */
-               char       *var = getenv(dbpath);
-
-               if (!var)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_UNDEFINED_OBJECT),
-                          errmsg("postmaster environment variable \"%s\" not found",
-                                         dbpath)));
-               if (!is_absolute_path(var))
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INVALID_NAME),
-                                        errmsg("postmaster environment variable \"%s\" must be absolute path",
-                                                       dbpath)));
-               prefix = var;
-       }
+               Oid                     dsttablespace = HeapTupleGetOid(tuple);
+               char       *dstpath;
+               struct stat st;
 
-       len = strlen(prefix) + 6 + sizeof(Oid) * 8 + 1;
-       if (len >= MAXPGPATH - 100)
-               ereport(ERROR,
-                               (errcode(ERRCODE_INVALID_NAME),
-                                errmsg("alternative path is too long")));
+               /* Don't mess with the global tablespace */
+               if (dsttablespace == GLOBALTABLESPACE_OID)
+                       continue;
 
-       ret = palloc(len);
-       snprintf(ret, len, "%s/base/%u", prefix, dboid);
+               dstpath = GetDatabasePath(db_id, dsttablespace);
 
-       return ret;
-}
+               if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
+               {
+                       /* Path is missing or not a directory: assume we can ignore it */
+                       pfree(dstpath);
+                       continue;
+               }
 
+               if (!rmtree(dstpath, true))
+                       ereport(WARNING,
+                                       (errmsg("could not remove database directory \"%s\"",
+                                                       dstpath)));
 
-static bool
-remove_dbdirs(const char *nominal_loc, const char *alt_loc)
-{
-       const char *target_dir;
-       char            buf[MAXPGPATH + 100];
-       bool            success = true;
+               /* Record the filesystem change in XLOG (reached even when rmtree only warned) */
+               {
+                       xl_dbase_drop_rec xlrec;
+                       XLogRecData rdata[1];
 
-       target_dir = alt_loc ? alt_loc : nominal_loc;
+                       xlrec.db_id = db_id;
+                       xlrec.tablespace_id = dsttablespace;
 
-       /*
-        * Close virtual file descriptors so the kernel has more available for
-        * the system() call below.
-        */
-       closeAllVfds();
+                       rdata[0].data = (char *) &xlrec;
+                       rdata[0].len = sizeof(xl_dbase_drop_rec);
+                       rdata[0].buffer = InvalidBuffer;
+                       rdata[0].next = NULL;
 
-       if (alt_loc)
-       {
-               /* remove symlink */
-               if (unlink(nominal_loc) != 0)
-               {
-                       ereport(WARNING,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not remove file \"%s\": %m", nominal_loc)));
-                       success = false;
+                       (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
                }
-       }
 
-#ifndef WIN32
-       snprintf(buf, sizeof(buf), "rm -rf '%s'", target_dir);
-#else
-       snprintf(buf, sizeof(buf), "rmdir /s /q \"%s\"", target_dir);
-#endif
-
-       if (system(buf) != 0)
-       {
-               ereport(WARNING,
-                               (errmsg("could not remove database directory \"%s\"",
-                                               target_dir),
-                                errdetail("Failing system command was: %s", buf),
-                                errhint("Look in the postmaster's stderr log for more information.")));
-               success = false;
+               pfree(dstpath);
        }
 
-       return success;
+       heap_endscan(scan);
+       heap_close(rel, AccessShareLock);
 }
 
 
@@ -971,8 +1257,6 @@ remove_dbdirs(const char *nominal_loc, const char *alt_loc)
  * get_database_oid - given a database name, look up the OID
  *
  * Returns InvalidOid if database name not found.
- *
- * This is not actually used in this file, but is exported for use elsewhere.
  */
 Oid
 get_database_oid(const char *dbname)
@@ -983,12 +1267,17 @@ get_database_oid(const char *dbname)
        HeapTuple       dbtuple;
        Oid                     oid;
 
-       /* There's no syscache for pg_database, so must look the hard way */
-       pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
-       ScanKeyEntryInitialize(&entry[0], 0x0,
-                                                  Anum_pg_database_datname, F_NAMEEQ,
-                                                  CStringGetDatum(dbname));
-       scan = systable_beginscan(pg_database, DatabaseNameIndex, true, SnapshotNow, 1, entry);
+       /*
+        * There's no syscache for pg_database indexed by name,
+        * so we must look the hard way.
+        */
+       pg_database = heap_open(DatabaseRelationId, AccessShareLock);
+       ScanKeyInit(&entry[0],
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               CStringGetDatum(dbname));
+       scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
+                                                         SnapshotNow, 1, entry);
 
        dbtuple = systable_getnext(scan);
 
@@ -1008,36 +1297,119 @@ get_database_oid(const char *dbname)
 /*
  * get_database_name - given a database OID, look up the name
  *
- * Returns InvalidOid if database name not found.
- *
- * This is not actually used in this file, but is exported for use elsewhere.
+ * Returns a palloc'd copy of datname (found via the DATABASEOID syscache), or NULL if no such database.
  */
 char *
 get_database_name(Oid dbid)
 {
-       Relation        pg_database;
-       ScanKeyData entry[1];
-       SysScanDesc scan;
        HeapTuple       dbtuple;
        char       *result;
 
-       /* There's no syscache for pg_database, so must look the hard way */
-       pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
-       ScanKeyEntryInitialize(&entry[0], 0x0,
-                                                  ObjectIdAttributeNumber, F_OIDEQ,
-                                                  ObjectIdGetDatum(dbid));
-       scan = systable_beginscan(pg_database, DatabaseOidIndex, true, SnapshotNow, 1, entry);
-
-       dbtuple = systable_getnext(scan);
-
-       /* We assume that there can be at most one matching tuple */
+       dbtuple = SearchSysCache(DATABASEOID,
+                                                        ObjectIdGetDatum(dbid),
+                                                        0, 0, 0);
        if (HeapTupleIsValid(dbtuple))
+       {
                result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
+               ReleaseSysCache(dbtuple);       /* name was copied above, so the cache entry can be released */
+       }
        else
                result = NULL;
 
-       systable_endscan(scan);
-       heap_close(pg_database, AccessShareLock);
-
        return result;
 }
+
+/*
+ * DATABASE resource manager's routines: dbase_redo replays create/drop records (its lsn argument is unused)
+ */
+void
+dbase_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+       uint8           info = record->xl_info & ~XLR_INFO_MASK;
+
+       if (info == XLOG_DBASE_CREATE)
+       {
+               xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
+               char       *src_path;
+               char       *dst_path;
+               struct stat st;
+
+               src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
+               dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+
+               /*
+                * Our theory for replaying a CREATE is to forcibly drop the target
+                * subdirectory if present, then re-copy the source data. This may be
+                * more work than needed, but it is simple to implement.
+                */
+               if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
+               {
+                       if (!rmtree(dst_path, true))
+                               ereport(WARNING,
+                                               (errmsg("could not remove database directory \"%s\"",
+                                                               dst_path)));
+               }
+
+               /*
+                * Force dirty buffers out to disk, to ensure source database is
+                * up-to-date for the copy.  (We really only need to flush buffers for
+                * the source database, but bufmgr.c provides no API for that.)
+                */
+               BufferSync();
+
+               /*
+                * Copy the source database's directory to the new location.
+                *
+                * We don't need to copy subdirectories (presumably what the 'false' requests)
+                */
+               copydir(src_path, dst_path, false);
+       }
+       else if (info == XLOG_DBASE_DROP)
+       {
+               xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
+               char       *dst_path;
+
+               dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+
+               /* Drop pages for this database that are in the shared buffer cache */
+               DropDatabaseBuffers(xlrec->db_id);
+
+               /* Also, clean out any entries in the shared free space map */
+               FreeSpaceMapForgetDatabase(xlrec->db_id);
+
+               /* Clean out the xlog relcache too */
+               XLogDropDatabase(xlrec->db_id);
+
+               /* And remove the physical files; a failure here is reported only as a WARNING */
+               if (!rmtree(dst_path, true))
+                       ereport(WARNING,
+                                       (errmsg("could not remove database directory \"%s\"",
+                                                       dst_path)));
+       }
+       else
+               elog(PANIC, "dbase_redo: unknown op code %u", info);
+}
+
+void
+dbase_desc(StringInfo buf, uint8 xl_info, char *rec)   /* append a text description of a DATABASE rmgr record to buf */
+{
+       uint8           info = xl_info & ~XLR_INFO_MASK;        /* clear the XLR_INFO_MASK bits; the rest is compared to the op codes */
+
+       if (info == XLOG_DBASE_CREATE)
+       {
+               xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;       /* rec is read as a create record */
+
+               appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",     /* each pair is db_id/tablespace_id, source first */
+                               xlrec->src_db_id, xlrec->src_tablespace_id,
+                               xlrec->db_id, xlrec->tablespace_id);
+       }
+       else if (info == XLOG_DBASE_DROP)
+       {
+               xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;   /* rec is read as a drop record */
+
+               appendStringInfo(buf, "drop db: dir %u/%u",
+                               xlrec->db_id, xlrec->tablespace_id);
+       }
+       else
+               appendStringInfo(buf, "UNKNOWN");       /* any other op code */
+}