granicus.if.org Git - postgresql/blobdiff - src/backend/commands/dbcommands.c
Improve vacuum code to track minimum Xids per table instead of per database.
[postgresql] / src / backend / commands / dbcommands.c
index 40cfb3e6586dea0723b5cd2b4910e5d752e14d55..6d744a5bad32f524d19219e83bde704af5b32079 100644 (file)
@@ -3,19 +3,17 @@
  * dbcommands.c
  *             Database management commands (create/drop database).
  *
- * Note: database creation/destruction commands take ExclusiveLock on
- * pg_database to ensure that no two proceed in parallel.  We must use
- * at least this level of locking to ensure that no two backends try to
- * write the flat-file copy of pg_database at once.  We avoid using
- * AccessExclusiveLock since there's no need to lock out ordinary readers
- * of pg_database.
+ * Note: database creation/destruction commands use exclusive locks on
+ * the database objects (as expressed by LockSharedObject()) to avoid
+ * stepping on each other's toes.  Formerly we used table-level locks
+ * on pg_database, but that's too coarse-grained.
  *
- * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.159 2005/06/06 20:22:57 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.182 2006/07/10 16:20:50 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "catalog/catalog.h"
+#include "catalog/dependency.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_database.h"
-#include "catalog/pg_shadow.h"
 #include "catalog/pg_tablespace.h"
-#include "catalog/indexing.h"
 #include "commands/comment.h"
 #include "commands/dbcommands.h"
 #include "commands/tablespace.h"
 
 
 /* non-export function prototypes */
-static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
+static bool get_db_info(const char *name, LOCKMODE lockmode,
+                       Oid *dbIdP, Oid *ownerIdP,
                        int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
                        Oid *dbLastSysOidP,
-                       TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
+                       TransactionId *dbVacuumXidP, TransactionId *dbMinXidP,
                        Oid *dbTablespace);
 static bool have_createdb_privilege(void);
 static void remove_dbtablespaces(Oid db_id);
@@ -70,35 +70,32 @@ createdb(const CreatedbStmt *stmt)
        HeapScanDesc scan;
        Relation        rel;
        Oid                     src_dboid;
-       AclId           src_owner;
+       Oid                     src_owner;
        int                     src_encoding;
        bool            src_istemplate;
        bool            src_allowconn;
        Oid                     src_lastsysoid;
        TransactionId src_vacuumxid;
-       TransactionId src_frozenxid;
+       TransactionId src_minxid;
        Oid                     src_deftablespace;
-       Oid                     dst_deftablespace;
+       volatile Oid dst_deftablespace;
        Relation        pg_database_rel;
        HeapTuple       tuple;
-       TupleDesc       pg_database_dsc;
        Datum           new_record[Natts_pg_database];
        char            new_record_nulls[Natts_pg_database];
        Oid                     dboid;
-       AclId           datdba;
+       Oid                     datdba;
        ListCell   *option;
        DefElem    *dtablespacename = NULL;
        DefElem    *downer = NULL;
        DefElem    *dtemplate = NULL;
        DefElem    *dencoding = NULL;
+       DefElem    *dconnlimit = NULL;
        char       *dbname = stmt->dbname;
        char       *dbowner = NULL;
-       char       *dbtemplate = NULL;
+       const char *dbtemplate = NULL;
        int                     encoding = -1;
-
-#ifndef WIN32
-       char            buf[2 * MAXPGPATH + 100];
-#endif
+       int                     dbconnlimit = -1;
 
        /* don't call this in a transaction block */
        PreventTransactionChain((void *) stmt, "CREATE DATABASE");
@@ -140,6 +137,14 @@ createdb(const CreatedbStmt *stmt)
                                                 errmsg("conflicting or redundant options")));
                        dencoding = defel;
                }
+               else if (strcmp(defel->defname, "connectionlimit") == 0)
+               {
+                       if (dconnlimit)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       dconnlimit = defel;
+               }
                else if (strcmp(defel->defname, "location") == 0)
                {
                        ereport(WARNING,
@@ -185,57 +190,49 @@ createdb(const CreatedbStmt *stmt)
                        elog(ERROR, "unrecognized node type: %d",
                                 nodeTag(dencoding->arg));
        }
+       if (dconnlimit && dconnlimit->arg)
+               dbconnlimit = intVal(dconnlimit->arg);
 
-       /* obtain sysid of proposed owner */
+       /* obtain OID of proposed owner */
        if (dbowner)
-               datdba = get_usesysid(dbowner); /* will ereport if no such user */
+               datdba = get_roleid_checked(dbowner);
        else
                datdba = GetUserId();
 
-       if (datdba == GetUserId())
-       {
-               /* creating database for self: can be superuser or createdb */
-               if (!superuser() && !have_createdb_privilege())
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                        errmsg("permission denied to create database")));
-       }
-       else
-       {
-               /* creating database for someone else: must be superuser */
-               /* note that the someone else need not have any permissions */
-               if (!superuser())
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                        errmsg("must be superuser to create database for another user")));
-       }
-
        /*
-        * Check for db name conflict.  There is a race condition here, since
-        * another backend could create the same DB name before we commit.
-        * However, holding an exclusive lock on pg_database for the whole
-        * time we are copying the source database doesn't seem like a good
-        * idea, so accept possibility of race to create.  We will check again
-        * after we grab the exclusive lock.
+        * To create a database, must have createdb privilege and must be able to
+        * become the target role (this does not imply that the target role itself
+        * must have createdb privilege).  The latter provision guards against
+        * "giveaway" attacks.  Note that a superuser will always have both of
+        * these privileges a fortiori.
         */
-       if (get_db_info(dbname, NULL, NULL, NULL,
-                                       NULL, NULL, NULL, NULL, NULL, NULL))
+       if (!have_createdb_privilege())
                ereport(ERROR,
-                               (errcode(ERRCODE_DUPLICATE_DATABASE),
-                                errmsg("database \"%s\" already exists", dbname)));
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                errmsg("permission denied to create database")));
+
+       check_is_member_of_role(GetUserId(), datdba);
 
        /*
-        * Lookup database (template) to be cloned.
+        * Lookup database (template) to be cloned, and obtain share lock on it.
+        * ShareLock allows two CREATE DATABASEs to work from the same template
+        * concurrently, while ensuring no one is busy dropping it in parallel
+        * (which would be Very Bad since we'd likely get an incomplete copy
+        * without knowing it).  This also prevents any new connections from being
+        * made to the source until we finish copying it, so we can be sure it
+        * won't change underneath us.
         */
        if (!dbtemplate)
                dbtemplate = "template1";               /* Default template database name */
 
