]> granicus.if.org Git - postgresql/blobdiff - src/backend/commands/dbcommands.c
On Windows, force a checkpoint just before dropping a database's physical
[postgresql] / src / backend / commands / dbcommands.c
index c6bbb37186098cb49da25b13e1f9d32881e1edf6..a74ad9b2f0720b655c7b976c4efd36fc7a874a78 100644 (file)
@@ -4,34 +4,39 @@
  *             Database management commands (create/drop database).
  *
  *
- * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.96 2002/07/12 18:43:15 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.146 2004/10/28 00:39:58 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include <errno.h>
 #include <fcntl.h>
 #include <unistd.h>
 #include <sys/stat.h>
-#include <sys/types.h>
 
+#include "access/genam.h"
 #include "access/heapam.h"
 #include "catalog/catname.h"
 #include "catalog/catalog.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_shadow.h"
+#include "catalog/pg_tablespace.h"
 #include "catalog/indexing.h"
 #include "commands/comment.h"
 #include "commands/dbcommands.h"
+#include "commands/tablespace.h"
+#include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "postmaster/bgwriter.h"
+#include "storage/fd.h"
 #include "storage/freespace.h"
 #include "storage/sinval.h"
+#include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
-#ifdef MULTIBYTE
-#include "mb/pg_wchar.h"               /* encoding check */
-#endif
-
 
 /* non-export function prototypes */
 static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
                        int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
                        TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
-                       char *dbpath);
+                       Oid *dbTablespace);
 static bool have_createdb_privilege(void);
-static char *resolve_alt_dbpath(const char *dbpath, Oid dboid);
-static bool remove_dbdirs(const char *real_loc, const char *altloc);
+static void remove_dbtablespaces(Oid db_id);
+
 
 /*
  * CREATE DATABASE
  */
-
 void
 createdb(const CreatedbStmt *stmt)
 {
-       char       *nominal_loc;
-       char       *alt_loc;
-       char       *target_dir;
-       char            src_loc[MAXPGPATH];
-       char            buf[2 * MAXPGPATH + 100];
+       HeapScanDesc scan;
+       Relation        rel;
        Oid                     src_dboid;
-       int4            src_owner;
+       AclId           src_owner;
        int                     src_encoding;
        bool            src_istemplate;
        Oid                     src_lastsysoid;
        TransactionId src_vacuumxid;
        TransactionId src_frozenxid;
-       char            src_dbpath[MAXPGPATH];
+       Oid                     src_deftablespace;
+       Oid                     dst_deftablespace;
        Relation        pg_database_rel;
        HeapTuple       tuple;
        TupleDesc       pg_database_dsc;
        Datum           new_record[Natts_pg_database];
        char            new_record_nulls[Natts_pg_database];
        Oid                     dboid;
-       int32           datdba;
-       List       *option;
+       AclId           datdba;
+       ListCell   *option;
+       DefElem    *dtablespacename = NULL;
        DefElem    *downer = NULL;
-       DefElem    *dpath = NULL;
-       DefElem    *dtemplate = NULL;
-       DefElem    *dencoding = NULL;
+       DefElem    *dtemplate = NULL;
+       DefElem    *dencoding = NULL;
        char       *dbname = stmt->dbname;
        char       *dbowner = NULL;
-       char       *dbpath = NULL;
        char       *dbtemplate = NULL;
-       int                 encoding = -1;
+       int                     encoding = -1;
+
+#ifndef WIN32
+       char            buf[2 * MAXPGPATH + 100];
+#endif
+
+       /* don't call this in a transaction block */
+       PreventTransactionChain((void *) stmt, "CREATE DATABASE");
 
        /* Extract options from the statement node tree */
        foreach(option, stmt->options)
        {
                DefElem    *defel = (DefElem *) lfirst(option);
 
-               if (strcmp(defel->defname, "owner") == 0)
+               if (strcmp(defel->defname, "tablespace") == 0)
                {
-                       if (downer)
-                               elog(ERROR, "CREATE DATABASE: conflicting options");
-                       downer = defel;
+                       if (dtablespacename)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       dtablespacename = defel;
                }
-               else if (strcmp(defel->defname, "location") == 0)
+               else if (strcmp(defel->defname, "owner") == 0)
                {
-                       if (dpath)
-                               elog(ERROR, "CREATE DATABASE: conflicting options");
-                       dpath = defel;
+                       if (downer)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       downer = defel;
                }
                else if (strcmp(defel->defname, "template") == 0)
                {
                        if (dtemplate)
-                               elog(ERROR, "CREATE DATABASE: conflicting options");
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
                        dtemplate = defel;
                }
                else if (strcmp(defel->defname, "encoding") == 0)
                {
                        if (dencoding)
-                               elog(ERROR, "CREATE DATABASE: conflicting options");
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
                        dencoding = defel;
                }
+               else if (strcmp(defel->defname, "location") == 0)
+               {
+                       ereport(WARNING,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("LOCATION is not supported anymore"),
+                                        errhint("Consider using tablespaces instead.")));
+               }
                else
-                       elog(ERROR, "CREATE DATABASE: option \"%s\" not recognized",
+                       elog(ERROR, "option \"%s\" not recognized",
                                 defel->defname);
        }
 
-       if (downer)
+       if (downer && downer->arg)
                dbowner = strVal(downer->arg);
-       if (dpath)
-               dbpath = strVal(dpath->arg);
-       if (dtemplate)
+       if (dtemplate && dtemplate->arg)
                dbtemplate = strVal(dtemplate->arg);
-       if (dencoding)
-               encoding = intVal(dencoding->arg);
+       if (dencoding && dencoding->arg)
+       {
+               const char *encoding_name;
+
+               if (IsA(dencoding->arg, Integer))
+               {
+                       encoding = intVal(dencoding->arg);
+                       encoding_name = pg_encoding_to_char(encoding);
+                       if (strcmp(encoding_name, "") == 0 ||
+                               pg_valid_server_encoding(encoding_name) < 0)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                                errmsg("%d is not a valid encoding code",
+                                                               encoding)));
+               }
+               else if (IsA(dencoding->arg, String))
+               {
+                       encoding_name = strVal(dencoding->arg);
+                       if (pg_valid_server_encoding(encoding_name) < 0)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                                errmsg("%s is not a valid encoding name",
+                                                               encoding_name)));
+                       encoding = pg_char_to_encoding(encoding_name);
+               }
+               else
+                       elog(ERROR, "unrecognized node type: %d",
+                                nodeTag(dencoding->arg));
+       }
 
        /* obtain sysid of proposed owner */
        if (dbowner)
