1 /*-------------------------------------------------------------------------
4 * Database management commands (create/drop database).
7 * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
12 * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.147 2004/11/18 01:14:26 tgl Exp $
14 *-------------------------------------------------------------------------
22 #include "access/genam.h"
23 #include "access/heapam.h"
24 #include "catalog/catname.h"
25 #include "catalog/catalog.h"
26 #include "catalog/pg_database.h"
27 #include "catalog/pg_shadow.h"
28 #include "catalog/pg_tablespace.h"
29 #include "catalog/indexing.h"
30 #include "commands/comment.h"
31 #include "commands/dbcommands.h"
32 #include "commands/tablespace.h"
33 #include "mb/pg_wchar.h"
34 #include "miscadmin.h"
35 #include "postmaster/bgwriter.h"
36 #include "storage/fd.h"
37 #include "storage/freespace.h"
38 #include "storage/sinval.h"
39 #include "utils/acl.h"
40 #include "utils/array.h"
41 #include "utils/builtins.h"
42 #include "utils/fmgroids.h"
43 #include "utils/guc.h"
44 #include "utils/lsyscache.h"
45 #include "utils/syscache.h"
48 /* non-export function prototypes */
49 static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
50 int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
51 TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
53 static bool have_createdb_privilege(void);
54 static void remove_dbtablespaces(Oid db_id);
61 createdb(const CreatedbStmt *stmt)
70 TransactionId src_vacuumxid;
71 TransactionId src_frozenxid;
72 Oid src_deftablespace;
73 Oid dst_deftablespace;
74 Relation pg_database_rel;
76 TupleDesc pg_database_dsc;
77 Datum new_record[Natts_pg_database];
78 char new_record_nulls[Natts_pg_database];
82 DefElem *dtablespacename = NULL;
83 DefElem *downer = NULL;
84 DefElem *dtemplate = NULL;
85 DefElem *dencoding = NULL;
86 char *dbname = stmt->dbname;
88 char *dbtemplate = NULL;
92 char buf[2 * MAXPGPATH + 100];
95 /* don't call this in a transaction block */
96 PreventTransactionChain((void *) stmt, "CREATE DATABASE");
98 /* Extract options from the statement node tree */
99 foreach(option, stmt->options)
101 DefElem *defel = (DefElem *) lfirst(option);
103 if (strcmp(defel->defname, "tablespace") == 0)
107 (errcode(ERRCODE_SYNTAX_ERROR),
108 errmsg("conflicting or redundant options")));
109 dtablespacename = defel;
111 else if (strcmp(defel->defname, "owner") == 0)
115 (errcode(ERRCODE_SYNTAX_ERROR),
116 errmsg("conflicting or redundant options")));
119 else if (strcmp(defel->defname, "template") == 0)
123 (errcode(ERRCODE_SYNTAX_ERROR),
124 errmsg("conflicting or redundant options")));
127 else if (strcmp(defel->defname, "encoding") == 0)
131 (errcode(ERRCODE_SYNTAX_ERROR),
132 errmsg("conflicting or redundant options")));
135 else if (strcmp(defel->defname, "location") == 0)
138 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
139 errmsg("LOCATION is not supported anymore"),
140 errhint("Consider using tablespaces instead.")));
143 elog(ERROR, "option \"%s\" not recognized",
147 if (downer && downer->arg)
148 dbowner = strVal(downer->arg);
149 if (dtemplate && dtemplate->arg)
150 dbtemplate = strVal(dtemplate->arg);
151 if (dencoding && dencoding->arg)
153 const char *encoding_name;
155 if (IsA(dencoding->arg, Integer))
157 encoding = intVal(dencoding->arg);
158 encoding_name = pg_encoding_to_char(encoding);
159 if (strcmp(encoding_name, "") == 0 ||
160 pg_valid_server_encoding(encoding_name) < 0)
162 (errcode(ERRCODE_UNDEFINED_OBJECT),
163 errmsg("%d is not a valid encoding code",
166 else if (IsA(dencoding->arg, String))
168 encoding_name = strVal(dencoding->arg);
169 if (pg_valid_server_encoding(encoding_name) < 0)
171 (errcode(ERRCODE_UNDEFINED_OBJECT),
172 errmsg("%s is not a valid encoding name",
174 encoding = pg_char_to_encoding(encoding_name);
177 elog(ERROR, "unrecognized node type: %d",
178 nodeTag(dencoding->arg));
181 /* obtain sysid of proposed owner */
183 datdba = get_usesysid(dbowner); /* will ereport if no such user */
185 datdba = GetUserId();
187 if (datdba == GetUserId())
189 /* creating database for self: can be superuser or createdb */
190 if (!superuser() && !have_createdb_privilege())
192 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
193 errmsg("permission denied to create database")));
197 /* creating database for someone else: must be superuser */
198 /* note that the someone else need not have any permissions */
201 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
202 errmsg("must be superuser to create database for another user")));
206 * Check for db name conflict. There is a race condition here, since
207 * another backend could create the same DB name before we commit.
208 * However, holding an exclusive lock on pg_database for the whole
209 * time we are copying the source database doesn't seem like a good
210 * idea, so accept possibility of race to create. We will check again
211 * after we grab the exclusive lock.
213 if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
215 (errcode(ERRCODE_DUPLICATE_DATABASE),
216 errmsg("database \"%s\" already exists", dbname)));
219 * Lookup database (template) to be cloned.
222 dbtemplate = "template1"; /* Default template database name */
224 if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
225 &src_istemplate, &src_lastsysoid,
226 &src_vacuumxid, &src_frozenxid, &src_deftablespace))
228 (errcode(ERRCODE_UNDEFINED_DATABASE),
229 errmsg("template database \"%s\" does not exist", dbtemplate)));
232 * Permission check: to copy a DB that's not marked datistemplate, you
233 * must be superuser or the owner thereof.
237 if (!superuser() && GetUserId() != src_owner)
239 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
240 errmsg("permission denied to copy database \"%s\"",
245 * The source DB can't have any active backends, except this one
246 * (exception is to allow CREATE DB while connected to template1).
247 * Otherwise we might copy inconsistent data. This check is not
248 * bulletproof, since someone might connect while we are copying...
250 if (DatabaseHasActiveBackends(src_dboid, true))
252 (errcode(ERRCODE_OBJECT_IN_USE),
253 errmsg("source database \"%s\" is being accessed by other users",
256 /* If encoding is defaulted, use source's encoding */
258 encoding = src_encoding;
260 /* Some encodings are client only */
261 if (!PG_VALID_BE_ENCODING(encoding))
263 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
264 errmsg("invalid server encoding %d", encoding)));
266 /* Resolve default tablespace for new database */
267 if (dtablespacename && dtablespacename->arg)
269 char *tablespacename;
272 tablespacename = strVal(dtablespacename->arg);
273 dst_deftablespace = get_tablespace_oid(tablespacename);
274 if (!OidIsValid(dst_deftablespace))
276 (errcode(ERRCODE_UNDEFINED_OBJECT),
277 errmsg("tablespace \"%s\" does not exist",
279 /* check permissions */
280 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
282 if (aclresult != ACLCHECK_OK)
283 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
287 * If we are trying to change the default tablespace of the template,
288 * we require that the template not have any files in the new default
289 * tablespace. This is necessary because otherwise the copied
290 * database would contain pg_class rows that refer to its default
291 * tablespace both explicitly (by OID) and implicitly (as zero), which
292 * would cause problems. For example another CREATE DATABASE using
293 * the copied database as template, and trying to change its default
294 * tablespace again, would yield outright incorrect results (it would
295 * improperly move tables to the new default tablespace that should
296 * stay in the same tablespace).
298 if (dst_deftablespace != src_deftablespace)
303 srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
305 if (stat(srcpath, &st) == 0 &&
306 S_ISDIR(st.st_mode) &&
307 !directory_is_empty(srcpath))
309 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
310 errmsg("cannot assign new default tablespace \"%s\"",
312 errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
319 /* Use template database's default tablespace */
320 dst_deftablespace = src_deftablespace;
321 /* Note there is no additional permission check in this path */
325 * Preassign OID for pg_database tuple, so that we can compute db
331 * Force dirty buffers out to disk, to ensure source database is
332 * up-to-date for the copy. (We really only need to flush buffers for
333 * the source database, but bufmgr.c provides no API for that.)
338 * Close virtual file descriptors so the kernel has more available for
339 * the system() calls below.
344 * Iterate through all tablespaces of the template database, and copy
345 * each one to the new database.
347 rel = heap_openr(TableSpaceRelationName, AccessShareLock);
348 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
349 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
351 Oid srctablespace = HeapTupleGetOid(tuple);
357 /* No need to copy global tablespace */
358 if (srctablespace == GLOBALTABLESPACE_OID)
361 srcpath = GetDatabasePath(src_dboid, srctablespace);
363 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
364 directory_is_empty(srcpath))
366 /* Assume we can ignore it */
371 if (srctablespace == src_deftablespace)
372 dsttablespace = dst_deftablespace;
374 dsttablespace = srctablespace;
376 dstpath = GetDatabasePath(dboid, dsttablespace);
378 if (stat(dstpath, &st) == 0 || errno != ENOENT)
380 remove_dbtablespaces(dboid);
382 (errmsg("could not initialize database directory"),
383 errdetail("Directory \"%s\" already exists.",
390 * Copy this subdirectory to the new location
392 * XXX use of cp really makes this code pretty grotty, particularly
393 * with respect to lack of ability to report errors well. Someday
394 * rewrite to do it for ourselves.
397 /* We might need to use cp -R one day for portability */
398 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
400 if (system(buf) != 0)
402 remove_dbtablespaces(dboid);
404 (errmsg("could not initialize database directory"),
405 errdetail("Failing system command was: %s", buf),
406 errhint("Look in the postmaster's stderr log for more information.")));
409 if (copydir(srcpath, dstpath) != 0)
411 /* copydir should already have given details of its troubles */
412 remove_dbtablespaces(dboid);
414 (errmsg("could not initialize database directory")));
418 /* Record the filesystem change in XLOG */
420 xl_dbase_create_rec xlrec;
421 XLogRecData rdata[3];
424 rdata[0].buffer = InvalidBuffer;
425 rdata[0].data = (char *) &xlrec;
426 rdata[0].len = offsetof(xl_dbase_create_rec, src_path);
427 rdata[0].next = &(rdata[1]);
429 rdata[1].buffer = InvalidBuffer;
430 rdata[1].data = (char *) srcpath;
431 rdata[1].len = strlen(srcpath) + 1;
432 rdata[1].next = &(rdata[2]);
434 rdata[2].buffer = InvalidBuffer;
435 rdata[2].data = (char *) dstpath;
436 rdata[2].len = strlen(dstpath) + 1;
437 rdata[2].next = NULL;
439 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
443 heap_close(rel, AccessShareLock);
446 * Now OK to grab exclusive lock on pg_database.
448 pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
450 /* Check to see if someone else created same DB name meanwhile. */
451 if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
453 /* Don't hold lock while doing recursive remove */
454 heap_close(pg_database_rel, AccessExclusiveLock);
455 remove_dbtablespaces(dboid);
457 (errcode(ERRCODE_DUPLICATE_DATABASE),
458 errmsg("database \"%s\" already exists", dbname)));
462 * Insert a new tuple into pg_database
464 pg_database_dsc = RelationGetDescr(pg_database_rel);
467 MemSet(new_record, 0, sizeof(new_record));
468 MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
470 new_record[Anum_pg_database_datname - 1] =
471 DirectFunctionCall1(namein, CStringGetDatum(dbname));
472 new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
473 new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
474 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
475 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
476 new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
477 new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
478 new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
479 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
482 * We deliberately set datconfig and datacl to defaults (NULL), rather
483 * than copying them from the template database. Copying datacl would
484 * be a bad idea when the owner is not the same as the template's
485 * owner. It's more debatable whether datconfig should be copied.
487 new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
488 new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
490 tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
492 HeapTupleSetOid(tuple, dboid); /* override heap_insert's OID
495 simple_heap_insert(pg_database_rel, tuple);
498 CatalogUpdateIndexes(pg_database_rel, tuple);
501 * Force dirty buffers out to disk, so that newly-connecting backends
502 * will see the new database in pg_database right away. (They'll see
503 * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
505 FlushRelationBuffers(pg_database_rel, MaxBlockNumber);
507 /* Close pg_database, but keep exclusive lock till commit */
508 heap_close(pg_database_rel, NoLock);
516 dropdb(const char *dbname)
522 SysScanDesc pgdbscan;
526 PreventTransactionChain((void *) dbname, "DROP DATABASE");
530 if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
532 (errcode(ERRCODE_OBJECT_IN_USE),
533 errmsg("cannot drop the currently open database")));
536 * Obtain exclusive lock on pg_database. We need this to ensure that
537 * no new backend starts up in the target database while we are
538 * deleting it. (Actually, a new backend might still manage to start
539 * up, because it will read pg_database without any locking to
540 * discover the database's OID. But it will detect its error in
541 * ReverifyMyDatabase and shut down before any serious damage is done.
544 pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
546 if (!get_db_info(dbname, &db_id, &db_owner, NULL,
547 &db_istemplate, NULL, NULL, NULL, NULL))
549 (errcode(ERRCODE_UNDEFINED_DATABASE),
550 errmsg("database \"%s\" does not exist", dbname)));
552 if (GetUserId() != db_owner && !superuser())
553 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
557 * Disallow dropping a DB that is marked istemplate. This is just to
558 * prevent people from accidentally dropping template0 or template1;
559 * they can do so if they're really determined ...
563 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
564 errmsg("cannot drop a template database")));
567 * Check for active backends in the target database.
569 if (DatabaseHasActiveBackends(db_id, false))
571 (errcode(ERRCODE_OBJECT_IN_USE),
572 errmsg("database \"%s\" is being accessed by other users",
576 * Find the database's tuple by OID (should be unique).
579 ObjectIdAttributeNumber,
580 BTEqualStrategyNumber, F_OIDEQ,
581 ObjectIdGetDatum(db_id));
583 pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true,
584 SnapshotNow, 1, &key);
586 tup = systable_getnext(pgdbscan);
587 if (!HeapTupleIsValid(tup))
590 * This error should never come up since the existence of the
591 * database is checked earlier
593 elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
597 /* Remove the database's tuple from pg_database */
598 simple_heap_delete(pgdbrel, &tup->t_self);
600 systable_endscan(pgdbscan);
603 * Delete any comments associated with the database
605 * NOTE: this is probably dead code since any such comments should have
606 * been in that database, not mine.
608 DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
611 * Drop pages for this database that are in the shared buffer cache.
612 * This is important to ensure that no remaining backend tries to
613 * write out a dirty buffer to the dead database later...
618 * Also, clean out any entries in the shared free space map.
620 FreeSpaceMapForgetDatabase(db_id);
623 * On Windows, force a checkpoint so that the bgwriter doesn't hold any
624 * open files, which would cause rmdir() to fail.
627 RequestCheckpoint(true);
631 * Remove all tablespace subdirs belonging to the database.
633 remove_dbtablespaces(db_id);
636 * Force dirty buffers out to disk, so that newly-connecting backends
637 * will see the database tuple marked dead in pg_database right away.
638 * (They'll see an uncommitted deletion, but they don't care; see
639 * GetRawDatabaseInfo.)
641 FlushRelationBuffers(pgdbrel, MaxBlockNumber);
643 /* Close pg_database, but keep exclusive lock till commit */
644 heap_close(pgdbrel, NoLock);
652 RenameDatabase(const char *oldname, const char *newname)
663 * Obtain AccessExclusiveLock so that no new session gets started
664 * while the rename is in progress.
666 rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
669 Anum_pg_database_datname,
670 BTEqualStrategyNumber, F_NAMEEQ,
671 NameGetDatum(oldname));
672 scan = systable_beginscan(rel, DatabaseNameIndex, true,
673 SnapshotNow, 1, &key);
675 tup = systable_getnext(scan);
676 if (!HeapTupleIsValid(tup))
678 (errcode(ERRCODE_UNDEFINED_DATABASE),
679 errmsg("database \"%s\" does not exist", oldname)));
682 * XXX Client applications probably store the current database
683 * somewhere, so renaming it could cause confusion. On the other
684 * hand, there may not be an actual problem besides a little
685 * confusion, so think about this and decide.
687 if (HeapTupleGetOid(tup) == MyDatabaseId)
689 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
690 errmsg("current database may not be renamed")));
693 * Make sure the database does not have active sessions. Might not be
694 * necessary, but it's consistent with other database operations.
696 if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
698 (errcode(ERRCODE_OBJECT_IN_USE),
699 errmsg("database \"%s\" is being accessed by other users",
702 /* make sure the new name doesn't exist */
704 Anum_pg_database_datname,
705 BTEqualStrategyNumber, F_NAMEEQ,
706 NameGetDatum(newname));
707 scan2 = systable_beginscan(rel, DatabaseNameIndex, true,
708 SnapshotNow, 1, &key2);
709 if (HeapTupleIsValid(systable_getnext(scan2)))
711 (errcode(ERRCODE_DUPLICATE_DATABASE),
712 errmsg("database \"%s\" already exists", newname)));
713 systable_endscan(scan2);
716 if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
717 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
720 /* must have createdb */
721 if (!have_createdb_privilege())
723 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
724 errmsg("permission denied to rename database")));
727 newtup = heap_copytuple(tup);
728 namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
729 simple_heap_update(rel, &newtup->t_self, newtup);
730 CatalogUpdateIndexes(rel, newtup);
732 systable_endscan(scan);
735 * Force dirty buffers out to disk, so that newly-connecting backends
736 * will see the renamed database in pg_database right away. (They'll
737 * see an uncommitted tuple, but they don't care; see
738 * GetRawDatabaseInfo.)
740 FlushRelationBuffers(rel, MaxBlockNumber);
742 /* Close pg_database, but keep exclusive lock till commit */
743 heap_close(rel, NoLock);
748 * ALTER DATABASE name SET ...
751 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
759 Datum repl_val[Natts_pg_database];
760 char repl_null[Natts_pg_database];
761 char repl_repl[Natts_pg_database];
763 valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
766 * We need AccessExclusiveLock so we can safely do FlushRelationBuffers.
768 rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
769 ScanKeyInit(&scankey,
770 Anum_pg_database_datname,
771 BTEqualStrategyNumber, F_NAMEEQ,
772 NameGetDatum(stmt->dbname));
773 scan = systable_beginscan(rel, DatabaseNameIndex, true,
774 SnapshotNow, 1, &scankey);
775 tuple = systable_getnext(scan);
776 if (!HeapTupleIsValid(tuple))
778 (errcode(ERRCODE_UNDEFINED_DATABASE),
779 errmsg("database \"%s\" does not exist", stmt->dbname)));
782 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
783 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
786 MemSet(repl_repl, ' ', sizeof(repl_repl));
787 repl_repl[Anum_pg_database_datconfig - 1] = 'r';
789 if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
792 repl_null[Anum_pg_database_datconfig - 1] = 'n';
793 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
801 repl_null[Anum_pg_database_datconfig - 1] = ' ';
803 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
804 RelationGetDescr(rel), &isnull);
806 a = isnull ? NULL : DatumGetArrayTypeP(datum);
809 a = GUCArrayAdd(a, stmt->variable, valuestr);
811 a = GUCArrayDelete(a, stmt->variable);
814 repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
816 repl_null[Anum_pg_database_datconfig - 1] = 'n';
819 newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
820 simple_heap_update(rel, &tuple->t_self, newtuple);
823 CatalogUpdateIndexes(rel, newtuple);
825 systable_endscan(scan);
828 * Force dirty buffers out to disk, so that newly-connecting backends
829 * will see the altered row in pg_database right away. (They'll
830 * see an uncommitted tuple, but they don't care; see
831 * GetRawDatabaseInfo.)
833 FlushRelationBuffers(rel, MaxBlockNumber);
835 /* Close pg_database, but keep exclusive lock till commit */
836 heap_close(rel, NoLock);
841 * ALTER DATABASE name OWNER TO newowner
844 AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
850 Form_pg_database datForm;
853 * We need AccessExclusiveLock so we can safely do FlushRelationBuffers.
855 rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
856 ScanKeyInit(&scankey,
857 Anum_pg_database_datname,
858 BTEqualStrategyNumber, F_NAMEEQ,
859 NameGetDatum(dbname));
860 scan = systable_beginscan(rel, DatabaseNameIndex, true,
861 SnapshotNow, 1, &scankey);
862 tuple = systable_getnext(scan);
863 if (!HeapTupleIsValid(tuple))
865 (errcode(ERRCODE_UNDEFINED_DATABASE),
866 errmsg("database \"%s\" does not exist", dbname)));
868 datForm = (Form_pg_database) GETSTRUCT(tuple);
871 * If the new owner is the same as the existing owner, consider the
872 * command to have succeeded. This is to be consistent with other
875 if (datForm->datdba != newOwnerSysId)
877 Datum repl_val[Natts_pg_database];
878 char repl_null[Natts_pg_database];
879 char repl_repl[Natts_pg_database];
885 /* changing owner's database for someone else: must be superuser */
886 /* note that the someone else need not have any permissions */
889 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
890 errmsg("must be superuser to change owner")));
892 memset(repl_null, ' ', sizeof(repl_null));
893 memset(repl_repl, ' ', sizeof(repl_repl));
895 repl_repl[Anum_pg_database_datdba - 1] = 'r';
896 repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
899 * Determine the modified ACL for the new owner. This is only
900 * necessary when the ACL is non-null.
902 aclDatum = heap_getattr(tuple,
903 Anum_pg_database_datacl,
904 RelationGetDescr(rel),
908 newAcl = aclnewowner(DatumGetAclP(aclDatum),
909 datForm->datdba, newOwnerSysId);
910 repl_repl[Anum_pg_database_datacl - 1] = 'r';
911 repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
914 newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
915 simple_heap_update(rel, &newtuple->t_self, newtuple);
916 CatalogUpdateIndexes(rel, newtuple);
918 heap_freetuple(newtuple);
920 /* must release buffer pins before FlushRelationBuffers */
921 systable_endscan(scan);
924 * Force dirty buffers out to disk, so that newly-connecting backends
925 * will see the altered row in pg_database right away. (They'll
926 * see an uncommitted tuple, but they don't care; see
927 * GetRawDatabaseInfo.)
929 FlushRelationBuffers(rel, MaxBlockNumber);
932 systable_endscan(scan);
934 /* Close pg_database, but keep exclusive lock till commit */
935 heap_close(rel, NoLock);
944 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
945 int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
946 TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
957 /* Caller may wish to grab a better lock on pg_database beforehand... */
958 relation = heap_openr(DatabaseRelationName, AccessShareLock);
960 ScanKeyInit(&scanKey,
961 Anum_pg_database_datname,
962 BTEqualStrategyNumber, F_NAMEEQ,
965 scan = systable_beginscan(relation, DatabaseNameIndex, true,
966 SnapshotNow, 1, &scanKey);
968 tuple = systable_getnext(scan);
970 gottuple = HeapTupleIsValid(tuple);
973 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
975 /* oid of the database */
977 *dbIdP = HeapTupleGetOid(tuple);
978 /* sysid of the owner */
980 *ownerIdP = dbform->datdba;
981 /* character encoding */
983 *encodingP = dbform->encoding;
984 /* allowed as template? */
986 *dbIsTemplateP = dbform->datistemplate;
987 /* last system OID used in database */
989 *dbLastSysOidP = dbform->datlastsysoid;
990 /* limit of vacuumed XIDs */
992 *dbVacuumXidP = dbform->datvacuumxid;
993 /* limit of frozen XIDs */
995 *dbFrozenXidP = dbform->datfrozenxid;
996 /* default tablespace for this database */
998 *dbTablespace = dbform->dattablespace;
1001 systable_endscan(scan);
1002 heap_close(relation, AccessShareLock);
1008 have_createdb_privilege(void)
1013 utup = SearchSysCache(SHADOWSYSID,
1014 Int32GetDatum(GetUserId()),
1017 if (!HeapTupleIsValid(utup))
1020 retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
1022 ReleaseSysCache(utup);
1028 * Remove tablespace directories
1030 * We don't know what tablespaces db_id is using, so iterate through all
1031 * tablespaces removing <tablespace>/db_id
1034 remove_dbtablespaces(Oid db_id)
1040 rel = heap_openr(TableSpaceRelationName, AccessShareLock);
1041 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1042 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1044 Oid dsttablespace = HeapTupleGetOid(tuple);
1048 /* Don't mess with the global tablespace */
1049 if (dsttablespace == GLOBALTABLESPACE_OID)
1052 dstpath = GetDatabasePath(db_id, dsttablespace);
1054 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1056 /* Assume we can ignore it */
1061 if (!rmtree(dstpath, true))
1063 (errmsg("could not remove database directory \"%s\"",
1066 /* Record the filesystem change in XLOG */
1068 xl_dbase_drop_rec xlrec;
1069 XLogRecData rdata[2];
1071 xlrec.db_id = db_id;
1072 rdata[0].buffer = InvalidBuffer;
1073 rdata[0].data = (char *) &xlrec;
1074 rdata[0].len = offsetof(xl_dbase_drop_rec, dir_path);
1075 rdata[0].next = &(rdata[1]);
1077 rdata[1].buffer = InvalidBuffer;
1078 rdata[1].data = (char *) dstpath;
1079 rdata[1].len = strlen(dstpath) + 1;
1080 rdata[1].next = NULL;
1082 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1089 heap_close(rel, AccessShareLock);
1094 * get_database_oid - given a database name, look up the OID
1096 * Returns InvalidOid if database name not found.
1098 * This is not actually used in this file, but is exported for use elsewhere.
1101 get_database_oid(const char *dbname)
1103 Relation pg_database;
1104 ScanKeyData entry[1];
1109 /* There's no syscache for pg_database, so must look the hard way */
1110 pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1111 ScanKeyInit(&entry[0],
1112 Anum_pg_database_datname,
1113 BTEqualStrategyNumber, F_NAMEEQ,
1114 CStringGetDatum(dbname));
1115 scan = systable_beginscan(pg_database, DatabaseNameIndex, true,
1116 SnapshotNow, 1, entry);
1118 dbtuple = systable_getnext(scan);
1120 /* We assume that there can be at most one matching tuple */
1121 if (HeapTupleIsValid(dbtuple))
1122 oid = HeapTupleGetOid(dbtuple);
1126 systable_endscan(scan);
1127 heap_close(pg_database, AccessShareLock);
1134 * get_database_name - given a database OID, look up the name
1136 * Returns a palloc'd string, or NULL if no such database.
1138 * This is not actually used in this file, but is exported for use elsewhere.
1141 get_database_name(Oid dbid)
1143 Relation pg_database;
1144 ScanKeyData entry[1];
1149 /* There's no syscache for pg_database, so must look the hard way */
1150 pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1151 ScanKeyInit(&entry[0],
1152 ObjectIdAttributeNumber,
1153 BTEqualStrategyNumber, F_OIDEQ,
1154 ObjectIdGetDatum(dbid));
1155 scan = systable_beginscan(pg_database, DatabaseOidIndex, true,
1156 SnapshotNow, 1, entry);
1158 dbtuple = systable_getnext(scan);
1160 /* We assume that there can be at most one matching tuple */
1161 if (HeapTupleIsValid(dbtuple))
1162 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1166 systable_endscan(scan);
1167 heap_close(pg_database, AccessShareLock);
1173 * DATABASE resource manager's routines
1176 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1178 uint8 info = record->xl_info & ~XLR_INFO_MASK;
1180 if (info == XLOG_DBASE_CREATE)
1182 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1183 char *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1187 char buf[2 * MAXPGPATH + 100];
1191 * Our theory for replaying a CREATE is to forcibly drop the
1192 * target subdirectory if present, then re-copy the source data.
1193 * This may be more work than needed, but it is simple to
1196 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1198 if (!rmtree(dst_path, true))
1200 (errmsg("could not remove database directory \"%s\"",
1205 * Force dirty buffers out to disk, to ensure source database is
1206 * up-to-date for the copy. (We really only need to flush buffers for
1207 * the source database, but bufmgr.c provides no API for that.)
1214 * Copy this subdirectory to the new location
1216 * XXX use of cp really makes this code pretty grotty, particularly
1217 * with respect to lack of ability to report errors well. Someday
1218 * rewrite to do it for ourselves.
1221 /* We might need to use cp -R one day for portability */
1222 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
1223 xlrec->src_path, dst_path);
1224 if (system(buf) != 0)
1226 (errmsg("could not initialize database directory"),
1227 errdetail("Failing system command was: %s", buf),
1228 errhint("Look in the postmaster's stderr log for more information.")));
1230 if (copydir(xlrec->src_path, dst_path) != 0)
1232 /* copydir should already have given details of its troubles */
1234 (errmsg("could not initialize database directory")));
1238 else if (info == XLOG_DBASE_DROP)
1240 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1243 * Drop pages for this database that are in the shared buffer
1246 DropBuffers(xlrec->db_id);
1248 if (!rmtree(xlrec->dir_path, true))
1250 (errmsg("could not remove database directory \"%s\"",
1254 elog(PANIC, "dbase_redo: unknown op code %u", info);
1258 dbase_undo(XLogRecPtr lsn, XLogRecord *record)
1260 elog(PANIC, "dbase_undo: unimplemented");
1264 dbase_desc(char *buf, uint8 xl_info, char *rec)
1266 uint8 info = xl_info & ~XLR_INFO_MASK;
1268 if (info == XLOG_DBASE_CREATE)
1270 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1271 char *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1273 sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"",
1274 xlrec->db_id, xlrec->src_path, dst_path);
1276 else if (info == XLOG_DBASE_DROP)
1278 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1280 sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"",
1281 xlrec->db_id, xlrec->dir_path);
1284 strcat(buf, "UNKNOWN");