-       if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
+       if (!get_db_info(dbtemplate, ShareLock,
+                                        &src_dboid, &src_owner, &src_encoding,
                                         &src_istemplate, &src_allowconn, &src_lastsysoid,
-                                        &src_vacuumxid, &src_frozenxid, &src_deftablespace))
+                                        &src_vacuumxid, &src_minxid, &src_deftablespace))
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
-                errmsg("template database \"%s\" does not exist", dbtemplate)));
+                                errmsg("template database \"%s\" does not exist",
+                                               dbtemplate)));
 
        /*
         * Permission check: to copy a DB that's not marked datistemplate, you
@@ -243,7 +240,7 @@ createdb(const CreatedbStmt *stmt)
         */
        if (!src_istemplate)
        {
-               if (!superuser() && GetUserId() != src_owner)
+               if (!pg_database_ownercheck(src_dboid, GetUserId()))
                        ereport(ERROR,
                                        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                         errmsg("permission denied to copy database \"%s\"",
@@ -253,14 +250,13 @@ createdb(const CreatedbStmt *stmt)
        /*
         * The source DB can't have any active backends, except this one
         * (exception is to allow CREATE DB while connected to template1).
-        * Otherwise we might copy inconsistent data.  This check is not
-        * bulletproof, since someone might connect while we are copying...
+        * Otherwise we might copy inconsistent data.
         */
        if (DatabaseHasActiveBackends(src_dboid, true))
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_IN_USE),
-               errmsg("source database \"%s\" is being accessed by other users",
-                          dbtemplate)));
+                       errmsg("source database \"%s\" is being accessed by other users",
+                                  dbtemplate)));
 
        /* If encoding is defaulted, use source's encoding */
        if (encoding < 0)
@@ -295,7 +291,7 @@ createdb(const CreatedbStmt *stmt)
                /*
                 * If we are trying to change the default tablespace of the template,
                 * we require that the template not have any files in the new default
-                * tablespace.  This is necessary because otherwise the copied
+                * tablespace.  This is necessary because otherwise the copied
                 * database would contain pg_class rows that refer to its default
                 * tablespace both explicitly (by OID) and implicitly (as zero), which
                 * would cause problems.  For example another CREATE DATABASE using
@@ -331,151 +327,22 @@ createdb(const CreatedbStmt *stmt)
        }
 
        /*
-        * Normally we mark the new database with the same datvacuumxid and
-        * datfrozenxid as the source.  However, if the source is not allowing
-        * connections then we assume it is fully frozen, and we can set the
-        * current transaction ID as the xid limits.  This avoids immediately
-        * starting to generate warnings after cloning template0.
+        * Check for db name conflict.  This is just to give a more friendly
+        * error message than "unique index violation".  There's a race condition
+        * but we're willing to accept the less friendly message in that case.
         */
-       if (!src_allowconn)
-               src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
-
-       /*
-        * Preassign OID for pg_database tuple, so that we can compute db
-        * path.
-        */
-       dboid = newoid();
-
-       /*
-        * Force dirty buffers out to disk, to ensure source database is
-        * up-to-date for the copy.  (We really only need to flush buffers for
-        * the source database, but bufmgr.c provides no API for that.)
-        */
-       BufferSync();
-
-       /*
-        * Close virtual file descriptors so the kernel has more available for
-        * the system() calls below.
-        */
-       closeAllVfds();
-
-       /*
-        * Iterate through all tablespaces of the template database, and copy
-        * each one to the new database.
-        */
-       rel = heap_open(TableSpaceRelationId, AccessShareLock);
-       scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
-       while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-       {
-               Oid                     srctablespace = HeapTupleGetOid(tuple);
-               Oid                     dsttablespace;
-               char       *srcpath;
-               char       *dstpath;
-               struct stat st;
-
-               /* No need to copy global tablespace */
-               if (srctablespace == GLOBALTABLESPACE_OID)
-                       continue;
-
-               srcpath = GetDatabasePath(src_dboid, srctablespace);
-
-               if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
-                       directory_is_empty(srcpath))
-               {
-                       /* Assume we can ignore it */
-                       pfree(srcpath);
-                       continue;
-               }
-
-               if (srctablespace == src_deftablespace)
-                       dsttablespace = dst_deftablespace;
-               else
-                       dsttablespace = srctablespace;
-
-               dstpath = GetDatabasePath(dboid, dsttablespace);
-
-               if (stat(dstpath, &st) == 0 || errno != ENOENT)
-               {
-                       remove_dbtablespaces(dboid);
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory"),
-                                        errdetail("Directory \"%s\" already exists.",
-                                                          dstpath)));
-               }
-
-#ifndef WIN32
-
-               /*
-                * Copy this subdirectory to the new location
-                *
-                * XXX use of cp really makes this code pretty grotty, particularly
-                * with respect to lack of ability to report errors well.  Someday
-                * rewrite to do it for ourselves.
-                */
-
-               /* We might need to use cp -R one day for portability */
-               snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
-                                srcpath, dstpath);
-               if (system(buf) != 0)
-               {
-                       remove_dbtablespaces(dboid);
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory"),
-                                        errdetail("Failing system command was: %s", buf),
-                                        errhint("Look in the postmaster's stderr log for more information.")));
-               }
-#else                                                  /* WIN32 */
-               if (copydir(srcpath, dstpath) != 0)
-               {
-                       /* copydir should already have given details of its troubles */
-                       remove_dbtablespaces(dboid);
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory")));
-               }
-#endif   /* WIN32 */
-
-               /* Record the filesystem change in XLOG */
-               {
-                       xl_dbase_create_rec xlrec;
-                       XLogRecData rdata[1];
-
-                       xlrec.db_id = dboid;
-                       xlrec.tablespace_id = dsttablespace;
-                       xlrec.src_db_id = src_dboid;
-                       xlrec.src_tablespace_id = srctablespace;
-
-                       rdata[0].data = (char *) &xlrec;
-                       rdata[0].len = sizeof(xl_dbase_create_rec);
-                       rdata[0].buffer = InvalidBuffer;
-                       rdata[0].next = NULL;
-
-                       (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
-               }
-       }
-       heap_endscan(scan);
-       heap_close(rel, AccessShareLock);
-
-       /*
-        * Now OK to grab exclusive lock on pg_database.
-        */
-       pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock);
-
-       /* Check to see if someone else created same DB name meanwhile. */
-       if (get_db_info(dbname, NULL, NULL, NULL,
-                                       NULL, NULL, NULL, NULL, NULL, NULL))
-       {
-               /* Don't hold lock while doing recursive remove */
-               heap_close(pg_database_rel, ExclusiveLock);
-               remove_dbtablespaces(dboid);
+       if (OidIsValid(get_database_oid(dbname)))
                ereport(ERROR,
                                (errcode(ERRCODE_DUPLICATE_DATABASE),
                                 errmsg("database \"%s\" already exists", dbname)));
-       }
 
        /*
-        * Insert a new tuple into pg_database
+        * Insert a new tuple into pg_database.  This establishes our ownership
+        * of the new database name (anyone else trying to insert the same name
+        * will block on the unique index, and fail after we commit).  It also
+        * assigns the OID that the new database will have.
         */