-               datdba = get_usesysid(dbowner); /* will elog if no such user */
+               datdba = get_usesysid(dbowner); /* will ereport if no such user */
        else
                datdba = GetUserId();
 
-       if (datdba == (int32) GetUserId())
+       if (datdba == GetUserId())
        {
                /* creating database for self: can be superuser or createdb */
                if (!superuser() && !have_createdb_privilege())
-                       elog(ERROR, "CREATE DATABASE: permission denied");
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("permission denied to create database")));
        }
        else
        {
                /* creating database for someone else: must be superuser */
                /* note that the someone else need not have any permissions */
                if (!superuser())
-                       elog(ERROR, "CREATE DATABASE: permission denied");
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("must be superuser to create database for another user")));
        }
 
-       /* don't call this in a transaction block */
-       if (IsTransactionBlock())
-               elog(ERROR, "CREATE DATABASE: may not be called in a transaction block");
-
        /*
         * Check for db name conflict.  There is a race condition here, since
         * another backend could create the same DB name before we commit.
@@ -167,7 +211,9 @@ createdb(const CreatedbStmt *stmt)
         * after we grab the exclusive lock.
         */
        if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
-               elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname);
+               ereport(ERROR,
+                               (errcode(ERRCODE_DUPLICATE_DATABASE),
+                                errmsg("database \"%s\" already exists", dbname)));
 
        /*
         * Lookup database (template) to be cloned.
@@ -177,10 +223,10 @@ createdb(const CreatedbStmt *stmt)
 
        if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
                                         &src_istemplate, &src_lastsysoid,
-                                        &src_vacuumxid, &src_frozenxid,
-                                        src_dbpath))
-               elog(ERROR, "CREATE DATABASE: template \"%s\" does not exist",
-                        dbtemplate);
+                                        &src_vacuumxid, &src_frozenxid, &src_deftablespace))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                errmsg("template database \"%s\" does not exist", dbtemplate)));
 
        /*
         * Permission check: to copy a DB that's not marked datistemplate, you
@@ -188,19 +234,13 @@ createdb(const CreatedbStmt *stmt)
         */
        if (!src_istemplate)
        {
-               if (!superuser() && GetUserId() != src_owner )
-                       elog(ERROR, "CREATE DATABASE: permission to copy \"%s\" denied",
-                                dbtemplate);
+               if (!superuser() && GetUserId() != src_owner)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("permission denied to copy database \"%s\"",
+                                                       dbtemplate)));
        }
 
-       /*
-        * Determine physical path of source database
-        */
-       alt_loc = resolve_alt_dbpath(src_dbpath, src_dboid);
-       if (!alt_loc)
-               alt_loc = GetDatabasePath(src_dboid);
-       strcpy(src_loc, alt_loc);
-
        /*
         * The source DB can't have any active backends, except this one
         * (exception is to allow CREATE DB while connected to template1).
@@ -208,19 +248,78 @@ createdb(const CreatedbStmt *stmt)
         * bulletproof, since someone might connect while we are copying...
         */
        if (DatabaseHasActiveBackends(src_dboid, true))
-               elog(ERROR, "CREATE DATABASE: source database \"%s\" is being accessed by other users", dbtemplate);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+               errmsg("source database \"%s\" is being accessed by other users",
+                          dbtemplate)));
 
        /* If encoding is defaulted, use source's encoding */
        if (encoding < 0)
                encoding = src_encoding;
 
-#ifdef MULTIBYTE
        /* Some encodings are client only */
        if (!PG_VALID_BE_ENCODING(encoding))
-               elog(ERROR, "CREATE DATABASE: invalid backend encoding");
-#else
-       Assert(encoding == 0);          /* zero is PG_SQL_ASCII */
-#endif
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("invalid server encoding %d", encoding)));
+
+       /* Resolve default tablespace for new database */
+       if (dtablespacename && dtablespacename->arg)
+       {
+               char       *tablespacename;
+               AclResult       aclresult;
+
+               tablespacename = strVal(dtablespacename->arg);
+               dst_deftablespace = get_tablespace_oid(tablespacename);
+               if (!OidIsValid(dst_deftablespace))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                        errmsg("tablespace \"%s\" does not exist",
+                                                       tablespacename)));
+               /* check permissions */
+               aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
+                                                                                  ACL_CREATE);
+               if (aclresult != ACLCHECK_OK)
+                       aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
+                                                  tablespacename);
+
+               /*
+                * If we are trying to change the default tablespace of the template,
+                * we require that the template not have any files in the new default
+                * tablespace.  This is necessary because otherwise the copied
+                * database would contain pg_class rows that refer to its default
+                * tablespace both explicitly (by OID) and implicitly (as zero), which
+                * would cause problems.  For example another CREATE DATABASE using
+                * the copied database as template, and trying to change its default
+                * tablespace again, would yield outright incorrect results (it would
+                * improperly move tables to the new default tablespace that should
+                * stay in the same tablespace).
+                */
+               if (dst_deftablespace != src_deftablespace)
+               {
+                       char       *srcpath;
+                       struct stat st;
+
+                       srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
+
+                       if (stat(srcpath, &st) == 0 &&
+                               S_ISDIR(st.st_mode) &&
+                               !directory_is_empty(srcpath))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("cannot assign new default tablespace \"%s\"",
+                                                               tablespacename),
+                                                errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
+                                                                  dbtemplate)));
+                       pfree(srcpath);
+               }
+       }
+       else
+       {
+               /* Use template database's default tablespace */
+               dst_deftablespace = src_deftablespace;
+               /* Note there is no additional permission check in this path */
+       }
 
        /*
         * Preassign OID for pg_database tuple, so that we can compute db
@@ -228,77 +327,120 @@ createdb(const CreatedbStmt *stmt)
         */
        dboid = newoid();
 
