X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fcommands%2Fdbcommands.c;h=6d744a5bad32f524d19219e83bde704af5b32079;hb=d4cef0aa2a55fafbd9ce2783c1eb9e0157c6781e;hp=a602ff109cd2a14db941140a859a81ac05a8c03a;hpb=2ff501590b323bde14f7e662fd89ad12a4d2f4e0;p=postgresql diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index a602ff109c..6d744a5bad 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -3,13 +3,17 @@ * dbcommands.c * Database management commands (create/drop database). * + * Note: database creation/destruction commands use exclusive locks on + * the database objects (as expressed by LockSharedObject()) to avoid + * stepping on each others' toes. Formerly we used table-level locks + * on pg_database, but that's too coarse-grained. * - * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.148 2004/12/31 21:59:41 pgsql Exp $ + * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.182 2006/07/10 16:20:50 alvherre Exp $ * *------------------------------------------------------------------------- */ @@ -21,12 +25,12 @@ #include "access/genam.h" #include "access/heapam.h" -#include "catalog/catname.h" #include "catalog/catalog.h" +#include "catalog/dependency.h" +#include "catalog/indexing.h" +#include "catalog/pg_authid.h" #include "catalog/pg_database.h" -#include "catalog/pg_shadow.h" #include "catalog/pg_tablespace.h" -#include "catalog/indexing.h" #include "commands/comment.h" #include "commands/dbcommands.h" #include "commands/tablespace.h" @@ -35,10 +39,11 @@ #include "postmaster/bgwriter.h" #include "storage/fd.h" #include "storage/freespace.h" -#include "storage/sinval.h" +#include "storage/procarray.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" +#include "utils/flatfiles.h" #include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/lsyscache.h" @@ -46,9 +51,11 @@ /* non-export function prototypes */ -static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP, - int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP, - TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP, +static bool get_db_info(const char *name, LOCKMODE lockmode, + Oid *dbIdP, Oid *ownerIdP, + int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, + Oid *dbLastSysOidP, + TransactionId *dbVacuumXidP, TransactionId *dbMinXidP, Oid *dbTablespace); static bool have_createdb_privilege(void); static void remove_dbtablespaces(Oid db_id); @@ -63,34 +70,32 @@ createdb(const CreatedbStmt *stmt) HeapScanDesc scan; Relation rel; Oid src_dboid; - AclId src_owner; + Oid src_owner; int src_encoding; bool src_istemplate; + bool src_allowconn; Oid src_lastsysoid; TransactionId src_vacuumxid; - TransactionId src_frozenxid; + TransactionId src_minxid; Oid src_deftablespace; - Oid dst_deftablespace; + volatile Oid dst_deftablespace; Relation pg_database_rel; HeapTuple tuple; - TupleDesc pg_database_dsc; Datum new_record[Natts_pg_database]; char new_record_nulls[Natts_pg_database]; Oid dboid; - AclId datdba; + Oid datdba; ListCell *option; DefElem *dtablespacename = NULL; DefElem *downer = NULL; DefElem *dtemplate = NULL; DefElem *dencoding = NULL; + DefElem *dconnlimit = NULL; char *dbname = stmt->dbname; char *dbowner = NULL; - char *dbtemplate = NULL; + const char *dbtemplate = NULL; int encoding = -1; - -#ifndef WIN32 - char buf[2 * MAXPGPATH + 100]; -#endif + int dbconnlimit = -1; /* don't call this in a transaction block */ PreventTransactionChain((void *) stmt, "CREATE DATABASE"); @@ -132,6 +137,14 @@ createdb(const CreatedbStmt *stmt) errmsg("conflicting or redundant options"))); dencoding = defel; } + else if (strcmp(defel->defname, "connectionlimit") == 0) + { + if (dconnlimit) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + dconnlimit = defel; + } else if (strcmp(defel->defname, "location") == 0) { ereport(WARNING, @@ -177,56 +190,49 @@ createdb(const CreatedbStmt *stmt) elog(ERROR, "unrecognized node type: %d", nodeTag(dencoding->arg)); } + if (dconnlimit && dconnlimit->arg) + dbconnlimit = intVal(dconnlimit->arg); - /* obtain sysid of proposed owner */ + /* obtain OID of proposed owner */ if (dbowner) - datdba = get_usesysid(dbowner); /* will ereport if no such user */ + datdba = get_roleid_checked(dbowner); else datdba = GetUserId(); - if (datdba == GetUserId()) - { - /* creating database for self: can be superuser or createdb */ - if (!superuser() && !have_createdb_privilege()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied to create database"))); - } - else - { - /* creating database for someone else: must be superuser */ - /* note that the someone else need not have any permissions */ - if (!superuser()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to create database for another user"))); - } - /* - * Check for db name conflict. There is a race condition here, since - * another backend could create the same DB name before we commit. - * However, holding an exclusive lock on pg_database for the whole - * time we are copying the source database doesn't seem like a good - * idea, so accept possibility of race to create. We will check again - * after we grab the exclusive lock. + * To create a database, must have createdb privilege and must be able to + * become the target role (this does not imply that the target role itself + * must have createdb privilege). The latter provision guards against + * "giveaway" attacks. Note that a superuser will always have both of + * these privileges a fortiori. */ - if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) + if (!have_createdb_privilege()) ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_DATABASE), - errmsg("database \"%s\" already exists", dbname))); + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to create database"))); + + check_is_member_of_role(GetUserId(), datdba); /* - * Lookup database (template) to be cloned. + * Lookup database (template) to be cloned, and obtain share lock on it. + * ShareLock allows two CREATE DATABASEs to work from the same template + * concurrently, while ensuring no one is busy dropping it in parallel + * (which would be Very Bad since we'd likely get an incomplete copy + * without knowing it). This also prevents any new connections from being + * made to the source until we finish copying it, so we can be sure it + * won't change underneath us. */ if (!dbtemplate) dbtemplate = "template1"; /* Default template database name */ - if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding, - &src_istemplate, &src_lastsysoid, - &src_vacuumxid, &src_frozenxid, &src_deftablespace)) + if (!get_db_info(dbtemplate, ShareLock, + &src_dboid, &src_owner, &src_encoding, + &src_istemplate, &src_allowconn, &src_lastsysoid, + &src_vacuumxid, &src_minxid, &src_deftablespace)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), - errmsg("template database \"%s\" does not exist", dbtemplate))); + errmsg("template database \"%s\" does not exist", + dbtemplate))); /* * Permission check: to copy a DB that's not marked datistemplate, you @@ -234,7 +240,7 @@ createdb(const CreatedbStmt *stmt) */ if (!src_istemplate) { - if (!superuser() && GetUserId() != src_owner) + if (!pg_database_ownercheck(src_dboid, GetUserId())) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied to copy database \"%s\"", @@ -244,14 +250,13 @@ createdb(const CreatedbStmt *stmt) /* * The source DB can't have any active backends, except this one * (exception is to allow CREATE DB while connected to template1). - * Otherwise we might copy inconsistent data. This check is not - * bulletproof, since someone might connect while we are copying... + * Otherwise we might copy inconsistent data. */ if (DatabaseHasActiveBackends(src_dboid, true)) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("source database \"%s\" is being accessed by other users", - dbtemplate))); + errmsg("source database \"%s\" is being accessed by other users", + dbtemplate))); /* If encoding is defaulted, use source's encoding */ if (encoding < 0) @@ -286,7 +291,7 @@ createdb(const CreatedbStmt *stmt) /* * If we are trying to change the default tablespace of the template, * we require that the template not have any files in the new default - * tablespace. This is necessary because otherwise the copied + * tablespace. This is necessary because otherwise the copied * database would contain pg_class rows that refer to its default * tablespace both explicitly (by OID) and implicitly (as zero), which * would cause problems. For example another CREATE DATABASE using @@ -322,146 +327,22 @@ createdb(const CreatedbStmt *stmt) } /* - * Preassign OID for pg_database tuple, so that we can compute db - * path. + * Check for db name conflict. This is just to give a more friendly + * error message than "unique index violation". There's a race condition + * but we're willing to accept the less friendly message in that case. */ - dboid = newoid(); - - /* - * Force dirty buffers out to disk, to ensure source database is - * up-to-date for the copy. (We really only need to flush buffers for - * the source database, but bufmgr.c provides no API for that.) - */ - BufferSync(-1, -1); - - /* - * Close virtual file descriptors so the kernel has more available for - * the system() calls below. - */ - closeAllVfds(); - - /* - * Iterate through all tablespaces of the template database, and copy - * each one to the new database. - */ - rel = heap_openr(TableSpaceRelationName, AccessShareLock); - scan = heap_beginscan(rel, SnapshotNow, 0, NULL); - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - Oid srctablespace = HeapTupleGetOid(tuple); - Oid dsttablespace; - char *srcpath; - char *dstpath; - struct stat st; - - /* No need to copy global tablespace */ - if (srctablespace == GLOBALTABLESPACE_OID) - continue; - - srcpath = GetDatabasePath(src_dboid, srctablespace); - - if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) || - directory_is_empty(srcpath)) - { - /* Assume we can ignore it */ - pfree(srcpath); - continue; - } - - if (srctablespace == src_deftablespace) - dsttablespace = dst_deftablespace; - else - dsttablespace = srctablespace; - - dstpath = GetDatabasePath(dboid, dsttablespace); - - if (stat(dstpath, &st) == 0 || errno != ENOENT) - { - remove_dbtablespaces(dboid); - ereport(ERROR, - (errmsg("could not initialize database directory"), - errdetail("Directory \"%s\" already exists.", - dstpath))); - } - -#ifndef WIN32 - - /* - * Copy this subdirectory to the new location - * - * XXX use of cp really makes this code pretty grotty, particularly - * with respect to lack of ability to report errors well. Someday - * rewrite to do it for ourselves. - */ - - /* We might need to use cp -R one day for portability */ - snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", - srcpath, dstpath); - if (system(buf) != 0) - { - remove_dbtablespaces(dboid); - ereport(ERROR, - (errmsg("could not initialize database directory"), - errdetail("Failing system command was: %s", buf), - errhint("Look in the postmaster's stderr log for more information."))); - } -#else /* WIN32 */ - if (copydir(srcpath, dstpath) != 0) - { - /* copydir should already have given details of its troubles */ - remove_dbtablespaces(dboid); - ereport(ERROR, - (errmsg("could not initialize database directory"))); - } -#endif /* WIN32 */ - - /* Record the filesystem change in XLOG */ - { - xl_dbase_create_rec xlrec; - XLogRecData rdata[3]; - - xlrec.db_id = dboid; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = offsetof(xl_dbase_create_rec, src_path); - rdata[0].next = &(rdata[1]); - - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) srcpath; - rdata[1].len = strlen(srcpath) + 1; - rdata[1].next = &(rdata[2]); - - rdata[2].buffer = InvalidBuffer; - rdata[2].data = (char *) dstpath; - rdata[2].len = strlen(dstpath) + 1; - rdata[2].next = NULL; - - (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata); - } - } - heap_endscan(scan); - heap_close(rel, AccessShareLock); - - /* - * Now OK to grab exclusive lock on pg_database. - */ - pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock); - - /* Check to see if someone else created same DB name meanwhile. */ - if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) - { - /* Don't hold lock while doing recursive remove */ - heap_close(pg_database_rel, AccessExclusiveLock); - remove_dbtablespaces(dboid); + if (OidIsValid(get_database_oid(dbname))) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_DATABASE), errmsg("database \"%s\" already exists", dbname))); - } /* - * Insert a new tuple into pg_database + * Insert a new tuple into pg_database. This establishes our ownership + * of the new database name (anyone else trying to insert the same name + * will block on the unique index, and fail after we commit). It also + * assigns the OID that the new database will have. */ - pg_database_dsc = RelationGetDescr(pg_database_rel); + pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock); /* Form tuple */ MemSet(new_record, 0, sizeof(new_record)); @@ -469,13 +350,14 @@ createdb(const CreatedbStmt *stmt) new_record[Anum_pg_database_datname - 1] = DirectFunctionCall1(namein, CStringGetDatum(dbname)); - new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba); + new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba); new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding); new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false); new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true); + new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit); new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid); - new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid); + new_record[Anum_pg_database_datminxid - 1] = TransactionIdGetDatum(src_minxid); new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace); /* @@ -487,25 +369,157 @@ createdb(const CreatedbStmt *stmt) new_record_nulls[Anum_pg_database_datconfig - 1] = 'n'; new_record_nulls[Anum_pg_database_datacl - 1] = 'n'; - tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls); - - HeapTupleSetOid(tuple, dboid); /* override heap_insert's OID - * selection */ + tuple = heap_formtuple(RelationGetDescr(pg_database_rel), + new_record, new_record_nulls); - simple_heap_insert(pg_database_rel, tuple); + dboid = simple_heap_insert(pg_database_rel, tuple); /* Update indexes */ CatalogUpdateIndexes(pg_database_rel, tuple); /* - * Force dirty buffers out to disk, so that newly-connecting backends - * will see the new database in pg_database right away. (They'll see - * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.) + * Now generate additional catalog entries associated with the new DB */ - FlushRelationBuffers(pg_database_rel, MaxBlockNumber); - /* Close pg_database, but keep exclusive lock till commit */ - heap_close(pg_database_rel, NoLock); + /* Register owner dependency */ + recordDependencyOnOwner(DatabaseRelationId, dboid, datdba); + + /* Create pg_shdepend entries for objects within database */ + copyTemplateDependencies(src_dboid, dboid); + + /* + * Force dirty buffers out to disk, to ensure source database is + * up-to-date for the copy. (We really only need to flush buffers for the + * source database, but bufmgr.c provides no API for that.) + */ + BufferSync(); + + /* + * Once we start copying subdirectories, we need to be able to clean 'em + * up if we fail. Establish a TRY block to make sure this happens. (This + * is not a 100% solution, because of the possibility of failure during + * transaction commit after we leave this routine, but it should handle + * most scenarios.) + */ + PG_TRY(); + { + /* + * Iterate through all tablespaces of the template database, and copy + * each one to the new database. + */ + rel = heap_open(TableSpaceRelationId, AccessShareLock); + scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Oid srctablespace = HeapTupleGetOid(tuple); + Oid dsttablespace; + char *srcpath; + char *dstpath; + struct stat st; + + /* No need to copy global tablespace */ + if (srctablespace == GLOBALTABLESPACE_OID) + continue; + + srcpath = GetDatabasePath(src_dboid, srctablespace); + + if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) || + directory_is_empty(srcpath)) + { + /* Assume we can ignore it */ + pfree(srcpath); + continue; + } + + if (srctablespace == src_deftablespace) + dsttablespace = dst_deftablespace; + else + dsttablespace = srctablespace; + + dstpath = GetDatabasePath(dboid, dsttablespace); + + /* + * Copy this subdirectory to the new location + * + * We don't need to copy subdirectories + */ + copydir(srcpath, dstpath, false); + + /* Record the filesystem change in XLOG */ + { + xl_dbase_create_rec xlrec; + XLogRecData rdata[1]; + + xlrec.db_id = dboid; + xlrec.tablespace_id = dsttablespace; + xlrec.src_db_id = src_dboid; + xlrec.src_tablespace_id = srctablespace; + + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof(xl_dbase_create_rec); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = NULL; + + (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata); + } + } + heap_endscan(scan); + heap_close(rel, AccessShareLock); + + /* + * We force a checkpoint before committing. This effectively means + * that committed XLOG_DBASE_CREATE operations will never need to be + * replayed (at least not in ordinary crash recovery; we still have to + * make the XLOG entry for the benefit of PITR operations). This + * avoids two nasty scenarios: + * + * #1: When PITR is off, we don't XLOG the contents of newly created + * indexes; therefore the drop-and-recreate-whole-directory behavior + * of DBASE_CREATE replay would lose such indexes. + * + * #2: Since we have to recopy the source database during DBASE_CREATE + * replay, we run the risk of copying changes in it that were + * committed after the original CREATE DATABASE command but before the + * system crash that led to the replay. This is at least unexpected + * and at worst could lead to inconsistencies, eg duplicate table + * names. + * + * (Both of these were real bugs in releases 8.0 through 8.0.3.) + * + * In PITR replay, the first of these isn't an issue, and the second + * is only a risk if the CREATE DATABASE and subsequent template + * database change both occur while a base backup is being taken. + * There doesn't seem to be much we can do about that except document + * it as a limitation. + * + * Perhaps if we ever implement CREATE DATABASE in a less cheesy way, + * we can avoid this. + */ + RequestCheckpoint(true, false); + + /* + * Close pg_database, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) + */ + heap_close(pg_database_rel, NoLock); + + /* + * Set flag to update flat database file at commit. + */ + database_file_update_needed(); + } + PG_CATCH(); + { + /* Release lock on source database before doing recursive remove */ + UnlockSharedObject(DatabaseRelationId, src_dboid, 0, + ShareLock); + + /* Throw away any successfully copied subdirectories */ + remove_dbtablespaces(dboid); + + PG_RE_THROW(); + } + PG_END_TRY(); } @@ -513,14 +527,11 @@ createdb(const CreatedbStmt *stmt) * DROP DATABASE */ void -dropdb(const char *dbname) +dropdb(const char *dbname, bool missing_ok) { - int4 db_owner; - bool db_istemplate; Oid db_id; + bool db_istemplate; Relation pgdbrel; - SysScanDesc pgdbscan; - ScanKeyData key; HeapTuple tup; PreventTransactionChain((void *) dbname, "DROP DATABASE"); @@ -533,30 +544,45 @@ dropdb(const char *dbname) errmsg("cannot drop the currently open database"))); /* - * Obtain exclusive lock on pg_database. We need this to ensure that - * no new backend starts up in the target database while we are - * deleting it. (Actually, a new backend might still manage to start - * up, because it will read pg_database without any locking to - * discover the database's OID. But it will detect its error in - * ReverifyMyDatabase and shut down before any serious damage is done. - * See postinit.c.) + * Look up the target database's OID, and get exclusive lock on it. + * We need this to ensure that no new backend starts up in the target + * database while we are deleting it (see postinit.c), and that no one is + * using it as a CREATE DATABASE template or trying to delete it for + * themselves. */ - pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock); + pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock); - if (!get_db_info(dbname, &db_id, &db_owner, NULL, - &db_istemplate, NULL, NULL, NULL, NULL)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_DATABASE), - errmsg("database \"%s\" does not exist", dbname))); + if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, + &db_istemplate, NULL, NULL, NULL, NULL, NULL)) + { + if (!missing_ok) + { + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_DATABASE), + errmsg("database \"%s\" does not exist", dbname))); + } + else + { + /* Close pg_database, release the lock, since we changed nothing */ + heap_close(pgdbrel, RowExclusiveLock); + ereport(NOTICE, + (errmsg("database \"%s\" does not exist, skipping", + dbname))); + return; + } + } - if (GetUserId() != db_owner && !superuser()) + /* + * Permission checks + */ + if (!pg_database_ownercheck(db_id, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, dbname); /* * Disallow dropping a DB that is marked istemplate. This is just to - * prevent people from accidentally dropping template0 or template1; - * they can do so if they're really determined ... + * prevent people from accidentally dropping template0 or template1; they + * can do so if they're really determined ... */ if (db_istemplate) ereport(ERROR, @@ -564,55 +590,44 @@ dropdb(const char *dbname) errmsg("cannot drop a template database"))); /* - * Check for active backends in the target database. + * Check for active backends in the target database. (Because we hold + * the database lock, no new ones can start after this.) */ if (DatabaseHasActiveBackends(db_id, false)) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("database \"%s\" is being accessed by other users", - dbname))); + errmsg("database \"%s\" is being accessed by other users", + dbname))); /* - * Find the database's tuple by OID (should be unique). + * Remove the database's tuple from pg_database. */ - ScanKeyInit(&key, - ObjectIdAttributeNumber, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(db_id)); - - pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true, - SnapshotNow, 1, &key); - - tup = systable_getnext(pgdbscan); + tup = SearchSysCache(DATABASEOID, + ObjectIdGetDatum(db_id), + 0, 0, 0); if (!HeapTupleIsValid(tup)) - { - /* - * This error should never come up since the existence of the - * database is checked earlier - */ - elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary", - dbname); - } + elog(ERROR, "cache lookup failed for database %u", db_id); - /* Remove the database's tuple from pg_database */ simple_heap_delete(pgdbrel, &tup->t_self); - systable_endscan(pgdbscan); + ReleaseSysCache(tup); /* - * Delete any comments associated with the database - * - * NOTE: this is probably dead code since any such comments should have - * been in that database, not mine. + * Delete any comments associated with the database. */ - DeleteComments(db_id, RelationGetRelid(pgdbrel), 0); + DeleteSharedComments(db_id, DatabaseRelationId); /* - * Drop pages for this database that are in the shared buffer cache. - * This is important to ensure that no remaining backend tries to - * write out a dirty buffer to the dead database later... + * Remove shared dependency references for the database. */ - DropBuffers(db_id); + dropDatabaseDependencies(db_id); + + /* + * Drop pages for this database that are in the shared buffer cache. This + * is important to ensure that no remaining backend tries to write out a + * dirty buffer to the dead database later... + */ + DropDatabaseBuffers(db_id); /* * Also, clean out any entries in the shared free space map. @@ -624,7 +639,7 @@ dropdb(const char *dbname) * open files, which would cause rmdir() to fail. */ #ifdef WIN32 - RequestCheckpoint(true); + RequestCheckpoint(true, false); #endif /* @@ -633,15 +648,15 @@ dropdb(const char *dbname) remove_dbtablespaces(db_id); /* - * Force dirty buffers out to disk, so that newly-connecting backends - * will see the database tuple marked dead in pg_database right away. - * (They'll see an uncommitted deletion, but they don't care; see - * GetRawDatabaseInfo.) + * Close pg_database, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) */ - FlushRelationBuffers(pgdbrel, MaxBlockNumber); - - /* Close pg_database, but keep exclusive lock till commit */ heap_close(pgdbrel, NoLock); + + /* + * Set flag to update flat database file at commit. + */ + database_file_update_needed(); } @@ -651,96 +666,173 @@ dropdb(const char *dbname) void RenameDatabase(const char *oldname, const char *newname) { - HeapTuple tup, - newtup; + Oid db_id; + HeapTuple newtup; Relation rel; - SysScanDesc scan, - scan2; - ScanKeyData key, - key2; /* - * Obtain AccessExclusiveLock so that no new session gets started - * while the rename is in progress. + * Look up the target database's OID, and get exclusive lock on it. + * We need this for the same reasons as DROP DATABASE. */ - rel = heap_openr(DatabaseRelationName, AccessExclusiveLock); - - ScanKeyInit(&key, - Anum_pg_database_datname, - BTEqualStrategyNumber, F_NAMEEQ, - NameGetDatum(oldname)); - scan = systable_beginscan(rel, DatabaseNameIndex, true, - SnapshotNow, 1, &key); + rel = heap_open(DatabaseRelationId, RowExclusiveLock); - tup = systable_getnext(scan); - if (!HeapTupleIsValid(tup)) + if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), errmsg("database \"%s\" does not exist", oldname))); /* - * XXX Client applications probably store the current database - * somewhere, so renaming it could cause confusion. On the other - * hand, there may not be an actual problem besides a little - * confusion, so think about this and decide. + * XXX Client applications probably store the current database somewhere, + * so renaming it could cause confusion. On the other hand, there may not + * be an actual problem besides a little confusion, so think about this + * and decide. */ - if (HeapTupleGetOid(tup) == MyDatabaseId) + if (db_id == MyDatabaseId) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("current database may not be renamed"))); /* - * Make sure the database does not have active sessions. Might not be - * necessary, but it's consistent with other database operations. + * Make sure the database does not have active sessions. This is the + * same concern as above, but applied to other sessions. */ - if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false)) + if (DatabaseHasActiveBackends(db_id, false)) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("database \"%s\" is being accessed by other users", - oldname))); + errmsg("database \"%s\" is being accessed by other users", + oldname))); /* make sure the new name doesn't exist */ - ScanKeyInit(&key2, - Anum_pg_database_datname, - BTEqualStrategyNumber, F_NAMEEQ, - NameGetDatum(newname)); - scan2 = systable_beginscan(rel, DatabaseNameIndex, true, - SnapshotNow, 1, &key2); - if (HeapTupleIsValid(systable_getnext(scan2))) + if (OidIsValid(get_database_oid(newname))) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_DATABASE), errmsg("database \"%s\" already exists", newname))); - systable_endscan(scan2); /* must be owner */ - if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId())) + if (!pg_database_ownercheck(db_id, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, oldname); - /* must have createdb */ + /* must have createdb rights */ if (!have_createdb_privilege()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied to rename database"))); /* rename */ - newtup = heap_copytuple(tup); + newtup = SearchSysCacheCopy(DATABASEOID, + ObjectIdGetDatum(db_id), + 0, 0, 0); + if (!HeapTupleIsValid(newtup)) + elog(ERROR, "cache lookup failed for database %u", db_id); namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname); simple_heap_update(rel, &newtup->t_self, newtup); CatalogUpdateIndexes(rel, newtup); - systable_endscan(scan); + /* + * Close pg_database, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) + */ + heap_close(rel, NoLock); /* - * Force dirty buffers out to disk, so that newly-connecting backends - * will see the renamed database in pg_database right away. (They'll - * see an uncommitted tuple, but they don't care; see - * GetRawDatabaseInfo.) + * Set flag to update flat database file at commit. */ - FlushRelationBuffers(rel, MaxBlockNumber); + database_file_update_needed(); +} + + +/* + * ALTER DATABASE name ... + */ +void +AlterDatabase(AlterDatabaseStmt *stmt) +{ + Relation rel; + HeapTuple tuple, + newtuple; + ScanKeyData scankey; + SysScanDesc scan; + ListCell *option; + int connlimit = -1; + DefElem *dconnlimit = NULL; + Datum new_record[Natts_pg_database]; + char new_record_nulls[Natts_pg_database]; + char new_record_repl[Natts_pg_database]; + + /* Extract options from the statement node tree */ + foreach(option, stmt->options) + { + DefElem *defel = (DefElem *) lfirst(option); + + if (strcmp(defel->defname, "connectionlimit") == 0) + { + if (dconnlimit) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + dconnlimit = defel; + } + else + elog(ERROR, "option \"%s\" not recognized", + defel->defname); + } + + if (dconnlimit) + connlimit = intVal(dconnlimit->arg); + + /* + * Get the old tuple. We don't need a lock on the database per se, + * because we're not going to do anything that would mess up incoming + * connections. + */ + rel = heap_open(DatabaseRelationId, RowExclusiveLock); + ScanKeyInit(&scankey, + Anum_pg_database_datname, + BTEqualStrategyNumber, F_NAMEEQ, + NameGetDatum(stmt->dbname)); + scan = systable_beginscan(rel, DatabaseNameIndexId, true, + SnapshotNow, 1, &scankey); + tuple = systable_getnext(scan); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_DATABASE), + errmsg("database \"%s\" does not exist", stmt->dbname))); + + if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, + stmt->dbname); + + /* + * Build an updated tuple, perusing the information just obtained + */ + MemSet(new_record, 0, sizeof(new_record)); + MemSet(new_record_nulls, ' ', sizeof(new_record_nulls)); + MemSet(new_record_repl, ' ', sizeof(new_record_repl)); + + if (dconnlimit) + { + new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit); + new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r'; + } + + newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record, + new_record_nulls, new_record_repl); + simple_heap_update(rel, &tuple->t_self, newtuple); + + /* Update indexes */ + CatalogUpdateIndexes(rel, newtuple); + + systable_endscan(scan); - /* Close pg_database, but keep exclusive lock till commit */ + /* Close pg_database, but keep lock till commit */ heap_close(rel, NoLock); + + /* + * We don't bother updating the flat file since the existing options for + * ALTER DATABASE don't affect it. + */ } @@ -763,14 +855,16 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt) valuestr = flatten_set_variable_args(stmt->variable, stmt->value); /* - * We need AccessExclusiveLock so we can safely do FlushRelationBuffers. + * Get the old tuple. We don't need a lock on the database per se, + * because we're not going to do anything that would mess up incoming + * connections. */ - rel = heap_openr(DatabaseRelationName, AccessExclusiveLock); + rel = heap_open(DatabaseRelationId, RowExclusiveLock); ScanKeyInit(&scankey, Anum_pg_database_datname, BTEqualStrategyNumber, F_NAMEEQ, NameGetDatum(stmt->dbname)); - scan = systable_beginscan(rel, DatabaseNameIndex, true, + scan = systable_beginscan(rel, DatabaseNameIndexId, true, SnapshotNow, 1, &scankey); tuple = systable_getnext(scan); if (!HeapTupleIsValid(tuple)) @@ -778,8 +872,7 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt) (errcode(ERRCODE_UNDEFINED_DATABASE), errmsg("database \"%s\" does not exist", stmt->dbname))); - if (!(superuser() - || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId())) + if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, stmt->dbname); @@ -816,7 +909,7 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt) repl_null[Anum_pg_database_datconfig - 1] = 'n'; } - newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl); + newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl); simple_heap_update(rel, &tuple->t_self, newtuple); /* Update indexes */ @@ -824,16 +917,13 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt) systable_endscan(scan); + /* Close pg_database, but keep lock till commit */ + heap_close(rel, NoLock); + /* - * Force dirty buffers out to disk, so that newly-connecting backends - * will see the altered row in pg_database right away. (They'll - * see an uncommitted tuple, but they don't care; see - * GetRawDatabaseInfo.) + * We don't bother updating the flat file since ALTER DATABASE SET doesn't + * affect it. */ - FlushRelationBuffers(rel, MaxBlockNumber); - - /* Close pg_database, but keep exclusive lock till commit */ - heap_close(rel, NoLock); } @@ -841,7 +931,7 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt) * ALTER DATABASE name OWNER TO newowner */ void -AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId) +AlterDatabaseOwner(const char *dbname, Oid newOwnerId) { HeapTuple tuple; Relation rel; @@ -850,14 +940,16 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId) Form_pg_database datForm; /* - * We need AccessExclusiveLock so we can safely do FlushRelationBuffers. + * Get the old tuple. We don't need a lock on the database per se, + * because we're not going to do anything that would mess up incoming + * connections. */ - rel = heap_openr(DatabaseRelationName, AccessExclusiveLock); + rel = heap_open(DatabaseRelationId, RowExclusiveLock); ScanKeyInit(&scankey, Anum_pg_database_datname, BTEqualStrategyNumber, F_NAMEEQ, NameGetDatum(dbname)); - scan = systable_beginscan(rel, DatabaseNameIndex, true, + scan = systable_beginscan(rel, DatabaseNameIndexId, true, SnapshotNow, 1, &scankey); tuple = systable_getnext(scan); if (!HeapTupleIsValid(tuple)) @@ -872,7 +964,7 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId) * command to have succeeded. This is to be consistent with other * objects. */ - if (datForm->datdba != newOwnerSysId) + if (datForm->datdba != newOwnerId) { Datum repl_val[Natts_pg_database]; char repl_null[Natts_pg_database]; @@ -882,18 +974,33 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId) bool isNull; HeapTuple newtuple; - /* changing owner's database for someone else: must be superuser */ - /* note that the someone else need not have any permissions */ - if (!superuser()) + /* Otherwise, must be owner of the existing object */ + if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, + dbname); + + /* Must be able to become new owner */ + check_is_member_of_role(GetUserId(), newOwnerId); + + /* + * must have createdb rights + * + * NOTE: This is different from other alter-owner checks in that the + * current user is checked for createdb privileges instead of the + * destination owner. This is consistent with the CREATE case for + * databases. Because superusers will always have this right, we need + * no special case for them. + */ + if (!have_createdb_privilege()) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to change owner"))); + errmsg("permission denied to change owner of database"))); memset(repl_null, ' ', sizeof(repl_null)); memset(repl_repl, ' ', sizeof(repl_repl)); repl_repl[Anum_pg_database_datdba - 1] = 'r'; - repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId); + repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId); /* * Determine the modified ACL for the new owner. This is only @@ -906,33 +1013,31 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId) if (!isNull) { newAcl = aclnewowner(DatumGetAclP(aclDatum), - datForm->datdba, newOwnerSysId); + datForm->datdba, newOwnerId); repl_repl[Anum_pg_database_datacl - 1] = 'r'; repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl); } - newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl); + newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl); simple_heap_update(rel, &newtuple->t_self, newtuple); CatalogUpdateIndexes(rel, newtuple); heap_freetuple(newtuple); - /* must release buffer pins before FlushRelationBuffers */ - systable_endscan(scan); - - /* - * Force dirty buffers out to disk, so that newly-connecting backends - * will see the altered row in pg_database right away. (They'll - * see an uncommitted tuple, but they don't care; see - * GetRawDatabaseInfo.) - */ - FlushRelationBuffers(rel, MaxBlockNumber); + /* Update owner dependency reference */ + changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple), + newOwnerId); } - else - systable_endscan(scan); - /* Close pg_database, but keep exclusive lock till commit */ + systable_endscan(scan); + + /* Close pg_database, but keep lock till commit */ heap_close(rel, NoLock); + + /* + * We don't bother updating the flat file since ALTER DATABASE OWNER + * doesn't affect it. + */ } @@ -940,88 +1045,149 @@ AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId) * Helper functions */ +/* + * Look up info about the database named "name". If the database exists, + * obtain the specified lock type on it, fill in any of the remaining + * parameters that aren't NULL, and return TRUE. If no such database, + * return FALSE. + */ static bool -get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP, - int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP, - TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP, +get_db_info(const char *name, LOCKMODE lockmode, + Oid *dbIdP, Oid *ownerIdP, + int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, + Oid *dbLastSysOidP, + TransactionId *dbVacuumXidP, TransactionId *dbMinXidP, Oid *dbTablespace) { + bool result = false; Relation relation; - ScanKeyData scanKey; - SysScanDesc scan; - HeapTuple tuple; - bool gottuple; AssertArg(name); /* Caller may wish to grab a better lock on pg_database beforehand... */ - relation = heap_openr(DatabaseRelationName, AccessShareLock); + relation = heap_open(DatabaseRelationId, AccessShareLock); - ScanKeyInit(&scanKey, - Anum_pg_database_datname, - BTEqualStrategyNumber, F_NAMEEQ, - NameGetDatum(name)); + /* + * Loop covers the rare case where the database is renamed before we + * can lock it. We try again just in case we can find a new one of + * the same name. + */ + for (;;) + { + ScanKeyData scanKey; + SysScanDesc scan; + HeapTuple tuple; + Oid dbOid; - scan = systable_beginscan(relation, DatabaseNameIndex, true, - SnapshotNow, 1, &scanKey); + /* + * there's no syscache for database-indexed-by-name, + * so must do it the hard way + */ + ScanKeyInit(&scanKey, + Anum_pg_database_datname, + BTEqualStrategyNumber, F_NAMEEQ, + NameGetDatum(name)); - tuple = systable_getnext(scan); + scan = systable_beginscan(relation, DatabaseNameIndexId, true, + SnapshotNow, 1, &scanKey); - gottuple = HeapTupleIsValid(tuple); - if (gottuple) - { - Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple); - - /* oid of the database */ - if (dbIdP) - *dbIdP = HeapTupleGetOid(tuple); - /* sysid of the owner */ - if (ownerIdP) - *ownerIdP = dbform->datdba; - /* character encoding */ - if (encodingP) - *encodingP = dbform->encoding; - /* allowed as template? */ - if (dbIsTemplateP) - *dbIsTemplateP = dbform->datistemplate; - /* last system OID used in database */ - if (dbLastSysOidP) - *dbLastSysOidP = dbform->datlastsysoid; - /* limit of vacuumed XIDs */ - if (dbVacuumXidP) - *dbVacuumXidP = dbform->datvacuumxid; - /* limit of frozen XIDs */ - if (dbFrozenXidP) - *dbFrozenXidP = dbform->datfrozenxid; - /* default tablespace for this database */ - if (dbTablespace) - *dbTablespace = dbform->dattablespace; + tuple = systable_getnext(scan); + + if (!HeapTupleIsValid(tuple)) + { + /* definitely no database of that name */ + systable_endscan(scan); + break; + } + + dbOid = HeapTupleGetOid(tuple); + + systable_endscan(scan); + + /* + * Now that we have a database OID, we can try to lock the DB. + */ + if (lockmode != NoLock) + LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode); + + /* + * And now, re-fetch the tuple by OID. If it's still there and + * still the same name, we win; else, drop the lock and loop + * back to try again. + */ + tuple = SearchSysCache(DATABASEOID, + ObjectIdGetDatum(dbOid), + 0, 0, 0); + if (HeapTupleIsValid(tuple)) + { + Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple); + + if (strcmp(name, NameStr(dbform->datname)) == 0) + { + /* oid of the database */ + if (dbIdP) + *dbIdP = dbOid; + /* oid of the owner */ + if (ownerIdP) + *ownerIdP = dbform->datdba; + /* character encoding */ + if (encodingP) + *encodingP = dbform->encoding; + /* allowed as template? */ + if (dbIsTemplateP) + *dbIsTemplateP = dbform->datistemplate; + /* allowing connections? */ + if (dbAllowConnP) + *dbAllowConnP = dbform->datallowconn; + /* last system OID used in database */ + if (dbLastSysOidP) + *dbLastSysOidP = dbform->datlastsysoid; + /* limit of vacuumed XIDs */ + if (dbVacuumXidP) + *dbVacuumXidP = dbform->datvacuumxid; + /* limit of min XIDs */ + if (dbMinXidP) + *dbMinXidP = dbform->datminxid; + /* default tablespace for this database */ + if (dbTablespace) + *dbTablespace = dbform->dattablespace; + ReleaseSysCache(tuple); + result = true; + break; + } + /* can only get here if it was just renamed */ + ReleaseSysCache(tuple); + } + + if (lockmode != NoLock) + UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode); } - systable_endscan(scan); heap_close(relation, AccessShareLock); - return gottuple; + return result; } +/* Check if current user has createdb privileges */ static bool have_createdb_privilege(void) { + bool result = false; HeapTuple utup; - bool retval; - - utup = SearchSysCache(SHADOWSYSID, - Int32GetDatum(GetUserId()), - 0, 0, 0); - - if (!HeapTupleIsValid(utup)) - retval = false; - else - retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb; - ReleaseSysCache(utup); + /* Superusers can always do everything */ + if (superuser()) + return true; - return retval; + utup = SearchSysCache(AUTHOID, + ObjectIdGetDatum(GetUserId()), + 0, 0, 0); + if (HeapTupleIsValid(utup)) + { + result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb; + ReleaseSysCache(utup); + } + return result; } /* @@ -1037,7 +1203,7 @@ remove_dbtablespaces(Oid db_id) HeapScanDesc scan; HeapTuple tuple; - rel = heap_openr(TableSpaceRelationName, AccessShareLock); + rel = heap_open(TableSpaceRelationId, AccessShareLock); scan = heap_beginscan(rel, SnapshotNow, 0, NULL); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { @@ -1066,18 +1232,15 @@ remove_dbtablespaces(Oid db_id) /* Record the filesystem change in XLOG */ { xl_dbase_drop_rec xlrec; - XLogRecData rdata[2]; + XLogRecData rdata[1]; xlrec.db_id = db_id; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = offsetof(xl_dbase_drop_rec, dir_path); - rdata[0].next = &(rdata[1]); + xlrec.tablespace_id = dsttablespace; - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) dstpath; - rdata[1].len = strlen(dstpath) + 1; - rdata[1].next = NULL; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof(xl_dbase_drop_rec); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = NULL; (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata); } @@ -1094,8 +1257,6 @@ remove_dbtablespaces(Oid db_id) * get_database_oid - given a database name, look up the OID * * Returns InvalidOid if database name not found. - * - * This is not actually used in this file, but is exported for use elsewhere. */ Oid get_database_oid(const char *dbname) @@ -1106,13 +1267,16 @@ get_database_oid(const char *dbname) HeapTuple dbtuple; Oid oid; - /* There's no syscache for pg_database, so must look the hard way */ - pg_database = heap_openr(DatabaseRelationName, AccessShareLock); + /* + * There's no syscache for pg_database indexed by name, + * so we must look the hard way. + */ + pg_database = heap_open(DatabaseRelationId, AccessShareLock); ScanKeyInit(&entry[0], Anum_pg_database_datname, BTEqualStrategyNumber, F_NAMEEQ, CStringGetDatum(dbname)); - scan = systable_beginscan(pg_database, DatabaseNameIndex, true, + scan = systable_beginscan(pg_database, DatabaseNameIndexId, true, SnapshotNow, 1, entry); dbtuple = systable_getnext(scan); @@ -1134,38 +1298,24 @@ get_database_oid(const char *dbname) * get_database_name - given a database OID, look up the name * * Returns a palloc'd string, or NULL if no such database. - * - * This is not actually used in this file, but is exported for use elsewhere. */ char * get_database_name(Oid dbid) { - Relation pg_database; - ScanKeyData entry[1]; - SysScanDesc scan; HeapTuple dbtuple; char *result; - /* There's no syscache for pg_database, so must look the hard way */ - pg_database = heap_openr(DatabaseRelationName, AccessShareLock); - ScanKeyInit(&entry[0], - ObjectIdAttributeNumber, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(dbid)); - scan = systable_beginscan(pg_database, DatabaseOidIndex, true, - SnapshotNow, 1, entry); - - dbtuple = systable_getnext(scan); - - /* We assume that there can be at most one matching tuple */ + dbtuple = SearchSysCache(DATABASEOID, + ObjectIdGetDatum(dbid), + 0, 0, 0); if (HeapTupleIsValid(dbtuple)) + { result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname)); + ReleaseSysCache(dbtuple); + } else result = NULL; - systable_endscan(scan); - heap_close(pg_database, AccessShareLock); - return result; } @@ -1180,25 +1330,24 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record) if (info == XLOG_DBASE_CREATE) { xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record); - char *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1; + char *src_path; + char *dst_path; struct stat st; -#ifndef WIN32 - char buf[2 * MAXPGPATH + 100]; -#endif + src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id); + dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id); /* - * Our theory for replaying a CREATE is to forcibly drop the - * target subdirectory if present, then re-copy the source data. - * This may be more work than needed, but it is simple to - * implement. + * Our theory for replaying a CREATE is to forcibly drop the target + * subdirectory if present, then re-copy the source data. This may be + * more work than needed, but it is simple to implement. */ if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode)) { if (!rmtree(dst_path, true)) ereport(WARNING, - (errmsg("could not remove database directory \"%s\"", - dst_path))); + (errmsg("could not remove database directory \"%s\"", + dst_path))); } /* @@ -1206,80 +1355,61 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record) * up-to-date for the copy. (We really only need to flush buffers for * the source database, but bufmgr.c provides no API for that.) */ - BufferSync(-1, -1); - -#ifndef WIN32 + BufferSync(); /* * Copy this subdirectory to the new location * - * XXX use of cp really makes this code pretty grotty, particularly - * with respect to lack of ability to report errors well. Someday - * rewrite to do it for ourselves. + * We don't need to copy subdirectories */ - - /* We might need to use cp -R one day for portability */ - snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", - xlrec->src_path, dst_path); - if (system(buf) != 0) - ereport(ERROR, - (errmsg("could not initialize database directory"), - errdetail("Failing system command was: %s", buf), - errhint("Look in the postmaster's stderr log for more information."))); -#else /* WIN32 */ - if (copydir(xlrec->src_path, dst_path) != 0) - { - /* copydir should already have given details of its troubles */ - ereport(ERROR, - (errmsg("could not initialize database directory"))); - } -#endif /* WIN32 */ + copydir(src_path, dst_path, false); } else if (info == XLOG_DBASE_DROP) { xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record); + char *dst_path; - /* - * Drop pages for this database that are in the shared buffer - * cache - */ - DropBuffers(xlrec->db_id); + dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id); + + /* Drop pages for this database that are in the shared buffer cache */ + DropDatabaseBuffers(xlrec->db_id); + + /* Also, clean out any entries in the shared free space map */ + FreeSpaceMapForgetDatabase(xlrec->db_id); + + /* Clean out the xlog relcache too */ + XLogDropDatabase(xlrec->db_id); - if (!rmtree(xlrec->dir_path, true)) + /* And remove the physical files */ + if (!rmtree(dst_path, true)) ereport(WARNING, (errmsg("could not remove database directory \"%s\"", - xlrec->dir_path))); + dst_path))); } else elog(PANIC, "dbase_redo: unknown op code %u", info); } void -dbase_undo(XLogRecPtr lsn, XLogRecord *record) -{ - elog(PANIC, "dbase_undo: unimplemented"); -} - -void -dbase_desc(char *buf, uint8 xl_info, char *rec) +dbase_desc(StringInfo buf, uint8 xl_info, char *rec) { uint8 info = xl_info & ~XLR_INFO_MASK; if (info == XLOG_DBASE_CREATE) { xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec; - char *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1; - sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"", - xlrec->db_id, xlrec->src_path, dst_path); + appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u", + xlrec->src_db_id, xlrec->src_tablespace_id, + xlrec->db_id, xlrec->tablespace_id); } else if (info == XLOG_DBASE_DROP) { xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec; - sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"", - xlrec->db_id, xlrec->dir_path); + appendStringInfo(buf, "drop db: dir %u/%u", + xlrec->db_id, xlrec->tablespace_id); } else - strcat(buf, "UNKNOWN"); + appendStringInfo(buf, "UNKNOWN"); }