-       pg_database_dsc = RelationGetDescr(pg_database_rel);
+       pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
 
        /* Form tuple */
        MemSet(new_record, 0, sizeof(new_record));
@@ -483,13 +350,14 @@ createdb(const CreatedbStmt *stmt)
 
        new_record[Anum_pg_database_datname - 1] =
                DirectFunctionCall1(namein, CStringGetDatum(dbname));
-       new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
+       new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
        new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
        new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
        new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
+       new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
        new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
        new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
-       new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
+       new_record[Anum_pg_database_datminxid - 1] = TransactionIdGetDatum(src_minxid);
        new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
 
        /*
@@ -501,23 +369,157 @@ createdb(const CreatedbStmt *stmt)
        new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
        new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
 
-       tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
+       tuple = heap_formtuple(RelationGetDescr(pg_database_rel),
+                                                  new_record, new_record_nulls);
 
-       HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
-                                                                                * selection */
-
-       simple_heap_insert(pg_database_rel, tuple);
+       dboid = simple_heap_insert(pg_database_rel, tuple);
 
        /* Update indexes */
        CatalogUpdateIndexes(pg_database_rel, tuple);
 
-       /* Close pg_database, but keep exclusive lock till commit */
-       heap_close(pg_database_rel, NoLock);
+       /*
+        * Now generate additional catalog entries associated with the new DB
+        */
+
+       /* Register owner dependency */
+       recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
+
+       /* Create pg_shdepend entries for objects within database */
+       copyTemplateDependencies(src_dboid, dboid);
 
        /*
-        * Set flag to update flat database file at commit.
+        * Force dirty buffers out to disk, to ensure source database is
+        * up-to-date for the copy.  (We really only need to flush buffers for the
+        * source database, but bufmgr.c provides no API for that.)
         */
-       database_file_update_needed();
+       BufferSync();
+
+       /*
+        * Once we start copying subdirectories, we need to be able to clean 'em
+        * up if we fail.  Establish a TRY block to make sure this happens. (This
+        * is not a 100% solution, because of the possibility of failure during
+        * transaction commit after we leave this routine, but it should handle
+        * most scenarios.)
+        */
+       PG_TRY();
+       {
+               /*
+                * Iterate through all tablespaces of the template database, and copy
+                * each one to the new database.
+                */
+               rel = heap_open(TableSpaceRelationId, AccessShareLock);
+               scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+               while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+               {
+                       Oid                     srctablespace = HeapTupleGetOid(tuple);
+                       Oid                     dsttablespace;
+                       char       *srcpath;
+                       char       *dstpath;
+                       struct stat st;
+
+                       /* No need to copy global tablespace */
+                       if (srctablespace == GLOBALTABLESPACE_OID)
+                               continue;
+
+                       srcpath = GetDatabasePath(src_dboid, srctablespace);
+
+                       if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
+                               directory_is_empty(srcpath))
+                       {
+                               /* Assume we can ignore it */
+                               pfree(srcpath);
+                               continue;
+                       }
+
+                       if (srctablespace == src_deftablespace)
+                               dsttablespace = dst_deftablespace;
+                       else
+                               dsttablespace = srctablespace;
+
+                       dstpath = GetDatabasePath(dboid, dsttablespace);
+
+                       /*
+                        * Copy this subdirectory to the new location
+                        *
+                        * We don't need to copy subdirectories
+                        */
+                       copydir(srcpath, dstpath, false);
+
+                       /* Record the filesystem change in XLOG */
+                       {
+                               xl_dbase_create_rec xlrec;
+                               XLogRecData rdata[1];
+
+                               xlrec.db_id = dboid;
+                               xlrec.tablespace_id = dsttablespace;
+                               xlrec.src_db_id = src_dboid;
+                               xlrec.src_tablespace_id = srctablespace;
+
+                               rdata[0].data = (char *) &xlrec;
+                               rdata[0].len = sizeof(xl_dbase_create_rec);
+                               rdata[0].buffer = InvalidBuffer;
+                               rdata[0].next = NULL;
+
+                               (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
+                       }
+               }
+               heap_endscan(scan);
+               heap_close(rel, AccessShareLock);
+
+               /*
+                * We force a checkpoint before committing.  This effectively means
+                * that committed XLOG_DBASE_CREATE operations will never need to be
+                * replayed (at least not in ordinary crash recovery; we still have to
+                * make the XLOG entry for the benefit of PITR operations). This
+                * avoids two nasty scenarios:
+                *
+                * #1: When PITR is off, we don't XLOG the contents of newly created
+                * indexes; therefore the drop-and-recreate-whole-directory behavior
+                * of DBASE_CREATE replay would lose such indexes.
+                *
+                * #2: Since we have to recopy the source database during DBASE_CREATE
+                * replay, we run the risk of copying changes in it that were
+                * committed after the original CREATE DATABASE command but before the
+                * system crash that led to the replay.  This is at least unexpected
+                * and at worst could lead to inconsistencies, eg duplicate table
+                * names.
+                *
+                * (Both of these were real bugs in releases 8.0 through 8.0.3.)
+                *
+                * In PITR replay, the first of these isn't an issue, and the second
+                * is only a risk if the CREATE DATABASE and subsequent template
+                * database change both occur while a base backup is being taken.
+                * There doesn't seem to be much we can do about that except document
+                * it as a limitation.
+                *
+                * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
+                * we can avoid this.
+                */
+               RequestCheckpoint(true, false);
+
+               /*
+                * Close pg_database, but keep lock till commit (this is important
+                * to prevent any risk of deadlock failure while updating flat file)
+                */
+               heap_close(pg_database_rel, NoLock);
+
+               /*
+                * Set flag to update flat database file at commit.
+                */
+               database_file_update_needed();
+       }
+       PG_CATCH();
+       {
+               /* Release lock on source database before doing recursive remove */
+               UnlockSharedObject(DatabaseRelationId, src_dboid, 0,
+                                                  ShareLock);
+
+               /* Throw away any successfully copied subdirectories */
+               remove_dbtablespaces(dboid);
+
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
 }
 
 