-       /*
-        * Compute nominal location (where we will try to access the
-        * database), and resolve alternate physical location if one is
-        * specified.
-        *
-        * If an alternate location is specified but is the same as the
-        * normal path, just drop the alternate-location spec (this seems
-        * friendlier than erroring out).  We must test this case to avoid
-        * creating a circular symlink below.
-        */
-       nominal_loc = GetDatabasePath(dboid);
-       alt_loc = resolve_alt_dbpath(dbpath, dboid);
-
-       if (alt_loc && strcmp(alt_loc, nominal_loc) == 0)
-       {
-               alt_loc = NULL;
-               dbpath = NULL;
-       }
-
-       if (strchr(nominal_loc, '\''))
-               elog(ERROR, "database path may not contain single quotes");
-       if (alt_loc && strchr(alt_loc, '\''))
-               elog(ERROR, "database path may not contain single quotes");
-       if (strchr(src_loc, '\''))
-               elog(ERROR, "database path may not contain single quotes");
-       /* ... otherwise we'd be open to shell exploits below */
-
        /*
         * Force dirty buffers out to disk, to ensure source database is
         * up-to-date for the copy.  (We really only need to flush buffers for
         * the source database...)
         */
-       BufferSync();
+       BufferSync(-1, -1);
 
        /*
         * Close virtual file descriptors so the kernel has more available for
-        * the mkdir() and system() calls below.
+        * the system() calls below.
         */
        closeAllVfds();
 
        /*
-        * Check we can create the target directory --- but then remove it
-        * because we rely on cp(1) to create it for real.
+        * Iterate through all tablespaces of the template database, and copy
+        * each one to the new database.
         */
-       target_dir = alt_loc ? alt_loc : nominal_loc;
+       rel = heap_openr(TableSpaceRelationName, AccessShareLock);
+       scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+       {
+               Oid                     srctablespace = HeapTupleGetOid(tuple);
+               Oid                     dsttablespace;
+               char       *srcpath;
+               char       *dstpath;
+               struct stat st;
 
-       if (mkdir(target_dir, S_IRWXU) != 0)
-               elog(ERROR, "CREATE DATABASE: unable to create database directory '%s': %m",
-                        target_dir);
-       if (rmdir(target_dir) != 0)
-               elog(ERROR, "CREATE DATABASE: unable to remove temp directory '%s': %m",
-                        target_dir);
+               /* No need to copy global tablespace */
+               if (srctablespace == GLOBALTABLESPACE_OID)
+                       continue;
 
-       /* Make the symlink, if needed */
-       if (alt_loc)
-       {
-               if (symlink(alt_loc, nominal_loc) != 0)
-                       elog(ERROR, "CREATE DATABASE: could not link '%s' to '%s': %m",
-                                nominal_loc, alt_loc);
-       }
+               srcpath = GetDatabasePath(src_dboid, srctablespace);
 
-       /* Copy the template database to the new location */
-       snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", src_loc, target_dir);
+               if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
+                       directory_is_empty(srcpath))
+               {
+                       /* Assume we can ignore it */
+                       pfree(srcpath);
+                       continue;
+               }
 
-       if (system(buf) != 0)
-       {
-               if (remove_dbdirs(nominal_loc, alt_loc))
-                       elog(ERROR, "CREATE DATABASE: could not initialize database directory");
+               if (srctablespace == src_deftablespace)
+                       dsttablespace = dst_deftablespace;
                else
-                       elog(ERROR, "CREATE DATABASE: could not initialize database directory; delete failed as well");
+                       dsttablespace = srctablespace;
+
+               dstpath = GetDatabasePath(dboid, dsttablespace);
+
+               if (stat(dstpath, &st) == 0 || errno != ENOENT)
+               {
+                       remove_dbtablespaces(dboid);
+                       ereport(ERROR,
+                                       (errmsg("could not initialize database directory"),
+                                        errdetail("Directory \"%s\" already exists.",
+                                                          dstpath)));
+               }
+
+#ifndef WIN32
+
+               /*
+                * Copy this subdirectory to the new location
+                *
+                * XXX use of cp really makes this code pretty grotty, particularly
+                * with respect to lack of ability to report errors well.  Someday
+                * rewrite to do it for ourselves.
+                */
+
+               /* We might need to use cp -R one day for portability */
+               snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
+                                srcpath, dstpath);
+               if (system(buf) != 0)
+               {
+                       remove_dbtablespaces(dboid);
+                       ereport(ERROR,
+                                       (errmsg("could not initialize database directory"),
+                                        errdetail("Failing system command was: %s", buf),
+                                        errhint("Look in the postmaster's stderr log for more information.")));
+               }
+#else                                                  /* WIN32 */
+               if (copydir(srcpath, dstpath) != 0)
+               {
+                       /* copydir should already have given details of its troubles */
+                       remove_dbtablespaces(dboid);
+                       ereport(ERROR,
+                                       (errmsg("could not initialize database directory")));
+               }
+#endif   /* WIN32 */
+
+               /* Record the filesystem change in XLOG */
+               {
+                       xl_dbase_create_rec xlrec;
+                       XLogRecData rdata[3];
+
+                       xlrec.db_id = dboid;
+                       rdata[0].buffer = InvalidBuffer;
+                       rdata[0].data = (char *) &xlrec;
+                       rdata[0].len = offsetof(xl_dbase_create_rec, src_path);
+                       rdata[0].next = &(rdata[1]);
+
+                       rdata[1].buffer = InvalidBuffer;
+                       rdata[1].data = (char *) srcpath;
+                       rdata[1].len = strlen(srcpath) + 1;
+                       rdata[1].next = &(rdata[2]);
+
+                       rdata[2].buffer = InvalidBuffer;
+                       rdata[2].data = (char *) dstpath;
+                       rdata[2].len = strlen(dstpath) + 1;
+                       rdata[2].next = NULL;
+
+                       (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
+               }
        }
