* dbcommands.c
* Database management commands (create/drop database).
*
+ * Note: database creation/destruction commands use exclusive locks on
+ * the database objects (as expressed by LockSharedObject()) to avoid
+ * stepping on each others' toes. Formerly we used table-level locks
+ * on pg_database, but that's too coarse-grained.
*
- * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.114 2003/05/07 03:47:08 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.182 2006/07/10 16:20:50 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
-#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
+#include "access/genam.h"
#include "access/heapam.h"
-#include "catalog/catname.h"
#include "catalog/catalog.h"
-#include "catalog/pg_database.h"
-#include "catalog/pg_shadow.h"
+#include "catalog/dependency.h"
#include "catalog/indexing.h"
+#include "catalog/pg_authid.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_tablespace.h"
#include "commands/comment.h"
#include "commands/dbcommands.h"
+#include "commands/tablespace.h"
+#include "mb/pg_wchar.h"
#include "miscadmin.h"
+#include "postmaster/bgwriter.h"
+#include "storage/fd.h"
#include "storage/freespace.h"
-#include "storage/sinval.h"
+#include "storage/procarray.h"
+#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
+#include "utils/flatfiles.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
-#include "mb/pg_wchar.h" /* encoding check */
-
/* non-export function prototypes */
-static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
- int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
- TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
- char *dbpath);
+static bool get_db_info(const char *name, LOCKMODE lockmode,
+ Oid *dbIdP, Oid *ownerIdP,
+ int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
+ Oid *dbLastSysOidP,
+ TransactionId *dbVacuumXidP, TransactionId *dbMinXidP,
+ Oid *dbTablespace);
static bool have_createdb_privilege(void);
-static char *resolve_alt_dbpath(const char *dbpath, Oid dboid);
-static bool remove_dbdirs(const char *real_loc, const char *altloc);
+static void remove_dbtablespaces(Oid db_id);
+
/*
* CREATE DATABASE
*/
-
void
createdb(const CreatedbStmt *stmt)
{
- char *nominal_loc;
- char *alt_loc;
- char *target_dir;
- char src_loc[MAXPGPATH];
- char buf[2 * MAXPGPATH + 100];
+ HeapScanDesc scan;
+ Relation rel;
Oid src_dboid;
- AclId src_owner;
+ Oid src_owner;
int src_encoding;
bool src_istemplate;
+ bool src_allowconn;
Oid src_lastsysoid;
TransactionId src_vacuumxid;
- TransactionId src_frozenxid;
- char src_dbpath[MAXPGPATH];
+ TransactionId src_minxid;
+ Oid src_deftablespace;
+ volatile Oid dst_deftablespace;
Relation pg_database_rel;
HeapTuple tuple;
- TupleDesc pg_database_dsc;
Datum new_record[Natts_pg_database];
char new_record_nulls[Natts_pg_database];
Oid dboid;
- AclId datdba;
- List *option;
+ Oid datdba;
+ ListCell *option;
+ DefElem *dtablespacename = NULL;
DefElem *downer = NULL;
- DefElem *dpath = NULL;
DefElem *dtemplate = NULL;
DefElem *dencoding = NULL;
+ DefElem *dconnlimit = NULL;
char *dbname = stmt->dbname;
char *dbowner = NULL;
- char *dbpath = NULL;
- char *dbtemplate = NULL;
+ const char *dbtemplate = NULL;
int encoding = -1;
+ int dbconnlimit = -1;
+
+ /* don't call this in a transaction block */
+ PreventTransactionChain((void *) stmt, "CREATE DATABASE");
/* Extract options from the statement node tree */
foreach(option, stmt->options)
{
DefElem *defel = (DefElem *) lfirst(option);
- if (strcmp(defel->defname, "owner") == 0)
+ if (strcmp(defel->defname, "tablespace") == 0)
{
- if (downer)
- elog(ERROR, "CREATE DATABASE: conflicting options");
- downer = defel;
+ if (dtablespacename)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ dtablespacename = defel;
}
- else if (strcmp(defel->defname, "location") == 0)
+ else if (strcmp(defel->defname, "owner") == 0)
{
- if (dpath)
- elog(ERROR, "CREATE DATABASE: conflicting options");
- dpath = defel;
+ if (downer)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ downer = defel;
}
else if (strcmp(defel->defname, "template") == 0)
{
if (dtemplate)
- elog(ERROR, "CREATE DATABASE: conflicting options");
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
dtemplate = defel;
}
else if (strcmp(defel->defname, "encoding") == 0)
{
if (dencoding)
- elog(ERROR, "CREATE DATABASE: conflicting options");
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
dencoding = defel;
}
+ else if (strcmp(defel->defname, "connectionlimit") == 0)
+ {
+ if (dconnlimit)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ dconnlimit = defel;
+ }
+ else if (strcmp(defel->defname, "location") == 0)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("LOCATION is not supported anymore"),
+ errhint("Consider using tablespaces instead.")));
+ }
else
- elog(ERROR, "CREATE DATABASE: option \"%s\" not recognized",
+ elog(ERROR, "option \"%s\" not recognized",
defel->defname);
}
if (downer && downer->arg)
dbowner = strVal(downer->arg);
- if (dpath && dpath->arg)
- dbpath = strVal(dpath->arg);
if (dtemplate && dtemplate->arg)
dbtemplate = strVal(dtemplate->arg);
if (dencoding && dencoding->arg)
encoding_name = pg_encoding_to_char(encoding);
if (strcmp(encoding_name, "") == 0 ||
pg_valid_server_encoding(encoding_name) < 0)
- elog(ERROR, "%d is not a valid encoding code", encoding);
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("%d is not a valid encoding code",
+ encoding)));
}
else if (IsA(dencoding->arg, String))
{
encoding_name = strVal(dencoding->arg);
if (pg_valid_server_encoding(encoding_name) < 0)
- elog(ERROR, "%s is not a valid encoding name", encoding_name);
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("%s is not a valid encoding name",
+ encoding_name)));
encoding = pg_char_to_encoding(encoding_name);
}
else
- elog(ERROR, "CREATE DATABASE: bogus encoding parameter");
+ elog(ERROR, "unrecognized node type: %d",
+ nodeTag(dencoding->arg));
}
+ if (dconnlimit && dconnlimit->arg)
+ dbconnlimit = intVal(dconnlimit->arg);
- /* obtain sysid of proposed owner */
+ /* obtain OID of proposed owner */
if (dbowner)
- datdba = get_usesysid(dbowner); /* will elog if no such user */
+ datdba = get_roleid_checked(dbowner);
else
datdba = GetUserId();
- if (datdba == GetUserId())
- {
- /* creating database for self: can be superuser or createdb */
- if (!superuser() && !have_createdb_privilege())
- elog(ERROR, "CREATE DATABASE: permission denied");
- }
- else
- {
- /* creating database for someone else: must be superuser */
- /* note that the someone else need not have any permissions */
- if (!superuser())
- elog(ERROR, "CREATE DATABASE: permission denied");
- }
-
- /* don't call this in a transaction block */
- PreventTransactionChain((void *) stmt, "CREATE DATABASE");
-
-#ifndef HAVE_SYMLINK
- if (dbpath != NULL)
- elog(ERROR, "CREATE DATABASE: may not use an alternate location on this platform");
-#endif
-
/*
- * Check for db name conflict. There is a race condition here, since
- * another backend could create the same DB name before we commit.
- * However, holding an exclusive lock on pg_database for the whole
- * time we are copying the source database doesn't seem like a good
- * idea, so accept possibility of race to create. We will check again
- * after we grab the exclusive lock.
+ * To create a database, must have createdb privilege and must be able to
+ * become the target role (this does not imply that the target role itself
+ * must have createdb privilege). The latter provision guards against
+ * "giveaway" attacks. Note that a superuser will always have both of
+ * these privileges a fortiori.
*/
- if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
- elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname);
+ if (!have_createdb_privilege())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied to create database")));
+
+ check_is_member_of_role(GetUserId(), datdba);
/*
- * Lookup database (template) to be cloned.
+ * Lookup database (template) to be cloned, and obtain share lock on it.
+ * ShareLock allows two CREATE DATABASEs to work from the same template
+ * concurrently, while ensuring no one is busy dropping it in parallel
+ * (which would be Very Bad since we'd likely get an incomplete copy
+ * without knowing it). This also prevents any new connections from being
+ * made to the source until we finish copying it, so we can be sure it
+ * won't change underneath us.
*/
if (!dbtemplate)
dbtemplate = "template1"; /* Default template database name */
- if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
- &src_istemplate, &src_lastsysoid,
- &src_vacuumxid, &src_frozenxid,
- src_dbpath))
- elog(ERROR, "CREATE DATABASE: template \"%s\" does not exist",
- dbtemplate);
+ if (!get_db_info(dbtemplate, ShareLock,
+ &src_dboid, &src_owner, &src_encoding,
+ &src_istemplate, &src_allowconn, &src_lastsysoid,
+ &src_vacuumxid, &src_minxid, &src_deftablespace))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("template database \"%s\" does not exist",
+ dbtemplate)));
/*
* Permission check: to copy a DB that's not marked datistemplate, you
*/
if (!src_istemplate)
{
- if (!superuser() && GetUserId() != src_owner)
- elog(ERROR, "CREATE DATABASE: permission to copy \"%s\" denied",
- dbtemplate);
+ if (!pg_database_ownercheck(src_dboid, GetUserId()))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied to copy database \"%s\"",
+ dbtemplate)));
}
- /*
- * Determine physical path of source database
- */
- alt_loc = resolve_alt_dbpath(src_dbpath, src_dboid);
- if (!alt_loc)
- alt_loc = GetDatabasePath(src_dboid);
- strcpy(src_loc, alt_loc);
-
/*
* The source DB can't have any active backends, except this one
* (exception is to allow CREATE DB while connected to template1).
- * Otherwise we might copy inconsistent data. This check is not
- * bulletproof, since someone might connect while we are copying...
+ * Otherwise we might copy inconsistent data.
*/
if (DatabaseHasActiveBackends(src_dboid, true))
- elog(ERROR, "CREATE DATABASE: source database \"%s\" is being accessed by other users", dbtemplate);
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("source database \"%s\" is being accessed by other users",
+ dbtemplate)));
/* If encoding is defaulted, use source's encoding */
if (encoding < 0)
/* Some encodings are client only */
if (!PG_VALID_BE_ENCODING(encoding))
- elog(ERROR, "CREATE DATABASE: invalid backend encoding");
-
- /*
- * Preassign OID for pg_database tuple, so that we can compute db
- * path.
- */
- dboid = newoid();
-
- /*
- * Compute nominal location (where we will try to access the
- * database), and resolve alternate physical location if one is
- * specified.
- *
- * If an alternate location is specified but is the same as the normal
- * path, just drop the alternate-location spec (this seems friendlier
- * than erroring out). We must test this case to avoid creating a
- * circular symlink below.
- */
- nominal_loc = GetDatabasePath(dboid);
- alt_loc = resolve_alt_dbpath(dbpath, dboid);
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("invalid server encoding %d", encoding)));
- if (alt_loc && strcmp(alt_loc, nominal_loc) == 0)
+ /* Resolve default tablespace for new database */
+ if (dtablespacename && dtablespacename->arg)
{
- alt_loc = NULL;
- dbpath = NULL;
- }
-
- if (strchr(nominal_loc, '\''))
- elog(ERROR, "database path may not contain single quotes");
- if (alt_loc && strchr(alt_loc, '\''))
- elog(ERROR, "database path may not contain single quotes");
- if (strchr(src_loc, '\''))
- elog(ERROR, "database path may not contain single quotes");
- /* ... otherwise we'd be open to shell exploits below */
-
- /*
- * Force dirty buffers out to disk, to ensure source database is
- * up-to-date for the copy. (We really only need to flush buffers for
- * the source database...)
- */
- BufferSync();
+ char *tablespacename;
+ AclResult aclresult;
+
+ tablespacename = strVal(dtablespacename->arg);
+ dst_deftablespace = get_tablespace_oid(tablespacename);
+ if (!OidIsValid(dst_deftablespace))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("tablespace \"%s\" does not exist",
+ tablespacename)));
+ /* check permissions */
+ aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
+ ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
+ tablespacename);
- /*
- * Close virtual file descriptors so the kernel has more available for
- * the mkdir() and system() calls below.
- */
- closeAllVfds();
-
- /*
- * Check we can create the target directory --- but then remove it
- * because we rely on cp(1) to create it for real.
- */
- target_dir = alt_loc ? alt_loc : nominal_loc;
-
- if (mkdir(target_dir, S_IRWXU) != 0)
- elog(ERROR, "CREATE DATABASE: unable to create database directory '%s': %m",
- target_dir);
- if (rmdir(target_dir) != 0)
- elog(ERROR, "CREATE DATABASE: unable to remove temp directory '%s': %m",
- target_dir);
-
- /* Make the symlink, if needed */
- if (alt_loc)
- {
-#ifdef HAVE_SYMLINK /* already throws error above */
- if (symlink(alt_loc, nominal_loc) != 0)
-#endif
- elog(ERROR, "CREATE DATABASE: could not link '%s' to '%s': %m",
- nominal_loc, alt_loc);
+ /*
+ * If we are trying to change the default tablespace of the template,
+ * we require that the template not have any files in the new default
+ * tablespace. This is necessary because otherwise the copied
+ * database would contain pg_class rows that refer to its default
+ * tablespace both explicitly (by OID) and implicitly (as zero), which
+ * would cause problems. For example another CREATE DATABASE using
+ * the copied database as template, and trying to change its default
+ * tablespace again, would yield outright incorrect results (it would
+ * improperly move tables to the new default tablespace that should
+ * stay in the same tablespace).
+ */
+ if (dst_deftablespace != src_deftablespace)
+ {
+ char *srcpath;
+ struct stat st;
+
+ srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
+
+ if (stat(srcpath, &st) == 0 &&
+ S_ISDIR(st.st_mode) &&
+ !directory_is_empty(srcpath))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot assign new default tablespace \"%s\"",
+ tablespacename),
+ errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
+ dbtemplate)));
+ pfree(srcpath);
+ }
}
-
- /* Copy the template database to the new location */
-#ifndef WIN32
- snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", src_loc, target_dir);
-#else
- snprintf(buf, sizeof(buf), "xcopy /e /i /q '%s' '%s'", src_loc, target_dir);
-#endif
-
- if (system(buf) != 0)
+ else
{
- if (remove_dbdirs(nominal_loc, alt_loc))
- elog(ERROR, "CREATE DATABASE: could not initialize database directory");
- else
- elog(ERROR, "CREATE DATABASE: could not initialize database directory; delete failed as well");
+ /* Use template database's default tablespace */
+ dst_deftablespace = src_deftablespace;
+ /* Note there is no additional permission check in this path */
}
/*
- * Now OK to grab exclusive lock on pg_database.
+ * Check for db name conflict. This is just to give a more friendly
+ * error message than "unique index violation". There's a race condition
+ * but we're willing to accept the less friendly message in that case.
*/
- pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
-
- /* Check to see if someone else created same DB name meanwhile. */
- if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
- {
- /* Don't hold lock while doing recursive remove */
- heap_close(pg_database_rel, AccessExclusiveLock);
- remove_dbdirs(nominal_loc, alt_loc);
- elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname);
- }
+ if (OidIsValid(get_database_oid(dbname)))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_DATABASE),
+ errmsg("database \"%s\" already exists", dbname)));
/*
- * Insert a new tuple into pg_database
+ * Insert a new tuple into pg_database. This establishes our ownership
+ * of the new database name (anyone else trying to insert the same name
+ * will block on the unique index, and fail after we commit). It also
+ * assigns the OID that the new database will have.
*/
- pg_database_dsc = RelationGetDescr(pg_database_rel);
+ pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
/* Form tuple */
MemSet(new_record, 0, sizeof(new_record));
new_record[Anum_pg_database_datname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(dbname));
- new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
+ new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
+ new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
- new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
- /* do not set datpath to null, GetRawDatabaseInfo won't cope */
- new_record[Anum_pg_database_datpath - 1] =
- DirectFunctionCall1(textin, CStringGetDatum(dbpath ? dbpath : ""));
+ new_record[Anum_pg_database_datminxid - 1] = TransactionIdGetDatum(src_minxid);
+ new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
/*
* We deliberately set datconfig and datacl to defaults (NULL), rather
new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
- tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
+ tuple = heap_formtuple(RelationGetDescr(pg_database_rel),
+ new_record, new_record_nulls);
- HeapTupleSetOid(tuple, dboid); /* override heap_insert's OID
- * selection */
-
- simple_heap_insert(pg_database_rel, tuple);
+ dboid = simple_heap_insert(pg_database_rel, tuple);
/* Update indexes */
CatalogUpdateIndexes(pg_database_rel, tuple);
- /* Close pg_database, but keep lock till commit */
- heap_close(pg_database_rel, NoLock);
+ /*
+ * Now generate additional catalog entries associated with the new DB
+ */
+
+ /* Register owner dependency */
+ recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
+
+ /* Create pg_shdepend entries for objects within database */
+ copyTemplateDependencies(src_dboid, dboid);
/*
- * Force dirty buffers out to disk, so that newly-connecting backends
- * will see the new database in pg_database right away. (They'll see
- * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
+ * Force dirty buffers out to disk, to ensure source database is
+ * up-to-date for the copy. (We really only need to flush buffers for the
+ * source database, but bufmgr.c provides no API for that.)
*/
BufferSync();
+
+ /*
+ * Once we start copying subdirectories, we need to be able to clean 'em
+ * up if we fail. Establish a TRY block to make sure this happens. (This
+ * is not a 100% solution, because of the possibility of failure during
+ * transaction commit after we leave this routine, but it should handle
+ * most scenarios.)
+ */
+ PG_TRY();
+ {
+ /*
+ * Iterate through all tablespaces of the template database, and copy
+ * each one to the new database.
+ */
+ rel = heap_open(TableSpaceRelationId, AccessShareLock);
+ scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Oid srctablespace = HeapTupleGetOid(tuple);
+ Oid dsttablespace;
+ char *srcpath;
+ char *dstpath;
+ struct stat st;
+
+ /* No need to copy global tablespace */
+ if (srctablespace == GLOBALTABLESPACE_OID)
+ continue;
+
+ srcpath = GetDatabasePath(src_dboid, srctablespace);
+
+ if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
+ directory_is_empty(srcpath))
+ {
+ /* Assume we can ignore it */
+ pfree(srcpath);
+ continue;
+ }
+
+ if (srctablespace == src_deftablespace)
+ dsttablespace = dst_deftablespace;
+ else
+ dsttablespace = srctablespace;
+
+ dstpath = GetDatabasePath(dboid, dsttablespace);
+
+ /*
+ * Copy this subdirectory to the new location
+ *
+ * We don't need to copy subdirectories
+ */
+ copydir(srcpath, dstpath, false);
+
+ /* Record the filesystem change in XLOG */
+ {
+ xl_dbase_create_rec xlrec;
+ XLogRecData rdata[1];
+
+ xlrec.db_id = dboid;
+ xlrec.tablespace_id = dsttablespace;
+ xlrec.src_db_id = src_dboid;
+ xlrec.src_tablespace_id = srctablespace;
+
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof(xl_dbase_create_rec);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = NULL;
+
+ (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
+ }
+ }
+ heap_endscan(scan);
+ heap_close(rel, AccessShareLock);
+
+ /*
+ * We force a checkpoint before committing. This effectively means
+ * that committed XLOG_DBASE_CREATE operations will never need to be
+ * replayed (at least not in ordinary crash recovery; we still have to
+ * make the XLOG entry for the benefit of PITR operations). This
+ * avoids two nasty scenarios:
+ *
+ * #1: When PITR is off, we don't XLOG the contents of newly created
+ * indexes; therefore the drop-and-recreate-whole-directory behavior
+ * of DBASE_CREATE replay would lose such indexes.
+ *
+ * #2: Since we have to recopy the source database during DBASE_CREATE
+ * replay, we run the risk of copying changes in it that were
+ * committed after the original CREATE DATABASE command but before the
+ * system crash that led to the replay. This is at least unexpected
+ * and at worst could lead to inconsistencies, eg duplicate table
+ * names.
+ *
+ * (Both of these were real bugs in releases 8.0 through 8.0.3.)
+ *
+ * In PITR replay, the first of these isn't an issue, and the second
+ * is only a risk if the CREATE DATABASE and subsequent template
+ * database change both occur while a base backup is being taken.
+ * There doesn't seem to be much we can do about that except document
+ * it as a limitation.
+ *
+ * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
+ * we can avoid this.
+ */
+ RequestCheckpoint(true, false);
+
+ /*
+ * Close pg_database, but keep lock till commit (this is important
+ * to prevent any risk of deadlock failure while updating flat file)
+ */
+ heap_close(pg_database_rel, NoLock);
+
+ /*
+ * Set flag to update flat database file at commit.
+ */
+ database_file_update_needed();
+ }
+ PG_CATCH();
+ {
+ /* Release lock on source database before doing recursive remove */
+ UnlockSharedObject(DatabaseRelationId, src_dboid, 0,
+ ShareLock);
+
+ /* Throw away any successfully copied subdirectories */
+ remove_dbtablespaces(dboid);
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
}
* DROP DATABASE
*/
void
-dropdb(const char *dbname)
+dropdb(const char *dbname, bool missing_ok)
{
- int4 db_owner;
- bool db_istemplate;
Oid db_id;
- char *alt_loc;
- char *nominal_loc;
- char dbpath[MAXPGPATH];
+ bool db_istemplate;
Relation pgdbrel;
- HeapScanDesc pgdbscan;
- ScanKeyData key;
HeapTuple tup;
- AssertArg(dbname);
+ PreventTransactionChain((void *) dbname, "DROP DATABASE");
- if (strcmp(dbname, DatabaseName) == 0)
- elog(ERROR, "DROP DATABASE: cannot be executed on the currently open database");
+ AssertArg(dbname);
- PreventTransactionChain((void *) dbname, "DROP DATABASE");
+ if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("cannot drop the currently open database")));
/*
- * Obtain exclusive lock on pg_database. We need this to ensure that
- * no new backend starts up in the target database while we are
- * deleting it. (Actually, a new backend might still manage to start
- * up, because it will read pg_database without any locking to
- * discover the database's OID. But it will detect its error in
- * ReverifyMyDatabase and shut down before any serious damage is done.
- * See postinit.c.)
+ * Look up the target database's OID, and get exclusive lock on it.
+ * We need this to ensure that no new backend starts up in the target
+ * database while we are deleting it (see postinit.c), and that no one is
+ * using it as a CREATE DATABASE template or trying to delete it for
+ * themselves.
*/
- pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
+ pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
- if (!get_db_info(dbname, &db_id, &db_owner, NULL,
- &db_istemplate, NULL, NULL, NULL, dbpath))
- elog(ERROR, "DROP DATABASE: database \"%s\" does not exist", dbname);
+ if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
+ &db_istemplate, NULL, NULL, NULL, NULL, NULL))
+ {
+ if (!missing_ok)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database \"%s\" does not exist", dbname)));
+ }
+ else
+ {
+ /* Close pg_database, release the lock, since we changed nothing */
+ heap_close(pgdbrel, RowExclusiveLock);
+ ereport(NOTICE,
+ (errmsg("database \"%s\" does not exist, skipping",
+ dbname)));
+ return;
+ }
+ }
- if (GetUserId() != db_owner && !superuser())
- elog(ERROR, "DROP DATABASE: permission denied");
+ /*
+ * Permission checks
+ */
+ if (!pg_database_ownercheck(db_id, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+ dbname);
/*
* Disallow dropping a DB that is marked istemplate. This is just to
- * prevent people from accidentally dropping template0 or template1;
- * they can do so if they're really determined ...
+ * prevent people from accidentally dropping template0 or template1; they
+ * can do so if they're really determined ...
*/
if (db_istemplate)
- elog(ERROR, "DROP DATABASE: database is marked as a template");
-
- nominal_loc = GetDatabasePath(db_id);
- alt_loc = resolve_alt_dbpath(dbpath, db_id);
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot drop a template database")));
/*
- * Check for active backends in the target database.
+ * Check for active backends in the target database. (Because we hold
+ * the database lock, no new ones can start after this.)
*/
if (DatabaseHasActiveBackends(db_id, false))
- elog(ERROR, "DROP DATABASE: database \"%s\" is being accessed by other users", dbname);
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("database \"%s\" is being accessed by other users",
+ dbname)));
/*
- * Find the database's tuple by OID (should be unique).
+ * Remove the database's tuple from pg_database.
*/
- ScanKeyEntryInitialize(&key, 0, ObjectIdAttributeNumber,
- F_OIDEQ, ObjectIdGetDatum(db_id));
-
- pgdbscan = heap_beginscan(pgdbrel, SnapshotNow, 1, &key);
-
- tup = heap_getnext(pgdbscan, ForwardScanDirection);
+ tup = SearchSysCache(DATABASEOID,
+ ObjectIdGetDatum(db_id),
+ 0, 0, 0);
if (!HeapTupleIsValid(tup))
- {
- /*
- * This error should never come up since the existence of the
- * database is checked earlier
- */
- elog(ERROR, "DROP DATABASE: Database \"%s\" doesn't exist despite earlier reports to the contrary",
- dbname);
- }
+ elog(ERROR, "cache lookup failed for database %u", db_id);
- /* Remove the database's tuple from pg_database */
simple_heap_delete(pgdbrel, &tup->t_self);
- heap_endscan(pgdbscan);
+ ReleaseSysCache(tup);
/*
- * Delete any comments associated with the database
- *
- * NOTE: this is probably dead code since any such comments should have
- * been in that database, not mine.
+ * Delete any comments associated with the database.
*/
- DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
+ DeleteSharedComments(db_id, DatabaseRelationId);
/*
- * Close pg_database, but keep exclusive lock till commit to ensure
- * that any new backend scanning pg_database will see the tuple dead.
+ * Remove shared dependency references for the database.
*/
- heap_close(pgdbrel, NoLock);
+ dropDatabaseDependencies(db_id);
/*
- * Drop pages for this database that are in the shared buffer cache.
- * This is important to ensure that no remaining backend tries to
- * write out a dirty buffer to the dead database later...
+ * Drop pages for this database that are in the shared buffer cache. This
+ * is important to ensure that no remaining backend tries to write out a
+ * dirty buffer to the dead database later...
*/
- DropBuffers(db_id);
+ DropDatabaseBuffers(db_id);
/*
* Also, clean out any entries in the shared free space map.
FreeSpaceMapForgetDatabase(db_id);
/*
- * Remove the database's subdirectory and everything in it.
+ * On Windows, force a checkpoint so that the bgwriter doesn't hold any
+ * open files, which would cause rmdir() to fail.
*/
- remove_dbdirs(nominal_loc, alt_loc);
+#ifdef WIN32
+ RequestCheckpoint(true, false);
+#endif
/*
- * Force dirty buffers out to disk, so that newly-connecting backends
- * will see the database tuple marked dead in pg_database right away.
- * (They'll see an uncommitted deletion, but they don't care; see
- * GetRawDatabaseInfo.)
+ * Remove all tablespace subdirs belonging to the database.
*/
- BufferSync();
+ remove_dbtablespaces(db_id);
+
+ /*
+ * Close pg_database, but keep lock till commit (this is important
+ * to prevent any risk of deadlock failure while updating flat file)
+ */
+ heap_close(pgdbrel, NoLock);
+
+ /*
+ * Set flag to update flat database file at commit.
+ */
+ database_file_update_needed();
}
+/*
+ * Rename database
+ */
+void
+RenameDatabase(const char *oldname, const char *newname)
+{
+ Oid db_id;
+ HeapTuple newtup;
+ Relation rel;
+
+ /*
+ * Look up the target database's OID, and get exclusive lock on it.
+ * We need this for the same reasons as DROP DATABASE.
+ */
+ rel = heap_open(DatabaseRelationId, RowExclusiveLock);
+
+ if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
+ NULL, NULL, NULL, NULL, NULL, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database \"%s\" does not exist", oldname)));
+
+ /*
+ * XXX Client applications probably store the current database somewhere,
+ * so renaming it could cause confusion. On the other hand, there may not
+ * be an actual problem besides a little confusion, so think about this
+ * and decide.
+ */
+ if (db_id == MyDatabaseId)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("current database may not be renamed")));
+
+ /*
+ * Make sure the database does not have active sessions. This is the
+ * same concern as above, but applied to other sessions.
+ */
+ if (DatabaseHasActiveBackends(db_id, false))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("database \"%s\" is being accessed by other users",
+ oldname)));
+
+ /* make sure the new name doesn't exist */
+ if (OidIsValid(get_database_oid(newname)))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_DATABASE),
+ errmsg("database \"%s\" already exists", newname)));
+
+ /* must be owner */
+ if (!pg_database_ownercheck(db_id, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+ oldname);
+
+ /* must have createdb rights */
+ if (!have_createdb_privilege())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied to rename database")));
+
+ /* rename */
+ newtup = SearchSysCacheCopy(DATABASEOID,
+ ObjectIdGetDatum(db_id),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(newtup))
+ elog(ERROR, "cache lookup failed for database %u", db_id);
+ namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
+ simple_heap_update(rel, &newtup->t_self, newtup);
+ CatalogUpdateIndexes(rel, newtup);
+
+ /*
+ * Close pg_database, but keep lock till commit (this is important
+ * to prevent any risk of deadlock failure while updating flat file)
+ */
+ heap_close(rel, NoLock);
+
+ /*
+ * Set flag to update flat database file at commit.
+ */
+ database_file_update_needed();
+}
+
+
+/*
+ * ALTER DATABASE name ...
+ */
+void
+AlterDatabase(AlterDatabaseStmt *stmt)
+{
+ Relation rel;
+ HeapTuple tuple,
+ newtuple;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ ListCell *option;
+ int connlimit = -1;
+ DefElem *dconnlimit = NULL;
+ Datum new_record[Natts_pg_database];
+ char new_record_nulls[Natts_pg_database];
+ char new_record_repl[Natts_pg_database];
+
+ /* Extract options from the statement node tree */
+ foreach(option, stmt->options)
+ {
+ DefElem *defel = (DefElem *) lfirst(option);
+
+ if (strcmp(defel->defname, "connectionlimit") == 0)
+ {
+ if (dconnlimit)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ dconnlimit = defel;
+ }
+ else
+ elog(ERROR, "option \"%s\" not recognized",
+ defel->defname);
+ }
+
+ if (dconnlimit)
+ connlimit = intVal(dconnlimit->arg);
+
+ /*
+ * Get the old tuple. We don't need a lock on the database per se,
+ * because we're not going to do anything that would mess up incoming
+ * connections.
+ */
+ rel = heap_open(DatabaseRelationId, RowExclusiveLock);
+ ScanKeyInit(&scankey,
+ Anum_pg_database_datname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ NameGetDatum(stmt->dbname));
+ scan = systable_beginscan(rel, DatabaseNameIndexId, true,
+ SnapshotNow, 1, &scankey);
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database \"%s\" does not exist", stmt->dbname)));
+
+ if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+ stmt->dbname);
+
+ /*
+ * Build an updated tuple, perusing the information just obtained
+ */
+ MemSet(new_record, 0, sizeof(new_record));
+ MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
+ MemSet(new_record_repl, ' ', sizeof(new_record_repl));
+
+ if (dconnlimit)
+ {
+ new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
+ new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r';
+ }
+
+ newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record,
+ new_record_nulls, new_record_repl);
+ simple_heap_update(rel, &tuple->t_self, newtuple);
+
+ /* Update indexes */
+ CatalogUpdateIndexes(rel, newtuple);
+
+ systable_endscan(scan);
+
+ /* Close pg_database, but keep lock till commit */
+ heap_close(rel, NoLock);
+
+ /*
+ * We don't bother updating the flat file since the existing options for
+ * ALTER DATABASE don't affect it.
+ */
+}
+
/*
* ALTER DATABASE name SET ...
newtuple;
Relation rel;
ScanKeyData scankey;
- HeapScanDesc scan;
+ SysScanDesc scan;
Datum repl_val[Natts_pg_database];
char repl_null[Natts_pg_database];
char repl_repl[Natts_pg_database];
valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
- rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
- ScanKeyEntryInitialize(&scankey, 0, Anum_pg_database_datname,
- F_NAMEEQ, NameGetDatum(stmt->dbname));
- scan = heap_beginscan(rel, SnapshotNow, 1, &scankey);
- tuple = heap_getnext(scan, ForwardScanDirection);
+ /*
+ * Get the old tuple. We don't need a lock on the database per se,
+ * because we're not going to do anything that would mess up incoming
+ * connections.
+ */
+ rel = heap_open(DatabaseRelationId, RowExclusiveLock);
+ ScanKeyInit(&scankey,
+ Anum_pg_database_datname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ NameGetDatum(stmt->dbname));
+ scan = systable_beginscan(rel, DatabaseNameIndexId, true,
+ SnapshotNow, 1, &scankey);
+ tuple = systable_getnext(scan);
if (!HeapTupleIsValid(tuple))
- elog(ERROR, "database \"%s\" does not exist", stmt->dbname);
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database \"%s\" does not exist", stmt->dbname)));
- if (!(superuser()
- || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
- elog(ERROR, "ALTER DATABASE SET: permission denied");
+ if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+ stmt->dbname);
MemSet(repl_repl, ' ', sizeof(repl_repl));
repl_repl[Anum_pg_database_datconfig - 1] = 'r';
datum = heap_getattr(tuple, Anum_pg_database_datconfig,
RelationGetDescr(rel), &isnull);
- a = isnull ? ((ArrayType *) NULL) : DatumGetArrayTypeP(datum);
+ a = isnull ? NULL : DatumGetArrayTypeP(datum);
if (valuestr)
a = GUCArrayAdd(a, stmt->variable, valuestr);
repl_null[Anum_pg_database_datconfig - 1] = 'n';
}
- newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
+ newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
simple_heap_update(rel, &tuple->t_self, newtuple);
/* Update indexes */
CatalogUpdateIndexes(rel, newtuple);
- heap_endscan(scan);
- heap_close(rel, RowExclusiveLock);
+ systable_endscan(scan);
+
+ /* Close pg_database, but keep lock till commit */
+ heap_close(rel, NoLock);
+
+ /*
+ * We don't bother updating the flat file since ALTER DATABASE SET doesn't
+ * affect it.
+ */
}
+/*
+ * ALTER DATABASE name OWNER TO newowner
+ */
+void
+AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
+{
+ HeapTuple tuple;
+ Relation rel;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ Form_pg_database datForm;
+
+ /*
+ * Get the old tuple. We don't need a lock on the database per se,
+ * because we're not going to do anything that would mess up incoming
+ * connections.
+ */
+ rel = heap_open(DatabaseRelationId, RowExclusiveLock);
+ ScanKeyInit(&scankey,
+ Anum_pg_database_datname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ NameGetDatum(dbname));
+ scan = systable_beginscan(rel, DatabaseNameIndexId, true,
+ SnapshotNow, 1, &scankey);
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database \"%s\" does not exist", dbname)));
+
+ datForm = (Form_pg_database) GETSTRUCT(tuple);
+
+ /*
+ * If the new owner is the same as the existing owner, consider the
+ * command to have succeeded. This is to be consistent with other
+ * objects.
+ */
+ if (datForm->datdba != newOwnerId)
+ {
+ Datum repl_val[Natts_pg_database];
+ char repl_null[Natts_pg_database];
+ char repl_repl[Natts_pg_database];
+ Acl *newAcl;
+ Datum aclDatum;
+ bool isNull;
+ HeapTuple newtuple;
+
+ /* Otherwise, must be owner of the existing object */
+ if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
+ dbname);
+
+ /* Must be able to become new owner */
+ check_is_member_of_role(GetUserId(), newOwnerId);
+
+ /*
+ * must have createdb rights
+ *
+ * NOTE: This is different from other alter-owner checks in that the
+ * current user is checked for createdb privileges instead of the
+ * destination owner. This is consistent with the CREATE case for
+ * databases. Because superusers will always have this right, we need
+ * no special case for them.
+ */
+ if (!have_createdb_privilege())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied to change owner of database")));
+
+ memset(repl_null, ' ', sizeof(repl_null));
+ memset(repl_repl, ' ', sizeof(repl_repl));
+
+ repl_repl[Anum_pg_database_datdba - 1] = 'r';
+ repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
+
+ /*
+ * Determine the modified ACL for the new owner. This is only
+ * necessary when the ACL is non-null.
+ */
+ aclDatum = heap_getattr(tuple,
+ Anum_pg_database_datacl,
+ RelationGetDescr(rel),
+ &isNull);
+ if (!isNull)
+ {
+ newAcl = aclnewowner(DatumGetAclP(aclDatum),
+ datForm->datdba, newOwnerId);
+ repl_repl[Anum_pg_database_datacl - 1] = 'r';
+ repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
+ }
+
+ newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
+ simple_heap_update(rel, &newtuple->t_self, newtuple);
+ CatalogUpdateIndexes(rel, newtuple);
+
+ heap_freetuple(newtuple);
+
+ /* Update owner dependency reference */
+ changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
+ newOwnerId);
+ }
+
+ systable_endscan(scan);
+
+ /* Close pg_database, but keep lock till commit */
+ heap_close(rel, NoLock);
+
+ /*
+ * We don't bother updating the flat file since ALTER DATABASE OWNER
+ * doesn't affect it.
+ */
+}
+
/*
* Helper functions
*/
+/*
+ * Look up info about the database named "name". If the database exists,
+ * obtain the specified lock type on it, fill in any of the remaining
+ * parameters that aren't NULL, and return TRUE. If no such database,
+ * return FALSE.
+ */
static bool
-get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
- int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
- TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
- char *dbpath)
+get_db_info(const char *name, LOCKMODE lockmode,
+ Oid *dbIdP, Oid *ownerIdP,
+ int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
+ Oid *dbLastSysOidP,
+ TransactionId *dbVacuumXidP, TransactionId *dbMinXidP,
+ Oid *dbTablespace)
{
+ bool result = false;
Relation relation;
- ScanKeyData scanKey;
- HeapScanDesc scan;
- HeapTuple tuple;
- bool gottuple;
AssertArg(name);
/* Caller may wish to grab a better lock on pg_database beforehand... */
- relation = heap_openr(DatabaseRelationName, AccessShareLock);
+ relation = heap_open(DatabaseRelationId, AccessShareLock);
- ScanKeyEntryInitialize(&scanKey, 0, Anum_pg_database_datname,
- F_NAMEEQ, NameGetDatum(name));
+ /*
+ * Loop covers the rare case where the database is renamed before we
+ * can lock it. We try again just in case we can find a new one of
+ * the same name.
+ */
+ for (;;)
+ {
+ ScanKeyData scanKey;
+ SysScanDesc scan;
+ HeapTuple tuple;
+ Oid dbOid;
- scan = heap_beginscan(relation, SnapshotNow, 1, &scanKey);
+ /*
+ * there's no syscache for database-indexed-by-name,
+ * so must do it the hard way
+ */
+ ScanKeyInit(&scanKey,
+ Anum_pg_database_datname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ NameGetDatum(name));
- tuple = heap_getnext(scan, ForwardScanDirection);
+ scan = systable_beginscan(relation, DatabaseNameIndexId, true,
+ SnapshotNow, 1, &scanKey);
- gottuple = HeapTupleIsValid(tuple);
- if (gottuple)
- {
- Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
-
- /* oid of the database */
- if (dbIdP)
- *dbIdP = HeapTupleGetOid(tuple);
- /* sysid of the owner */
- if (ownerIdP)
- *ownerIdP = dbform->datdba;
- /* character encoding */
- if (encodingP)
- *encodingP = dbform->encoding;
- /* allowed as template? */
- if (dbIsTemplateP)
- *dbIsTemplateP = dbform->datistemplate;
- /* last system OID used in database */
- if (dbLastSysOidP)
- *dbLastSysOidP = dbform->datlastsysoid;
- /* limit of vacuumed XIDs */
- if (dbVacuumXidP)
- *dbVacuumXidP = dbform->datvacuumxid;
- /* limit of frozen XIDs */
- if (dbFrozenXidP)
- *dbFrozenXidP = dbform->datfrozenxid;
- /* database path (as registered in pg_database) */
- if (dbpath)
+ tuple = systable_getnext(scan);
+
+ if (!HeapTupleIsValid(tuple))
{
- Datum datum;
- bool isnull;
-
- datum = heap_getattr(tuple,
- Anum_pg_database_datpath,
- RelationGetDescr(relation),
- &isnull);
- if (!isnull)
- {
- text *pathtext = DatumGetTextP(datum);
- int pathlen = VARSIZE(pathtext) - VARHDRSZ;
+ /* definitely no database of that name */
+ systable_endscan(scan);
+ break;
+ }
+
+ dbOid = HeapTupleGetOid(tuple);
+
+ systable_endscan(scan);
+
+ /*
+ * Now that we have a database OID, we can try to lock the DB.
+ */
+ if (lockmode != NoLock)
+ LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
+
+ /*
+ * And now, re-fetch the tuple by OID. If it's still there and
+ * still the same name, we win; else, drop the lock and loop
+ * back to try again.
+ */
+ tuple = SearchSysCache(DATABASEOID,
+ ObjectIdGetDatum(dbOid),
+ 0, 0, 0);
+ if (HeapTupleIsValid(tuple))
+ {
+ Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
- Assert(pathlen >= 0 && pathlen < MAXPGPATH);
- strncpy(dbpath, VARDATA(pathtext), pathlen);
- *(dbpath + pathlen) = '\0';
+ if (strcmp(name, NameStr(dbform->datname)) == 0)
+ {
+ /* oid of the database */
+ if (dbIdP)
+ *dbIdP = dbOid;
+ /* oid of the owner */
+ if (ownerIdP)
+ *ownerIdP = dbform->datdba;
+ /* character encoding */
+ if (encodingP)
+ *encodingP = dbform->encoding;
+ /* allowed as template? */
+ if (dbIsTemplateP)
+ *dbIsTemplateP = dbform->datistemplate;
+ /* allowing connections? */
+ if (dbAllowConnP)
+ *dbAllowConnP = dbform->datallowconn;
+ /* last system OID used in database */
+ if (dbLastSysOidP)
+ *dbLastSysOidP = dbform->datlastsysoid;
+ /* limit of vacuumed XIDs */
+ if (dbVacuumXidP)
+ *dbVacuumXidP = dbform->datvacuumxid;
+ /* limit of min XIDs */
+ if (dbMinXidP)
+ *dbMinXidP = dbform->datminxid;
+ /* default tablespace for this database */
+ if (dbTablespace)
+ *dbTablespace = dbform->dattablespace;
+ ReleaseSysCache(tuple);
+ result = true;
+ break;
}
- else
- strcpy(dbpath, "");
+ /* can only get here if it was just renamed */
+ ReleaseSysCache(tuple);
}
+
+ if (lockmode != NoLock)
+ UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
}
- heap_endscan(scan);
heap_close(relation, AccessShareLock);
- return gottuple;
+ return result;
}
+/* Check if current user has createdb privileges */
static bool
have_createdb_privilege(void)
{
+ bool result = false;
HeapTuple utup;
- bool retval;
- utup = SearchSysCache(SHADOWSYSID,
- Int32GetDatum(GetUserId()),
- 0, 0, 0);
-
- if (!HeapTupleIsValid(utup))
- retval = false;
- else
- retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
+ /* Superusers can always do everything */
+ if (superuser())
+ return true;
- ReleaseSysCache(utup);
-
- return retval;
+ utup = SearchSysCache(AUTHOID,
+ ObjectIdGetDatum(GetUserId()),
+ 0, 0, 0);
+ if (HeapTupleIsValid(utup))
+ {
+ result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
+ ReleaseSysCache(utup);
+ }
+ return result;
}
-
-static char *
-resolve_alt_dbpath(const char *dbpath, Oid dboid)
+/*
+ * Remove tablespace directories
+ *
+ * We don't know what tablespaces db_id is using, so iterate through all
+ * tablespaces removing <tablespace>/db_id
+ */
+static void
+remove_dbtablespaces(Oid db_id)
{
- const char *prefix;
- char *ret;
- size_t len;
-
- if (dbpath == NULL || dbpath[0] == '\0')
- return NULL;
+ Relation rel;
+ HeapScanDesc scan;
+ HeapTuple tuple;
- if (first_path_separator(dbpath))
- {
- if (!is_absolute_path(dbpath))
- elog(ERROR, "Relative paths are not allowed as database locations");
-#ifndef ALLOW_ABSOLUTE_DBPATHS
- elog(ERROR, "Absolute paths are not allowed as database locations");
-#endif
- prefix = dbpath;
- }
- else
+ rel = heap_open(TableSpaceRelationId, AccessShareLock);
+ scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
- /* must be environment variable */
- char *var = getenv(dbpath);
-
- if (!var)
- elog(ERROR, "Postmaster environment variable '%s' not set", dbpath);
- if (!is_absolute_path(var))
- elog(ERROR, "Postmaster environment variable '%s' must be absolute path", dbpath);
- prefix = var;
- }
+ Oid dsttablespace = HeapTupleGetOid(tuple);
+ char *dstpath;
+ struct stat st;
- len = strlen(prefix) + 6 + sizeof(Oid) * 8 + 1;
- if (len >= MAXPGPATH - 100)
- elog(ERROR, "Alternate path is too long");
+ /* Don't mess with the global tablespace */
+ if (dsttablespace == GLOBALTABLESPACE_OID)
+ continue;
- ret = palloc(len);
- snprintf(ret, len, "%s/base/%u", prefix, dboid);
+ dstpath = GetDatabasePath(db_id, dsttablespace);
- return ret;
-}
+ if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
+ {
+ /* Assume we can ignore it */
+ pfree(dstpath);
+ continue;
+ }
+ if (!rmtree(dstpath, true))
+ ereport(WARNING,
+ (errmsg("could not remove database directory \"%s\"",
+ dstpath)));
-static bool
-remove_dbdirs(const char *nominal_loc, const char *alt_loc)
-{
- const char *target_dir;
- char buf[MAXPGPATH + 100];
- bool success = true;
+ /* Record the filesystem change in XLOG */
+ {
+ xl_dbase_drop_rec xlrec;
+ XLogRecData rdata[1];
- target_dir = alt_loc ? alt_loc : nominal_loc;
+ xlrec.db_id = db_id;
+ xlrec.tablespace_id = dsttablespace;
- /*
- * Close virtual file descriptors so the kernel has more available for
- * the system() call below.
- */
- closeAllVfds();
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof(xl_dbase_drop_rec);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = NULL;
- if (alt_loc)
- {
- /* remove symlink */
- if (unlink(nominal_loc) != 0)
- {
- elog(WARNING, "could not remove '%s': %m", nominal_loc);
- success = false;
+ (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
}
- }
-
-#ifndef WIN32
- snprintf(buf, sizeof(buf), "rm -rf '%s'", target_dir);
-#else
- snprintf(buf, sizeof(buf), "rmdir /s /q \"%s\"", target_dir);
-#endif
- if (system(buf) != 0)
- {
- elog(WARNING, "database directory '%s' could not be removed",
- target_dir);
- success = false;
+ pfree(dstpath);
}
- return success;
+ heap_endscan(scan);
+ heap_close(rel, AccessShareLock);
}
* get_database_oid - given a database name, look up the OID
*
* Returns InvalidOid if database name not found.
- *
- * This is not actually used in this file, but is exported for use elsewhere.
*/
Oid
get_database_oid(const char *dbname)
{
Relation pg_database;
ScanKeyData entry[1];
- HeapScanDesc scan;
+ SysScanDesc scan;
HeapTuple dbtuple;
Oid oid;
- /* There's no syscache for pg_database, so must look the hard way */
- pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
- ScanKeyEntryInitialize(&entry[0], 0x0,
- Anum_pg_database_datname, F_NAMEEQ,
- CStringGetDatum(dbname));
- scan = heap_beginscan(pg_database, SnapshotNow, 1, entry);
+ /*
+ * There's no syscache for pg_database indexed by name,
+ * so we must look the hard way.
+ */
+ pg_database = heap_open(DatabaseRelationId, AccessShareLock);
+ ScanKeyInit(&entry[0],
+ Anum_pg_database_datname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(dbname));
+ scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
+ SnapshotNow, 1, entry);
- dbtuple = heap_getnext(scan, ForwardScanDirection);
+ dbtuple = systable_getnext(scan);
/* We assume that there can be at most one matching tuple */
if (HeapTupleIsValid(dbtuple))
else
oid = InvalidOid;
- heap_endscan(scan);
+ systable_endscan(scan);
heap_close(pg_database, AccessShareLock);
return oid;
}
+
/*
- * get_database_owner - given a database OID, fetch the owner's usesysid.
- *
- * Errors out if database not found.
+ * get_database_name - given a database OID, look up the name
*
- * This is not actually used in this file, but is exported for use elsewhere.
+ * Returns a palloc'd string, or NULL if no such database.
*/
-Oid
-get_database_owner(Oid dbid)
+char *
+get_database_name(Oid dbid)
{
- Relation pg_database;
- ScanKeyData entry[1];
- HeapScanDesc scan;
HeapTuple dbtuple;
- int32 dba;
+ char *result;
+
+ dbtuple = SearchSysCache(DATABASEOID,
+ ObjectIdGetDatum(dbid),
+ 0, 0, 0);
+ if (HeapTupleIsValid(dbtuple))
+ {
+ result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
+ ReleaseSysCache(dbtuple);
+ }
+ else
+ result = NULL;
- /* There's no syscache for pg_database, so must look the hard way */
- pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
- ScanKeyEntryInitialize(&entry[0], 0x0,
- ObjectIdAttributeNumber, F_OIDEQ,
- ObjectIdGetDatum(dbid));
- scan = heap_beginscan(pg_database, SnapshotNow, 1, entry);
+ return result;
+}
+
+/*
+ * DATABASE resource manager's routines
+ */
+void
+dbase_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+ uint8 info = record->xl_info & ~XLR_INFO_MASK;
- dbtuple = heap_getnext(scan, ForwardScanDirection);
+ if (info == XLOG_DBASE_CREATE)
+ {
+ xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
+ char *src_path;
+ char *dst_path;
+ struct stat st;
- if (!HeapTupleIsValid(dbtuple))
- elog(ERROR, "database %u does not exist", dbid);
+ src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
+ dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
- dba = ((Form_pg_database) GETSTRUCT(dbtuple))->datdba;
+ /*
+ * Our theory for replaying a CREATE is to forcibly drop the target
+ * subdirectory if present, then re-copy the source data. This may be
+ * more work than needed, but it is simple to implement.
+ */
+ if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
+ {
+ if (!rmtree(dst_path, true))
+ ereport(WARNING,
+ (errmsg("could not remove database directory \"%s\"",
+ dst_path)));
+ }
- heap_endscan(scan);
- heap_close(pg_database, AccessShareLock);
+ /*
+ * Force dirty buffers out to disk, to ensure source database is
+ * up-to-date for the copy. (We really only need to flush buffers for
+ * the source database, but bufmgr.c provides no API for that.)
+ */
+ BufferSync();
- /* XXX some confusion about whether userids are OID or int4 ... */
- return (Oid) dba;
+ /*
+ * Copy this subdirectory to the new location
+ *
+ * We don't need to copy subdirectories
+ */
+ copydir(src_path, dst_path, false);
+ }
+ else if (info == XLOG_DBASE_DROP)
+ {
+ xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
+ char *dst_path;
+
+ dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+
+ /* Drop pages for this database that are in the shared buffer cache */
+ DropDatabaseBuffers(xlrec->db_id);
+
+ /* Also, clean out any entries in the shared free space map */
+ FreeSpaceMapForgetDatabase(xlrec->db_id);
+
+ /* Clean out the xlog relcache too */
+ XLogDropDatabase(xlrec->db_id);
+
+ /* And remove the physical files */
+ if (!rmtree(dst_path, true))
+ ereport(WARNING,
+ (errmsg("could not remove database directory \"%s\"",
+ dst_path)));
+ }
+ else
+ elog(PANIC, "dbase_redo: unknown op code %u", info);
+}
+
+void
+dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
+{
+ uint8 info = xl_info & ~XLR_INFO_MASK;
+
+ if (info == XLOG_DBASE_CREATE)
+ {
+ xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
+
+ appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
+ xlrec->src_db_id, xlrec->src_tablespace_id,
+ xlrec->db_id, xlrec->tablespace_id);
+ }
+ else if (info == XLOG_DBASE_DROP)
+ {
+ xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
+
+ appendStringInfo(buf, "drop db: dir %u/%u",
+ xlrec->db_id, xlrec->tablespace_id);
+ }
+ else
+ appendStringInfo(buf, "UNKNOWN");
}