@@ -525,14 +527,11 @@ createdb(const CreatedbStmt *stmt)
  * DROP DATABASE
  */
 void
-dropdb(const char *dbname)
+dropdb(const char *dbname, bool missing_ok)
 {
-       int4            db_owner;
-       bool            db_istemplate;
        Oid                     db_id;
+       bool            db_istemplate;
        Relation        pgdbrel;
-       SysScanDesc pgdbscan;
-       ScanKeyData key;
        HeapTuple       tup;
 
        PreventTransactionChain((void *) dbname, "DROP DATABASE");
@@ -545,33 +544,45 @@ dropdb(const char *dbname)
                                 errmsg("cannot drop the currently open database")));
 
        /*
-        * Obtain exclusive lock on pg_database.  We need this to ensure that
-        * no new backend starts up in the target database while we are
-        * deleting it.  (Actually, a new backend might still manage to start
-        * up, because it isn't able to lock pg_database while starting.  But
-        * it will detect its error in ReverifyMyDatabase and shut down before
-        * any serious damage is done.  See postinit.c.)
-        *
-        * An ExclusiveLock, rather than AccessExclusiveLock, is sufficient
-        * since ReverifyMyDatabase takes RowShareLock.  This allows ordinary
-        * readers of pg_database to proceed in parallel.
+        * Look up the target database's OID, and get exclusive lock on it.
+        * We need this to ensure that no new backend starts up in the target
+        * database while we are deleting it (see postinit.c), and that no one is
+        * using it as a CREATE DATABASE template or trying to delete it for
+        * themselves.
         */
-       pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
+       pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
 
-       if (!get_db_info(dbname, &db_id, &db_owner, NULL,
+       if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
                                         &db_istemplate, NULL, NULL, NULL, NULL, NULL))
-               ereport(ERROR,
-                               (errcode(ERRCODE_UNDEFINED_DATABASE),
-                                errmsg("database \"%s\" does not exist", dbname)));
+       {
+               if (!missing_ok)
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                        errmsg("database \"%s\" does not exist", dbname)));
+               }
+               else
+               {
+                       /* Close pg_database, release the lock, since we changed nothing */
+                       heap_close(pgdbrel, RowExclusiveLock);
+                       ereport(NOTICE,
+                                       (errmsg("database \"%s\" does not exist, skipping",
+                                                       dbname)));
+                       return;
+               }
+       }
 
-       if (GetUserId() != db_owner && !superuser())
+       /*
+        * Permission checks
+        */
+       if (!pg_database_ownercheck(db_id, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
                                           dbname);
 
        /*
         * Disallow dropping a DB that is marked istemplate.  This is just to
-        * prevent people from accidentally dropping template0 or template1;
-        * they can do so if they're really determined ...
+        * prevent people from accidentally dropping template0 or template1; they
+        * can do so if they're really determined ...
         */
        if (db_istemplate)
                ereport(ERROR,
@@ -579,55 +590,44 @@ dropdb(const char *dbname)
                                 errmsg("cannot drop a template database")));
 
        /*
-        * Check for active backends in the target database.
+        * Check for active backends in the target database.  (Because we hold
+        * the database lock, no new ones can start after this.)
         */
        if (DatabaseHasActiveBackends(db_id, false))
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_IN_USE),
-                          errmsg("database \"%s\" is being accessed by other users",
-                                         dbname)));
+                                errmsg("database \"%s\" is being accessed by other users",
+                                               dbname)));
 
        /*
-        * Find the database's tuple by OID (should be unique).
+        * Remove the database's tuple from pg_database.
         */
-       ScanKeyInit(&key,
-                               ObjectIdAttributeNumber,
-                               BTEqualStrategyNumber, F_OIDEQ,
-                               ObjectIdGetDatum(db_id));
-
-       pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndexId, true,
-                                                                 SnapshotNow, 1, &key);
-
-       tup = systable_getnext(pgdbscan);
+       tup = SearchSysCache(DATABASEOID,
+                                                ObjectIdGetDatum(db_id),
+                                                0, 0, 0);
        if (!HeapTupleIsValid(tup))
-       {
-               /*
-                * This error should never come up since the existence of the
-                * database is checked earlier
-                */
-               elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
-                        dbname);
-       }
+               elog(ERROR, "cache lookup failed for database %u", db_id);
 
-       /* Remove the database's tuple from pg_database */
        simple_heap_delete(pgdbrel, &tup->t_self);
 
-       systable_endscan(pgdbscan);
+       ReleaseSysCache(tup);
 
        /*
-        * Delete any comments associated with the database
-        *
-        * NOTE: this is probably dead code since any such comments should have
-        * been in that database, not mine.
+        * Delete any comments associated with the database.
         */
-       DeleteComments(db_id, DatabaseRelationId, 0);
+       DeleteSharedComments(db_id, DatabaseRelationId);
 
        /*
-        * Drop pages for this database that are in the shared buffer cache.
-        * This is important to ensure that no remaining backend tries to
-        * write out a dirty buffer to the dead database later...
+        * Remove shared dependency references for the database.
         */
-       DropBuffers(db_id);
+       dropDatabaseDependencies(db_id);
+
+       /*
+        * Drop pages for this database that are in the shared buffer cache. This
+        * is important to ensure that no remaining backend tries to write out a
+        * dirty buffer to the dead database later...
+        */
+       DropDatabaseBuffers(db_id);
 
        /*
         * Also, clean out any entries in the shared free space map.
@@ -639,7 +639,7 @@ dropdb(const char *dbname)
         * open files, which would cause rmdir() to fail.
         */
 #ifdef WIN32
-       RequestCheckpoint(true);
+       RequestCheckpoint(true, false);
 #endif
 
        /*
@@ -647,7 +647,10 @@ dropdb(const char *dbname)
         */
        remove_dbtablespaces(db_id);
 
-       /* Close pg_database, but keep exclusive lock till commit */
+       /*
+        * Close pg_database, but keep lock till commit (this is important
+        * to prevent any risk of deadlock failure while updating flat file)
+        */
        heap_close(pgdbrel, NoLock);
 
        /*
@@ -663,87 +666,74 @@ dropdb(const char *dbname)
 void
 RenameDatabase(const char *oldname, const char *newname)
 {
-       HeapTuple       tup,
-                               newtup;
+       Oid                     db_id;
+       HeapTuple       newtup;
        Relation        rel;
-       SysScanDesc scan,
-                               scan2;
-       ScanKeyData key,
-                               key2;
 
        /*
-        * Obtain ExclusiveLock so that no new session gets started
-        * while the rename is in progress.
+        * Look up the target database's OID, and get exclusive lock on it.
+        * We need this for the same reasons as DROP DATABASE.
         */