+       heap_endscan(scan);
+       heap_close(rel, AccessShareLock);
 
        /*
         * Now OK to grab exclusive lock on pg_database.
@@ -310,8 +452,10 @@ createdb(const CreatedbStmt *stmt)
        {
                /* Don't hold lock while doing recursive remove */
                heap_close(pg_database_rel, AccessExclusiveLock);
-               remove_dbdirs(nominal_loc, alt_loc);
-               elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname);
+               remove_dbtablespaces(dboid);
+               ereport(ERROR,
+                               (errcode(ERRCODE_DUPLICATE_DATABASE),
+                                errmsg("database \"%s\" already exists", dbname)));
        }
 
        /*
@@ -332,33 +476,26 @@ createdb(const CreatedbStmt *stmt)
        new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
        new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
        new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
-       /* do not set datpath to null, GetRawDatabaseInfo won't cope */
-       new_record[Anum_pg_database_datpath - 1] =
-               DirectFunctionCall1(textin, CStringGetDatum(dbpath ? dbpath : ""));
+       new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
 
+       /*
+        * We deliberately set datconfig and datacl to defaults (NULL), rather
+        * than copying them from the template database.  Copying datacl would
+        * be a bad idea when the owner is not the same as the template's
+        * owner. It's more debatable whether datconfig should be copied.
+        */
        new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
        new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
 
        tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
 
-       tuple->t_data->t_oid = dboid;           /* override heap_insert's OID
+       HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
                                                                                 * selection */
 
        simple_heap_insert(pg_database_rel, tuple);
 
-       /*
-        * Update indexes
-        */
-       if (RelationGetForm(pg_database_rel)->relhasindex)
-       {
-               Relation        idescs[Num_pg_database_indices];
-
-               CatalogOpenIndices(Num_pg_database_indices,
-                                                  Name_pg_database_indices, idescs);
-               CatalogIndexInsert(idescs, Num_pg_database_indices, pg_database_rel,
-                                                  tuple);
-               CatalogCloseIndices(Num_pg_database_indices, idescs);
-       }
+       /* Update indexes */
+       CatalogUpdateIndexes(pg_database_rel, tuple);
 
        /* Close pg_database, but keep lock till commit */
        heap_close(pg_database_rel, NoLock);
@@ -368,7 +505,7 @@ createdb(const CreatedbStmt *stmt)
         * will see the new database in pg_database right away.  (They'll see
         * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
         */
-       BufferSync();
+       BufferSync(-1, -1);
 }
 
 
@@ -381,21 +518,19 @@ dropdb(const char *dbname)
        int4            db_owner;
        bool            db_istemplate;
        Oid                     db_id;
-       char       *alt_loc;
-       char       *nominal_loc;
-       char            dbpath[MAXPGPATH];
        Relation        pgdbrel;
-       HeapScanDesc pgdbscan;
+       SysScanDesc pgdbscan;
        ScanKeyData key;
        HeapTuple       tup;
 
-       AssertArg(dbname);
+       PreventTransactionChain((void *) dbname, "DROP DATABASE");
 
-       if (strcmp(dbname, DatabaseName) == 0)
-               elog(ERROR, "DROP DATABASE: cannot be executed on the currently open database");
+       AssertArg(dbname);
 
-       if (IsTransactionBlock())
-               elog(ERROR, "DROP DATABASE: may not be called in a transaction block");
+       if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                                errmsg("cannot drop the currently open database")));
 
        /*
         * Obtain exclusive lock on pg_database.  We need this to ensure that
@@ -409,11 +544,14 @@ dropdb(const char *dbname)
        pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
 
        if (!get_db_info(dbname, &db_id, &db_owner, NULL,
-                                        &db_istemplate, NULL, NULL, NULL, dbpath))
-               elog(ERROR, "DROP DATABASE: database \"%s\" does not exist", dbname);
+                                        &db_istemplate, NULL, NULL, NULL, NULL))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", dbname)));
 
        if (GetUserId() != db_owner && !superuser())
-               elog(ERROR, "DROP DATABASE: permission denied");
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                          dbname);
 
        /*
         * Disallow dropping a DB that is marked istemplate.  This is just to
@@ -421,40 +559,45 @@ dropdb(const char *dbname)
         * they can do so if they're really determined ...
         */
        if (db_istemplate)
-               elog(ERROR, "DROP DATABASE: database is marked as a template");
-
-       nominal_loc = GetDatabasePath(db_id);
-       alt_loc = resolve_alt_dbpath(dbpath, db_id);
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("cannot drop a template database")));
 
        /*
         * Check for active backends in the target database.
         */
        if (DatabaseHasActiveBackends(db_id, false))
-               elog(ERROR, "DROP DATABASE: database \"%s\" is being accessed by other users", dbname);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                          errmsg("database \"%s\" is being accessed by other users",
+                                         dbname)));
 
        /*
         * Find the database's tuple by OID (should be unique).
         */
-       ScanKeyEntryInitialize(&key, 0, ObjectIdAttributeNumber,
-                                                  F_OIDEQ, ObjectIdGetDatum(db_id));
+       ScanKeyInit(&key,
+                               ObjectIdAttributeNumber,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(db_id));
 
-       pgdbscan = heap_beginscan(pgdbrel, SnapshotNow, 1, &key);
+       pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true,
+                                                                 SnapshotNow, 1, &key);
 
-       tup = heap_getnext(pgdbscan, ForwardScanDirection);
+       tup = systable_getnext(pgdbscan);
        if (!HeapTupleIsValid(tup))
        {
                /*
                 * This error should never come up since the existence of the
                 * database is checked earlier
                 */
-               elog(ERROR, "DROP DATABASE: Database \"%s\" doesn't exist despite earlier reports to the contrary",
+               elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
                         dbname);
        }
 
        /* Remove the database's tuple from pg_database */
        simple_heap_delete(pgdbrel, &tup->t_self);
 
-       heap_endscan(pgdbscan);
+       systable_endscan(pgdbscan);
 
        /*
         * Delete any comments associated with the database
@@ -483,9 +626,17 @@ dropdb(const char *dbname)
        FreeSpaceMapForgetDatabase(db_id);
 
        /*
-        * Remove the database's subdirectory and everything in it.
+        * On Windows, force a checkpoint so that the bgwriter doesn't hold any
+        * open files, which would cause rmdir() to fail.
+        */
+#ifdef WIN32
+       RequestCheckpoint(true);
+#endif
+
+       /*
+        * Remove all tablespace subdirs belonging to the database.
         */
-       remove_dbdirs(nominal_loc, alt_loc);
+       remove_dbtablespaces(db_id);
 
        /*
         * Force dirty buffers out to disk, so that newly-connecting backends
@@ -493,10 +644,106 @@ dropdb(const char *dbname)
         * (They'll see an uncommitted deletion, but they don't care; see
         * GetRawDatabaseInfo.)
         */
-       BufferSync();
+       BufferSync(-1, -1);
 }
 
 
+/*
+ * Rename database
+ */
+void
+RenameDatabase(const char *oldname, const char *newname)
+{
+       HeapTuple       tup,
+                               newtup;
+       Relation        rel;
+       SysScanDesc scan,
+                               scan2;
+       ScanKeyData key,
+                               key2;
+
+       /*
+        * Obtain AccessExclusiveLock so that no new session gets started
+        * while the rename is in progress.
+        */
+       rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
+
+       ScanKeyInit(&key,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(oldname));
+       scan = systable_beginscan(rel, DatabaseNameIndex, true,
+                                                         SnapshotNow, 1, &key);
+
+       tup = systable_getnext(scan);
+       if (!HeapTupleIsValid(tup))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", oldname)));
+
+       /*
+        * XXX Client applications probably store the current database
+        * somewhere, so renaming it could cause confusion.  On the other
+        * hand, there may not be an actual problem besides a little
+        * confusion, so think about this and decide.
+        */
+       if (HeapTupleGetOid(tup) == MyDatabaseId)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("current database may not be renamed")));
+
+       /*
+        * Make sure the database does not have active sessions.  Might not be
+        * necessary, but it's consistent with other database operations.
+        */
+       if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                          errmsg("database \"%s\" is being accessed by other users",
+                                         oldname)));
+
+       /* make sure the new name doesn't exist */
+       ScanKeyInit(&key2,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(newname));
+       scan2 = systable_beginscan(rel, DatabaseNameIndex, true,
+                                                          SnapshotNow, 1, &key2);
+       if (HeapTupleIsValid(systable_getnext(scan2)))
+               ereport(ERROR,
+                               (errcode(ERRCODE_DUPLICATE_DATABASE),
+                                errmsg("database \"%s\" already exists", newname)));
+       systable_endscan(scan2);
+
+       /* must be owner */
+       if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                          oldname);
+
+       /* must have createdb */
+       if (!have_createdb_privilege())
+               ereport(ERROR,
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                errmsg("permission denied to rename database")));
+
+       /* rename */
+       newtup = heap_copytuple(tup);
+       namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
+       simple_heap_update(rel, &newtup->t_self, newtup);
+       CatalogUpdateIndexes(rel, newtup);
+
+       systable_endscan(scan);
+       heap_close(rel, NoLock);
+
+       /*
+        * Force dirty buffers out to disk, so that newly-connecting backends
+        * will see the renamed database in pg_database right away.  (They'll
+        * see an uncommitted tuple, but they don't care; see
+        * GetRawDatabaseInfo.)
+        */
+       BufferSync(-1, -1);
+}
+
 
 /*
  * ALTER DATABASE name SET ...
@@ -508,8 +755,8 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
        HeapTuple       tuple,
                                newtuple;
        Relation        rel;
-       ScanKeyData     scankey;
-       HeapScanDesc scan;
+       ScanKeyData scankey;
+       SysScanDesc scan;
        Datum           repl_val[Natts_pg_database];
        char            repl_null[Natts_pg_database];
        char            repl_repl[Natts_pg_database];
@@ -517,70 +764,150 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
        valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
 
        rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
-       ScanKeyEntryInitialize(&scankey, 0, Anum_pg_database_datname,
-                                                  F_NAMEEQ, NameGetDatum(stmt->dbname));
-       scan = heap_beginscan(rel, SnapshotNow, 1, &scankey);
-       tuple = heap_getnext(scan, ForwardScanDirection);
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(stmt->dbname));
+       scan = systable_beginscan(rel, DatabaseNameIndex, true,
+                                                         SnapshotNow, 1, &scankey);
+       tuple = systable_getnext(scan);
        if (!HeapTupleIsValid(tuple))
-               elog(ERROR, "database \"%s\" does not exist", stmt->dbname);
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", stmt->dbname)));
 
        if (!(superuser()
-                 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
-               elog(ERROR, "permission denied");
+               || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                          stmt->dbname);
 
        MemSet(repl_repl, ' ', sizeof(repl_repl));
-       repl_repl[Anum_pg_database_datconfig-1] = 'r';
+       repl_repl[Anum_pg_database_datconfig - 1] = 'r';
 
-       if (strcmp(stmt->variable, "all")==0 && valuestr == NULL)
+       if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
        {
                /* RESET ALL */