-       rel = heap_open(DatabaseRelationId, ExclusiveLock);
-
-       ScanKeyInit(&key,
-                               Anum_pg_database_datname,
-                               BTEqualStrategyNumber, F_NAMEEQ,
-                               NameGetDatum(oldname));
-       scan = systable_beginscan(rel, DatabaseNameIndexId, true,
-                                                         SnapshotNow, 1, &key);
+       rel = heap_open(DatabaseRelationId, RowExclusiveLock);
 
-       tup = systable_getnext(scan);
-       if (!HeapTupleIsValid(tup))
+       if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
+                                        NULL, NULL, NULL, NULL, NULL, NULL))
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", oldname)));
 
        /*
-        * XXX Client applications probably store the current database
-        * somewhere, so renaming it could cause confusion.  On the other
-        * hand, there may not be an actual problem besides a little
-        * confusion, so think about this and decide.
+        * XXX Client applications probably store the current database somewhere,
+        * so renaming it could cause confusion.  On the other hand, there may not
+        * be an actual problem besides a little confusion, so think about this
+        * and decide.
         */
-       if (HeapTupleGetOid(tup) == MyDatabaseId)
+       if (db_id == MyDatabaseId)
                ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("current database may not be renamed")));
 
        /*
-        * Make sure the database does not have active sessions.  Might not be
-        * necessary, but it's consistent with other database operations.
+        * Make sure the database does not have active sessions.  This is the
+        * same concern as above, but applied to other sessions.
         */
-       if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
+       if (DatabaseHasActiveBackends(db_id, false))
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_IN_USE),
-                          errmsg("database \"%s\" is being accessed by other users",
-                                         oldname)));
+                                errmsg("database \"%s\" is being accessed by other users",
+                                               oldname)));
 
        /* make sure the new name doesn't exist */
-       ScanKeyInit(&key2,
-                               Anum_pg_database_datname,
-                               BTEqualStrategyNumber, F_NAMEEQ,
-                               NameGetDatum(newname));
-       scan2 = systable_beginscan(rel, DatabaseNameIndexId, true,
-                                                          SnapshotNow, 1, &key2);
-       if (HeapTupleIsValid(systable_getnext(scan2)))
+       if (OidIsValid(get_database_oid(newname)))
                ereport(ERROR,
                                (errcode(ERRCODE_DUPLICATE_DATABASE),
                                 errmsg("database \"%s\" already exists", newname)));
-       systable_endscan(scan2);
 
        /* must be owner */
-       if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+       if (!pg_database_ownercheck(db_id, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
                                           oldname);
 
        /* must have createdb rights */
-       if (!superuser() && !have_createdb_privilege())
+       if (!have_createdb_privilege())
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                 errmsg("permission denied to rename database")));
 
        /* rename */
-       newtup = heap_copytuple(tup);
+       newtup = SearchSysCacheCopy(DATABASEOID,
+                                                               ObjectIdGetDatum(db_id),
+                                                               0, 0, 0);
+       if (!HeapTupleIsValid(newtup))
+               elog(ERROR, "cache lookup failed for database %u", db_id);
        namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
        simple_heap_update(rel, &newtup->t_self, newtup);
        CatalogUpdateIndexes(rel, newtup);
 
-       systable_endscan(scan);
-
-       /* Close pg_database, but keep exclusive lock till commit */
+       /*
+        * Close pg_database, but keep lock till commit (this is important
+        * to prevent any risk of deadlock failure while updating flat file)
+        */
        heap_close(rel, NoLock);
 
        /*
@@ -753,6 +743,99 @@ RenameDatabase(const char *oldname, const char *newname)
 }
 
 
+/*
+ * ALTER DATABASE name ... (the only option handled here is "connectionlimit")
+ */
+void
+AlterDatabase(AlterDatabaseStmt *stmt)
+{
+       Relation        rel;
+       HeapTuple       tuple,
+                               newtuple;
+       ScanKeyData scankey;
+       SysScanDesc scan;
+       ListCell   *option;
+       int                     connlimit = -1;  /* stored only if the option was given */
+       DefElem    *dconnlimit = NULL;
+       Datum           new_record[Natts_pg_database];
+       char            new_record_nulls[Natts_pg_database];
+       char            new_record_repl[Natts_pg_database];
+
+       /* Extract options from the statement node tree, rejecting duplicates */
+       foreach(option, stmt->options)
+       {
+               DefElem    *defel = (DefElem *) lfirst(option);
+
+               if (strcmp(defel->defname, "connectionlimit") == 0)
+               {
+                       if (dconnlimit)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       dconnlimit = defel;
+               }
+               else
+                       elog(ERROR, "option \"%s\" not recognized",
+                                defel->defname);
+       }
+
+       if (dconnlimit)
+               connlimit = intVal(dconnlimit->arg);
+
+       /*
+        * Get the old tuple.  We don't need a lock on the database per se,
+        * because we're not going to do anything that would mess up incoming
+        * connections.
+        */
+       rel = heap_open(DatabaseRelationId, RowExclusiveLock);
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               NameGetDatum(stmt->dbname));
+       scan = systable_beginscan(rel, DatabaseNameIndexId, true,
+                                                         SnapshotNow, 1, &scankey);
+       tuple = systable_getnext(scan);  /* first row matching datname, if any */
+       if (!HeapTupleIsValid(tuple))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", stmt->dbname)));
+
+       if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                          stmt->dbname);
+
+       /*
+        * Build an updated tuple; datconnlimit is flagged only if it was given
+        */
+       MemSet(new_record, 0, sizeof(new_record));
+       MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
+       MemSet(new_record_repl, ' ', sizeof(new_record_repl));
+
+       if (dconnlimit)
+       {
+               new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
+               new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r';
+       }
+
+       newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record,
+                                                               new_record_nulls, new_record_repl);
+       simple_heap_update(rel, &tuple->t_self, newtuple);
+
+       /* Update indexes */
+       CatalogUpdateIndexes(rel, newtuple);
+
+       systable_endscan(scan);
+
+       /* Close pg_database, but keep lock till commit */
+       heap_close(rel, NoLock);
+
+       /*
+        * We don't bother updating the flat file since the existing options for
+        * ALTER DATABASE don't affect it.
+        */
+}
+
+
 /*
  * ALTER DATABASE name SET ...
  */