-               repl_null[Anum_pg_database_datconfig-1] = 'n';
-               repl_val[Anum_pg_database_datconfig-1] = (Datum) 0;
+               repl_null[Anum_pg_database_datconfig - 1] = 'n';
+               repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
        }
        else
        {
-               Datum datum;
-               bool isnull;
-               ArrayType *a;
+               Datum           datum;
+               bool            isnull;
+               ArrayType  *a;
 
-               repl_null[Anum_pg_database_datconfig-1] = ' ';
+               repl_null[Anum_pg_database_datconfig - 1] = ' ';
 
                datum = heap_getattr(tuple, Anum_pg_database_datconfig,
                                                         RelationGetDescr(rel), &isnull);
 
-               a = isnull ? ((ArrayType *) NULL) : DatumGetArrayTypeP(datum);
+               a = isnull ? NULL : DatumGetArrayTypeP(datum);
 
                if (valuestr)
                        a = GUCArrayAdd(a, stmt->variable, valuestr);
                else
                        a = GUCArrayDelete(a, stmt->variable);
 
-               repl_val[Anum_pg_database_datconfig-1] = PointerGetDatum(a);
+               if (a)
+                       repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
+               else
+                       repl_null[Anum_pg_database_datconfig - 1] = 'n';
        }
 
        newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
        simple_heap_update(rel, &tuple->t_self, newtuple);
 
+       /* Update indexes */
+       CatalogUpdateIndexes(rel, newtuple);
+
+       systable_endscan(scan);
+       heap_close(rel, NoLock);
+}
+
+
+/*
+ * ALTER DATABASE name OWNER TO newowner
+ */
+void
+AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
+{
+       HeapTuple       tuple;
+       Relation        rel;
+       ScanKeyData scankey;
+       SysScanDesc scan;
+       Form_pg_database datForm;
+
+       rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(dbname));
+       scan = systable_beginscan(rel, DatabaseNameIndex, true,
+                                                         SnapshotNow, 1, &scankey);
+       tuple = systable_getnext(scan);
+       if (!HeapTupleIsValid(tuple))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", dbname)));
+
+       datForm = (Form_pg_database) GETSTRUCT(tuple);
+
        /*
-        * Update indexes
+        * If the new owner is the same as the existing owner, consider the
+        * command to have succeeded.  This is to be consistent with other
+        * objects.
         */
-       if (RelationGetForm(rel)->relhasindex)
+       if (datForm->datdba != newOwnerSysId)
        {
-               Relation        idescs[Num_pg_database_indices];
+               Datum           repl_val[Natts_pg_database];
+               char            repl_null[Natts_pg_database];
+               char            repl_repl[Natts_pg_database];
+               Acl                *newAcl;
+               Datum           aclDatum;
+               bool            isNull;
+               HeapTuple       newtuple;
+
+               /* changing owner's database for someone else: must be superuser */
+               /* note that the someone else need not have any permissions */
+               if (!superuser())
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                        errmsg("must be superuser to change owner")));
+
+               memset(repl_null, ' ', sizeof(repl_null));
+               memset(repl_repl, ' ', sizeof(repl_repl));
+
+               repl_repl[Anum_pg_database_datdba - 1] = 'r';
+               repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
+
+               /*
+                * Determine the modified ACL for the new owner.  This is only
+                * necessary when the ACL is non-null.
+                */
+               aclDatum = heap_getattr(tuple,
+                                                               Anum_pg_database_datacl,
+                                                               RelationGetDescr(rel),
+                                                               &isNull);
+               if (!isNull)
+               {
+                       newAcl = aclnewowner(DatumGetAclP(aclDatum),
+                                                                datForm->datdba, newOwnerSysId);
+                       repl_repl[Anum_pg_database_datacl - 1] = 'r';
+                       repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
+               }
 
-               CatalogOpenIndices(Num_pg_database_indices,
-                                                  Name_pg_database_indices, idescs);
-               CatalogIndexInsert(idescs, Num_pg_database_indices, rel,
-                                                  newtuple);
-               CatalogCloseIndices(Num_pg_database_indices, idescs);
+               newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
+               simple_heap_update(rel, &newtuple->t_self, newtuple);
+               CatalogUpdateIndexes(rel, newtuple);
+
+               heap_freetuple(newtuple);
        }
 
-       heap_endscan(scan);
-       heap_close(rel, RowExclusiveLock);
+       systable_endscan(scan);
+       heap_close(rel, NoLock);
 }
 
 
-
 /*
  * Helper functions
  */
@@ -589,11 +916,11 @@ static bool
 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
                        int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
                        TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