@@ -772,8 +855,9 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
        valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
 
        /*
-        * We don't need ExclusiveLock since we aren't updating the
-        * flat file.
+        * Get the old tuple.  We don't need a lock on the database per se,
+        * because we're not going to do anything that would mess up incoming
+        * connections.
         */
        rel = heap_open(DatabaseRelationId, RowExclusiveLock);
        ScanKeyInit(&scankey,
@@ -788,8 +872,7 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", stmt->dbname)));
 
-       if (!(superuser()
-               || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
+       if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
                                           stmt->dbname);
 
@@ -838,8 +921,8 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
        heap_close(rel, NoLock);
 
        /*
-        * We don't bother updating the flat file since ALTER DATABASE SET
-        * doesn't affect it.
+        * We don't bother updating the flat file since ALTER DATABASE SET doesn't
+        * affect it.
         */
 }
 
@@ -848,7 +931,7 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
  * ALTER DATABASE name OWNER TO newowner
  */
 void
-AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
+AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
 {
        HeapTuple       tuple;
        Relation        rel;
@@ -857,8 +940,9 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
        Form_pg_database datForm;
 
        /*
-        * We don't need ExclusiveLock since we aren't updating the
-        * flat file.
+        * Get the old tuple.  We don't need a lock on the database per se,
+        * because we're not going to do anything that would mess up incoming
+        * connections.
         */
        rel = heap_open(DatabaseRelationId, RowExclusiveLock);
        ScanKeyInit(&scankey,
@@ -880,7 +964,7 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
         * command to have succeeded.  This is to be consistent with other
         * objects.
         */
-       if (datForm->datdba != newOwnerSysId)
+       if (datForm->datdba != newOwnerId)
        {
                Datum           repl_val[Natts_pg_database];
                char            repl_null[Natts_pg_database];
@@ -890,17 +974,33 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
                bool            isNull;
                HeapTuple       newtuple;
 
-               /* must be superuser to change ownership */
-               if (!superuser())
+               /* Otherwise, must be owner of the existing object */
+               if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
+                       aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+                                                  dbname);
+
+               /* Must be able to become new owner */
+               check_is_member_of_role(GetUserId(), newOwnerId);
+
+               /*
+                * must have createdb rights
+                *
+                * NOTE: This is different from other alter-owner checks in that the
+                * current user is checked for createdb privileges instead of the
+                * destination owner.  This is consistent with the CREATE case for
+                * databases.  Because superusers will always have this right, we need
+                * no special case for them.
+                */
+               if (!have_createdb_privilege())
                        ereport(ERROR,
                                        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                        errmsg("must be superuser to change owner")));
+                                  errmsg("permission denied to change owner of database")));
 
                memset(repl_null, ' ', sizeof(repl_null));
                memset(repl_repl, ' ', sizeof(repl_repl));
 
                repl_repl[Anum_pg_database_datdba - 1] = 'r';
-               repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
+               repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
 
                /*
                 * Determine the modified ACL for the new owner.  This is only
@@ -913,7 +1013,7 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
                if (!isNull)
                {
                        newAcl = aclnewowner(DatumGetAclP(aclDatum),
-                                                                datForm->datdba, newOwnerSysId);
+                                                                datForm->datdba, newOwnerId);
                        repl_repl[Anum_pg_database_datacl - 1] = 'r';
                        repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
                }
@@ -923,6 +1023,10 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
                CatalogUpdateIndexes(rel, newtuple);
 
                heap_freetuple(newtuple);
+
+               /* Update owner dependency reference */
+               changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
+                                                               newOwnerId);
        }
 
        systable_endscan(scan);
@@ -941,72 +1045,127 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
  * Helper functions
  */
 
+/*
+ * Look up info about the database named "name".  If the database exists,
+ * obtain the specified lock type on it (skipped when lockmode is NoLock),
+ * fill in any of the remaining parameters that aren't NULL, and return
+ * TRUE.  If no such database, return FALSE; no lock is held in that case.
+ */
 static bool
-get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
+get_db_info(const char *name, LOCKMODE lockmode,
+                       Oid *dbIdP, Oid *ownerIdP,
                        int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
                        Oid *dbLastSysOidP,
-                       TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
+                       TransactionId *dbVacuumXidP, TransactionId *dbMinXidP,
                        Oid *dbTablespace)
 {
+       bool            result = false;  /* stays false unless a match is confirmed */
        Relation        relation;
-       ScanKeyData scanKey;
-       SysScanDesc scan;
-       HeapTuple       tuple;
-       bool            gottuple;
 
        AssertArg(name);
 
        /* Caller may wish to grab a better lock on pg_database beforehand... */
        relation = heap_open(DatabaseRelationId, AccessShareLock);
 
-       ScanKeyInit(&scanKey,
-                               Anum_pg_database_datname,
-                               BTEqualStrategyNumber, F_NAMEEQ,
-                               NameGetDatum(name));
+       /*
+        * Loop covers the rare case where the database is renamed before we
+        * can lock it.  We try again just in case we can find a new one of
+        * the same name.
+        */
+       for (;;)
+       {
+               ScanKeyData scanKey;
+               SysScanDesc scan;
+               HeapTuple       tuple;
+               Oid                     dbOid;
 
-       scan = systable_beginscan(relation, DatabaseNameIndexId, true,
-                                                         SnapshotNow, 1, &scanKey);
+               /*
+                * there's no syscache for database-indexed-by-name,
+                * so must do it the hard way
+                */
+               ScanKeyInit(&scanKey,
+                                       Anum_pg_database_datname,
+                                       BTEqualStrategyNumber, F_NAMEEQ,
+                                       NameGetDatum(name));
 
-       tuple = systable_getnext(scan);
+               scan = systable_beginscan(relation, DatabaseNameIndexId, true,
+                                                                 SnapshotNow, 1, &scanKey);
 
-       gottuple = HeapTupleIsValid(tuple);
-       if (gottuple)
-       {
-               Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
-
-               /* oid of the database */
-               if (dbIdP)
-                       *dbIdP = HeapTupleGetOid(tuple);
-               /* sysid of the owner */
-               if (ownerIdP)
-                       *ownerIdP = dbform->datdba;
-               /* character encoding */
-               if (encodingP)
-                       *encodingP = dbform->encoding;
-               /* allowed as template? */
-               if (dbIsTemplateP)
-                       *dbIsTemplateP = dbform->datistemplate;
-               /* allowing connections? */
-               if (dbAllowConnP)
-                       *dbAllowConnP = dbform->datallowconn;
-               /* last system OID used in database */
-               if (dbLastSysOidP)
-                       *dbLastSysOidP = dbform->datlastsysoid;
-               /* limit of vacuumed XIDs */
-               if (dbVacuumXidP)
-                       *dbVacuumXidP = dbform->datvacuumxid;
-               /* limit of frozen XIDs */
-               if (dbFrozenXidP)
-                       *dbFrozenXidP = dbform->datfrozenxid;
-               /* default tablespace for this database */
-               if (dbTablespace)
-                       *dbTablespace = dbform->dattablespace;
+               tuple = systable_getnext(scan);
+
+               if (!HeapTupleIsValid(tuple))
+               {
+                       /* definitely no database of that name */
+                       systable_endscan(scan);
+                       break;
+               }
+
+               dbOid = HeapTupleGetOid(tuple);  /* save the OID before ending the scan */
+
+               systable_endscan(scan);
+
+               /*
+                * Now that we have a database OID, we can try to lock the DB.
+                */
+               if (lockmode != NoLock)
+                       LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
+
+               /*
+                * And now, re-fetch the tuple by OID.  If it's still there and
+                * still the same name, we win; else, drop the lock and loop
+                * back to try again.
+                */
+               tuple = SearchSysCache(DATABASEOID,
+                                                          ObjectIdGetDatum(dbOid),
+                                                          0, 0, 0);
+               if (HeapTupleIsValid(tuple))
+               {
+                       Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
+
+                       if (strcmp(name, NameStr(dbform->datname)) == 0)
+                       {
+                               /* oid of the database */
+                               if (dbIdP)
+                                       *dbIdP = dbOid;
+                               /* oid of the owner */
+                               if (ownerIdP)
+                                       *ownerIdP = dbform->datdba;
+                               /* character encoding */
+                               if (encodingP)
+                                       *encodingP = dbform->encoding;
+                               /* allowed as template? */
+                               if (dbIsTemplateP)
+                                       *dbIsTemplateP = dbform->datistemplate;
+                               /* allowing connections? */
+                               if (dbAllowConnP)
+                                       *dbAllowConnP = dbform->datallowconn;
+                               /* last system OID used in database */
+                               if (dbLastSysOidP)
+                                       *dbLastSysOidP = dbform->datlastsysoid;
+                               /* limit of vacuumed XIDs */
+                               if (dbVacuumXidP)
+                                       *dbVacuumXidP = dbform->datvacuumxid;
+                               /* limit of min XIDs */
+                               if (dbMinXidP)
+                                       *dbMinXidP = dbform->datminxid;
+                               /* default tablespace for this database */
+                               if (dbTablespace)
+                                       *dbTablespace = dbform->dattablespace;
+                               ReleaseSysCache(tuple);
+                               result = true;
+                               break;
+                       }
+                       /* can only get here if it was just renamed */
+                       ReleaseSysCache(tuple);
+               }
+
+               if (lockmode != NoLock)
+                       UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);  /* re-fetch failed or name changed: unlock and retry */
        }
 
-       systable_endscan(scan);
        heap_close(relation, AccessShareLock);
 
-       return gottuple;
+       return result;
 }
 
 /* Check if current user has createdb privileges */
@@ -1016,12 +1175,16 @@ have_createdb_privilege(void)
        bool            result = false;
        HeapTuple       utup;
 
-       utup = SearchSysCache(SHADOWSYSID,
-                                                 Int32GetDatum(GetUserId()),
+       /* Superusers can always do everything */
+       if (superuser())
+               return true;
+
+       utup = SearchSysCache(AUTHOID,
+                                                 ObjectIdGetDatum(GetUserId()),
                                                  0, 0, 0);
        if (HeapTupleIsValid(utup))
        {
-               result = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
+               result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
                ReleaseSysCache(utup);
        }
        return result;
@@ -1094,8 +1257,6 @@ remove_dbtablespaces(Oid db_id)
  * get_database_oid - given a database name, look up the OID
  *
  * Returns InvalidOid if database name not found.
- *
- * This is not actually used in this file, but is exported for use elsewhere.
  */
 Oid
 get_database_oid(const char *dbname)
@@ -1106,7 +1267,10 @@ get_database_oid(const char *dbname)
        HeapTuple       dbtuple;
        Oid                     oid;
 
-       /* There's no syscache for pg_database, so must look the hard way */
+       /*
+        * There's no syscache for pg_database indexed by name,
+        * so we must look the hard way.
+        */
        pg_database = heap_open(DatabaseRelationId, AccessShareLock);
        ScanKeyInit(&entry[0],
                                Anum_pg_database_datname,
@@ -1134,38 +1298,24 @@ get_database_oid(const char *dbname)
  * get_database_name - given a database OID, look up the name
  *
  * Returns a palloc'd string, or NULL if no such database.
- *
- * This is not actually used in this file, but is exported for use elsewhere.
  */
 char *
 get_database_name(Oid dbid)
 {
-       Relation        pg_database;
-       ScanKeyData entry[1];
-       SysScanDesc scan;
        HeapTuple       dbtuple;
        char       *result;
 
-       /* There's no syscache for pg_database, so must look the hard way */
-       pg_database = heap_open(DatabaseRelationId, AccessShareLock);
-       ScanKeyInit(&entry[0],
-                               ObjectIdAttributeNumber,
-                               BTEqualStrategyNumber, F_OIDEQ,
-                               ObjectIdGetDatum(dbid));
-       scan = systable_beginscan(pg_database, DatabaseOidIndexId, true,
-                                                         SnapshotNow, 1, entry);
-
-       dbtuple = systable_getnext(scan);
-
-       /* We assume that there can be at most one matching tuple */
+       dbtuple = SearchSysCache(DATABASEOID,
+                                                        ObjectIdGetDatum(dbid),
+                                                        0, 0, 0);  /* look up pg_database by OID via the syscache */
        if (HeapTupleIsValid(dbtuple))
+       {
                result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
+               ReleaseSysCache(dbtuple);  /* result holds its own copy (pstrdup), so release the cache entry */
+       }
        else
                result = NULL;
 
-       systable_endscan(scan);
-       heap_close(pg_database, AccessShareLock);
-
        return result;
 }
 
@@ -1184,25 +1334,20 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record)
                char       *dst_path;
                struct stat st;
 
-#ifndef WIN32
-               char            buf[2 * MAXPGPATH + 100];
-#endif
-
                src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
                dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
 
                /*
-                * Our theory for replaying a CREATE is to forcibly drop the
-                * target subdirectory if present, then re-copy the source data.
-                * This may be more work than needed, but it is simple to
-                * implement.
+                * Our theory for replaying a CREATE is to forcibly drop the target
+                * subdirectory if present, then re-copy the source data. This may be
+                * more work than needed, but it is simple to implement.
                 */
                if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
                {
                        if (!rmtree(dst_path, true))
                                ereport(WARNING,
-                                       (errmsg("could not remove database directory \"%s\"",
-                                                       dst_path)));
+                                               (errmsg("could not remove database directory \"%s\"",
+                                                               dst_path)));
                }
 
                /*
@@ -1212,32 +1357,12 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record)
                 */
                BufferSync();
 