-                       char *dbpath)
+                       Oid *dbTablespace)
 {
        Relation        relation;
        ScanKeyData scanKey;
-       HeapScanDesc scan;
+       SysScanDesc scan;
        HeapTuple       tuple;
        bool            gottuple;
 
@@ -602,12 +929,15 @@ get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
        /* Caller may wish to grab a better lock on pg_database beforehand... */
        relation = heap_openr(DatabaseRelationName, AccessShareLock);
 
-       ScanKeyEntryInitialize(&scanKey, 0, Anum_pg_database_datname,
-                                                  F_NAMEEQ, NameGetDatum(name));
+       ScanKeyInit(&scanKey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(name));
 
-       scan = heap_beginscan(relation, SnapshotNow, 1, &scanKey);
+       scan = systable_beginscan(relation, DatabaseNameIndex, true,
+                                                         SnapshotNow, 1, &scanKey);
 
-       tuple = heap_getnext(scan, ForwardScanDirection);
+       tuple = systable_getnext(scan);
 
        gottuple = HeapTupleIsValid(tuple);
        if (gottuple)
@@ -616,11 +946,11 @@ get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
 
                /* oid of the database */
                if (dbIdP)
-                       *dbIdP = tuple->t_data->t_oid;
+                       *dbIdP = HeapTupleGetOid(tuple);
                /* sysid of the owner */
                if (ownerIdP)
                        *ownerIdP = dbform->datdba;
-               /* multibyte encoding */
+               /* character encoding */
                if (encodingP)
                        *encodingP = dbform->encoding;
                /* allowed as template? */
@@ -635,31 +965,12 @@ get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
                /* limit of frozen XIDs */
                if (dbFrozenXidP)
                        *dbFrozenXidP = dbform->datfrozenxid;
-               /* database path (as registered in pg_database) */
-               if (dbpath)
-               {
-                       Datum           datum;
-                       bool            isnull;
-
-                       datum = heap_getattr(tuple,
-                                                                Anum_pg_database_datpath,
-                                                                RelationGetDescr(relation),
-                                                                &isnull);
-                       if (!isnull)
-                       {
-                               text       *pathtext = DatumGetTextP(datum);
-                               int                     pathlen = VARSIZE(pathtext) - VARHDRSZ;
-
-                               Assert(pathlen >= 0 && pathlen < MAXPGPATH);
-                               strncpy(dbpath, VARDATA(pathtext), pathlen);
-                               *(dbpath + pathlen) = '\0';
-                       }
-                       else
-                               strcpy(dbpath, "");
-               }
+               /* default tablespace for this database */
+               if (dbTablespace)
+                       *dbTablespace = dbform->dattablespace;
        }
 
-       heap_endscan(scan);
+       systable_endscan(scan);
        heap_close(relation, AccessShareLock);
 
        return gottuple;
@@ -672,7 +983,7 @@ have_createdb_privilege(void)
        bool            retval;
 
        utup = SearchSysCache(SHADOWSYSID,
-                                                 ObjectIdGetDatum(GetUserId()),
+                                                 Int32GetDatum(GetUserId()),
                                                  0, 0, 0);
 
        if (!HeapTupleIsValid(utup))
@@ -685,82 +996,262 @@ have_createdb_privilege(void)
        return retval;
 }
 
-
-static char *
-resolve_alt_dbpath(const char *dbpath, Oid dboid)
+/*
+ * Remove tablespace directories
+ *
+ * We don't know what tablespaces db_id is using, so iterate through all
+ * tablespaces removing <tablespace>/db_id
+ */
+static void
+remove_dbtablespaces(Oid db_id)
 {
-       const char *prefix;
-       char       *ret;
-       size_t          len;
-
-       if (dbpath == NULL || dbpath[0] == '\0')
-               return NULL;
+       Relation        rel;
+       HeapScanDesc scan;
+       HeapTuple       tuple;
 
-       if (strchr(dbpath, '/'))
+       rel = heap_openr(TableSpaceRelationName, AccessShareLock);
+       scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
-               if (dbpath[0] != '/')
-                       elog(ERROR, "Relative paths are not allowed as database locations");
-#ifndef ALLOW_ABSOLUTE_DBPATHS
-               elog(ERROR, "Absolute paths are not allowed as database locations");
-#endif
-               prefix = dbpath;
-       }
-       else
-       {
-               /* must be environment variable */
-               char       *var = getenv(dbpath);
-
-               if (!var)
-                       elog(ERROR, "Postmaster environment variable '%s' not set", dbpath);
-               if (var[0] != '/')
-                       elog(ERROR, "Postmaster environment variable '%s' must be absolute path", dbpath);
-               prefix = var;
+               Oid                     dsttablespace = HeapTupleGetOid(tuple);
+               char       *dstpath;
+               struct stat st;
+
+               /* Don't mess with the global tablespace */
+               if (dsttablespace == GLOBALTABLESPACE_OID)
+                       continue;
+
+               dstpath = GetDatabasePath(db_id, dsttablespace);
+
+               if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
+               {
+                       /* Assume we can ignore it */
+                       pfree(dstpath);
+                       continue;
+               }
+
+               if (!rmtree(dstpath, true))
+                       ereport(WARNING,
+                                       (errmsg("could not remove database directory \"%s\"",
+                                                       dstpath)));
+
+               /* Record the filesystem change in XLOG */
+               {
+                       xl_dbase_drop_rec xlrec;
+                       XLogRecData rdata[2];
+
+                       xlrec.db_id = db_id;
+                       rdata[0].buffer = InvalidBuffer;
+                       rdata[0].data = (char *) &xlrec;
+                       rdata[0].len = offsetof(xl_dbase_drop_rec, dir_path);
+                       rdata[0].next = &(rdata[1]);
+
+                       rdata[1].buffer = InvalidBuffer;
+                       rdata[1].data = (char *) dstpath;
+                       rdata[1].len = strlen(dstpath) + 1;
+                       rdata[1].next = NULL;
+
+                       (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
+               }
+
+               pfree(dstpath);
        }
 
-       len = strlen(prefix) + 6 + sizeof(Oid) * 8 + 1;
-       if (len >= MAXPGPATH - 100)
-               elog(ERROR, "Alternate path is too long");
+       heap_endscan(scan);
+       heap_close(rel, AccessShareLock);
+}
 
-       ret = palloc(len);
-       snprintf(ret, len, "%s/base/%u", prefix, dboid);
 
-       return ret;
+/*
+ * get_database_oid - given a database name, look up the OID
+ *
+ * Returns InvalidOid if database name not found.
+ *
+ * This is not actually used in this file, but is exported for use elsewhere.
+ */
+Oid
+get_database_oid(const char *dbname)
+{
+       Relation        pg_database;
+       ScanKeyData entry[1];
+       SysScanDesc scan;
+       HeapTuple       dbtuple;
+       Oid                     oid;
+
+       /* There's no syscache for pg_database, so must look the hard way */
+       pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
+       ScanKeyInit(&entry[0],
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               CStringGetDatum(dbname));
+       scan = systable_beginscan(pg_database, DatabaseNameIndex, true,
+                                                         SnapshotNow, 1, entry);
+
+       dbtuple = systable_getnext(scan);
+
+       /* We assume that there can be at most one matching tuple */
+       if (HeapTupleIsValid(dbtuple))
+               oid = HeapTupleGetOid(dbtuple);
+       else
+               oid = InvalidOid;
+
+       systable_endscan(scan);
+       heap_close(pg_database, AccessShareLock);
+
+       return oid;
 }
 
 
-static bool
-remove_dbdirs(const char *nominal_loc, const char *alt_loc)
+/*
+ * get_database_name - given a database OID, look up the name
+ *
+ * Returns a palloc'd string, or NULL if no such database.
+ *
+ * This is not actually used in this file, but is exported for use elsewhere.
+ */
+char *
+get_database_name(Oid dbid)
 {
-       const char *target_dir;
-       char            buf[MAXPGPATH + 100];
-       bool            success = true;
+       Relation        pg_database;
+       ScanKeyData entry[1];
+       SysScanDesc scan;
+       HeapTuple       dbtuple;
+       char       *result;
+
+       /* There's no syscache for pg_database, so must look the hard way */
+       pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
+       ScanKeyInit(&entry[0],
+                               ObjectIdAttributeNumber,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(dbid));
+       scan = systable_beginscan(pg_database, DatabaseOidIndex, true,
+                                                         SnapshotNow, 1, entry);
+
+       dbtuple = systable_getnext(scan);
+
+       /* We assume that there can be at most one matching tuple */
+       if (HeapTupleIsValid(dbtuple))
+               result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
+       else
+               result = NULL;
 
-       target_dir = alt_loc ? alt_loc : nominal_loc;
+       systable_endscan(scan);
+       heap_close(pg_database, AccessShareLock);
 
-       /*
-        * Close virtual file descriptors so the kernel has more available for
-        * the system() call below.
-        */
-       closeAllVfds();
+       return result;
+}
+
+/*
+ * DATABASE resource manager's routines
+ */
+void
+dbase_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+       uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
-       if (alt_loc)
+       if (info == XLOG_DBASE_CREATE)
        {
-               /* remove symlink */
-               if (unlink(nominal_loc) != 0)
+               xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
+               char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
+               struct stat st;
+
+#ifndef WIN32
+               char            buf[2 * MAXPGPATH + 100];
+#endif
+
+               /*
+                * Our theory for replaying a CREATE is to forcibly drop the
+                * target subdirectory if present, then re-copy the source data.
+                * This may be more work than needed, but it is simple to
+                * implement.
+                */
+               if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
                {
-                       elog(WARNING, "could not remove '%s': %m", nominal_loc);
-                       success = false;
+                       if (!rmtree(dst_path, true))
+                               ereport(WARNING,
+                                       (errmsg("could not remove database directory \"%s\"",
+                                                       dst_path)));
                }
+
+               /*
+                * Force dirty buffers out to disk, to ensure source database is
+                * up-to-date for the copy.  (We really only need to flush buffers for
+                * the source database...)
+                */
+               BufferSync(-1, -1);
+
+#ifndef WIN32
+
+               /*
+                * Copy this subdirectory to the new location
+                *
+                * XXX use of cp really makes this code pretty grotty, particularly
+                * with respect to lack of ability to report errors well.  Someday
+                * rewrite to do it for ourselves.
+                */
+
+               /* We might need to use cp -R one day for portability */
+               snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
+                                xlrec->src_path, dst_path);
+               if (system(buf) != 0)
+                       ereport(ERROR,
+                                       (errmsg("could not initialize database directory"),
+                                        errdetail("Failing system command was: %s", buf),
+                                        errhint("Look in the postmaster's stderr log for more information.")));
+#else                                                  /* WIN32 */
+               if (copydir(xlrec->src_path, dst_path) != 0)
+               {
+                       /* copydir should already have given details of its troubles */
+                       ereport(ERROR,
+                                       (errmsg("could not initialize database directory")));
+               }
+#endif   /* WIN32 */
+       }
+       else if (info == XLOG_DBASE_DROP)
+       {
+               xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
+
+               /*
+                * Drop pages for this database that are in the shared buffer
+                * cache
+                */
+               DropBuffers(xlrec->db_id);
+
+               if (!rmtree(xlrec->dir_path, true))
+                       ereport(WARNING,
+                                       (errmsg("could not remove database directory \"%s\"",
+                                                       xlrec->dir_path)));
        }
+       else
+               elog(PANIC, "dbase_redo: unknown op code %u", info);
+}
+
+void
+dbase_undo(XLogRecPtr lsn, XLogRecord *record)
+{
+       elog(PANIC, "dbase_undo: unimplemented");
+}
 
-       snprintf(buf, sizeof(buf), "rm -rf '%s'", target_dir);
+void
+dbase_desc(char *buf, uint8 xl_info, char *rec)
+{
+       uint8           info = xl_info & ~XLR_INFO_MASK;
 
-       if (system(buf) != 0)
+       if (info == XLOG_DBASE_CREATE)
        {
-               elog(WARNING, "database directory '%s' could not be removed",
-                        target_dir);
-               success = false;
+               xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
+               char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
+
+               sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"",
+                               xlrec->db_id, xlrec->src_path, dst_path);
        }
+       else if (info == XLOG_DBASE_DROP)
+       {
+               xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
 
-       return success;
+               sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"",
+                               xlrec->db_id, xlrec->dir_path);
+       }
+       else
+               strcat(buf, "UNKNOWN");
 }