-#ifndef WIN32
-
                /*
                 * Copy this subdirectory to the new location
                 *
-                * XXX use of cp really makes this code pretty grotty, particularly
-                * with respect to lack of ability to report errors well.  Someday
-                * rewrite to do it for ourselves.
+                * We don't need to copy subdirectories
                 */
-
-               /* We might need to use cp -R one day for portability */
-               snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
-                                src_path, dst_path);
-               if (system(buf) != 0)
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory"),
-                                        errdetail("Failing system command was: %s", buf),
-                                        errhint("Look in the postmaster's stderr log for more information.")));
-#else                                                  /* WIN32 */
-               if (copydir(src_path, dst_path) != 0)
-               {
-                       /* copydir should already have given details of its troubles */
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory")));
-               }
-#endif   /* WIN32 */
+               copydir(src_path, dst_path, false);
        }
        else if (info == XLOG_DBASE_DROP)
        {
@@ -1246,96 +1371,27 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record)
 
                dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
 
-               /*
-                * Drop pages for this database that are in the shared buffer
-                * cache
-                */
-               DropBuffers(xlrec->db_id);
-
-               if (!rmtree(dst_path, true))
-                       ereport(WARNING,
-                                       (errmsg("could not remove database directory \"%s\"",
-                                                       dst_path)));
-       }
-       else if (info == XLOG_DBASE_CREATE_OLD)
-       {
-               xl_dbase_create_rec_old *xlrec = (xl_dbase_create_rec_old *) XLogRecGetData(record);
-               char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
-               struct stat st;
-
-#ifndef WIN32
-               char            buf[2 * MAXPGPATH + 100];
-#endif
+               /* Drop pages for this database that are in the shared buffer cache */
+               DropDatabaseBuffers(xlrec->db_id);
 
-               /*
-                * Our theory for replaying a CREATE is to forcibly drop the
-                * target subdirectory if present, then re-copy the source data.
-                * This may be more work than needed, but it is simple to
-                * implement.
-                */
-               if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
-               {
-                       if (!rmtree(dst_path, true))
-                               ereport(WARNING,
-                                       (errmsg("could not remove database directory \"%s\"",
-                                                       dst_path)));
-               }
+               /* Also, clean out any entries in the shared free space map */
+               FreeSpaceMapForgetDatabase(xlrec->db_id);
 
-               /*
-                * Force dirty buffers out to disk, to ensure source database is
-                * up-to-date for the copy.  (We really only need to flush buffers for
-                * the source database, but bufmgr.c provides no API for that.)
-                */
-               BufferSync();
+               /* Clean out the xlog relcache too */
+               XLogDropDatabase(xlrec->db_id);
 
-#ifndef WIN32
-
-               /*
-                * Copy this subdirectory to the new location
-                *
-                * XXX use of cp really makes this code pretty grotty, particularly
-                * with respect to lack of ability to report errors well.  Someday
-                * rewrite to do it for ourselves.
-                */
-
-               /* We might need to use cp -R one day for portability */
-               snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
-                                xlrec->src_path, dst_path);
-               if (system(buf) != 0)
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory"),
-                                        errdetail("Failing system command was: %s", buf),
-                                        errhint("Look in the postmaster's stderr log for more information.")));
-#else                                                  /* WIN32 */
-               if (copydir(xlrec->src_path, dst_path) != 0)
-               {
-                       /* copydir should already have given details of its troubles */
-                       ereport(ERROR,
-                                       (errmsg("could not initialize database directory")));
-               }
-#endif   /* WIN32 */
-       }
-       else if (info == XLOG_DBASE_DROP_OLD)
-       {
-               xl_dbase_drop_rec_old *xlrec = (xl_dbase_drop_rec_old *) XLogRecGetData(record);
-
-               /*
-                * Drop pages for this database that are in the shared buffer
-                * cache
-                */
-               DropBuffers(xlrec->db_id);
-
-               if (!rmtree(xlrec->dir_path, true))
+               /* And remove the physical files */
+               if (!rmtree(dst_path, true))
                        ereport(WARNING,
                                        (errmsg("could not remove database directory \"%s\"",
-                                                       xlrec->dir_path)));
+                                                       dst_path)));
        }
        else
                elog(PANIC, "dbase_redo: unknown op code %u", info);
 }
 
 void
-dbase_desc(char *buf, uint8 xl_info, char *rec)
+dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
 {
        uint8           info = xl_info & ~XLR_INFO_MASK;
 
@@ -1343,7 +1399,7 @@ dbase_desc(char *buf, uint8 xl_info, char *rec)
        {
                xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
 
-               sprintf(buf + strlen(buf), "create db: copy dir %u/%u to %u/%u",
+               appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
                                xlrec->src_db_id, xlrec->src_tablespace_id,
                                xlrec->db_id, xlrec->tablespace_id);
        }
@@ -1351,24 +1407,9 @@ dbase_desc(char *buf, uint8 xl_info, char *rec)
        {
                xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
 
-               sprintf(buf + strlen(buf), "drop db: dir %u/%u",
+               appendStringInfo(buf, "drop db: dir %u/%u",
                                xlrec->db_id, xlrec->tablespace_id);
        }
-       else if (info == XLOG_DBASE_CREATE_OLD)
-       {
-               xl_dbase_create_rec_old *xlrec = (xl_dbase_create_rec_old *) rec;
-               char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
-
-               sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"",
-                               xlrec->db_id, xlrec->src_path, dst_path);
-       }
-       else if (info == XLOG_DBASE_DROP_OLD)
-       {
-               xl_dbase_drop_rec_old *xlrec = (xl_dbase_drop_rec_old *) rec;
-
-               sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"",
-                               xlrec->db_id, xlrec->dir_path);
-       }
        else
-               strcat(buf, "UNKNOWN");
+               appendStringInfo(buf, "UNKNOWN");
 }