]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
1c1907c5e453d5c3bf2903430fb30d31c9ba6814
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  *
7  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.139 2004/08/01 20:30:48 tgl Exp $
13  *
14  *-------------------------------------------------------------------------
15  */
16 #include "postgres.h"
17
18 #include <fcntl.h>
19 #include <unistd.h>
20 #include <sys/stat.h>
21
22 #include "access/genam.h"
23 #include "access/heapam.h"
24 #include "catalog/catname.h"
25 #include "catalog/catalog.h"
26 #include "catalog/pg_database.h"
27 #include "catalog/pg_shadow.h"
28 #include "catalog/pg_tablespace.h"
29 #include "catalog/indexing.h"
30 #include "commands/comment.h"
31 #include "commands/dbcommands.h"
32 #include "commands/tablespace.h"
33 #include "mb/pg_wchar.h"
34 #include "miscadmin.h"
35 #include "storage/fd.h"
36 #include "storage/freespace.h"
37 #include "storage/sinval.h"
38 #include "utils/acl.h"
39 #include "utils/array.h"
40 #include "utils/builtins.h"
41 #include "utils/fmgroids.h"
42 #include "utils/guc.h"
43 #include "utils/lsyscache.h"
44 #include "utils/syscache.h"
45
46
/* non-export function prototypes */

/*
 * get_db_info: look up a database by name.  Callers in this file treat a
 * true result as "the database exists" and pass NULL for any output
 * pointer whose value they do not need.
 */
static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
			int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
			TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
			Oid *dbTablespace);

/*
 * have_createdb_privilege: consulted by createdb() and RenameDatabase()
 * as part of their permission checks.
 * NOTE(review): presumably tests the current user's createdb flag --
 * confirm against the definition.
 */
static bool have_createdb_privilege(void);

/*
 * remove_dbtablespaces: used by dropdb() to remove all tablespace subdirs
 * belonging to the database, and by createdb() to clean up after a
 * failed or abandoned directory copy.
 */
static void remove_dbtablespaces(Oid db_id);
54
55
56 /*
57  * CREATE DATABASE
58  */
59 void
60 createdb(const CreatedbStmt *stmt)
61 {
62         HeapScanDesc scan;
63         Relation        rel;
64         Oid                     src_dboid;
65         AclId           src_owner;
66         int                     src_encoding;
67         bool            src_istemplate;
68         Oid                     src_lastsysoid;
69         TransactionId src_vacuumxid;
70         TransactionId src_frozenxid;
71         Oid                     src_deftablespace;
72         Oid                     dst_deftablespace;
73         Relation        pg_database_rel;
74         HeapTuple       tuple;
75         TupleDesc       pg_database_dsc;
76         Datum           new_record[Natts_pg_database];
77         char            new_record_nulls[Natts_pg_database];
78         Oid                     dboid;
79         AclId           datdba;
80         ListCell   *option;
81         DefElem    *dtablespacename = NULL;
82         DefElem    *downer = NULL;
83         DefElem    *dtemplate = NULL;
84         DefElem    *dencoding = NULL;
85         char       *dbname = stmt->dbname;
86         char       *dbowner = NULL;
87         char       *dbtemplate = NULL;
88         int                     encoding = -1;
89 #ifndef WIN32
90         char            buf[2 * MAXPGPATH + 100];
91 #endif
92
93         /* don't call this in a transaction block */
94         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
95
96         /* Extract options from the statement node tree */
97         foreach(option, stmt->options)
98         {
99                 DefElem    *defel = (DefElem *) lfirst(option);
100
101                 if (strcmp(defel->defname, "tablespace") == 0)
102                 {
103                         if (dtablespacename)
104                                 ereport(ERROR,
105                                                 (errcode(ERRCODE_SYNTAX_ERROR),
106                                                  errmsg("conflicting or redundant options")));
107                         dtablespacename = defel;
108                 }
109                 else if (strcmp(defel->defname, "owner") == 0)
110                 {
111                         if (downer)
112                                 ereport(ERROR,
113                                                 (errcode(ERRCODE_SYNTAX_ERROR),
114                                                  errmsg("conflicting or redundant options")));
115                         downer = defel;
116                 }
117                 else if (strcmp(defel->defname, "template") == 0)
118                 {
119                         if (dtemplate)
120                                 ereport(ERROR,
121                                                 (errcode(ERRCODE_SYNTAX_ERROR),
122                                                  errmsg("conflicting or redundant options")));
123                         dtemplate = defel;
124                 }
125                 else if (strcmp(defel->defname, "encoding") == 0)
126                 {
127                         if (dencoding)
128                                 ereport(ERROR,
129                                                 (errcode(ERRCODE_SYNTAX_ERROR),
130                                                  errmsg("conflicting or redundant options")));
131                         dencoding = defel;
132                 }
133                 else if (strcmp(defel->defname, "location") == 0)
134                 {
135                         ereport(WARNING,
136                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
137                                          errmsg("LOCATION is not supported anymore"),
138                                          errhint("Consider using tablespaces instead.")));
139                 }
140                 else
141                         elog(ERROR, "option \"%s\" not recognized",
142                                  defel->defname);
143         }
144
145         if (downer && downer->arg)
146                 dbowner = strVal(downer->arg);
147         if (dtemplate && dtemplate->arg)
148                 dbtemplate = strVal(dtemplate->arg);
149         if (dencoding && dencoding->arg)
150         {
151                 const char *encoding_name;
152
153                 if (IsA(dencoding->arg, Integer))
154                 {
155                         encoding = intVal(dencoding->arg);
156                         encoding_name = pg_encoding_to_char(encoding);
157                         if (strcmp(encoding_name, "") == 0 ||
158                                 pg_valid_server_encoding(encoding_name) < 0)
159                                 ereport(ERROR,
160                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
161                                                  errmsg("%d is not a valid encoding code",
162                                                                 encoding)));
163                 }
164                 else if (IsA(dencoding->arg, String))
165                 {
166                         encoding_name = strVal(dencoding->arg);
167                         if (pg_valid_server_encoding(encoding_name) < 0)
168                                 ereport(ERROR,
169                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
170                                                  errmsg("%s is not a valid encoding name",
171                                                                 encoding_name)));
172                         encoding = pg_char_to_encoding(encoding_name);
173                 }
174                 else
175                         elog(ERROR, "unrecognized node type: %d",
176                                  nodeTag(dencoding->arg));
177         }
178
179         /* obtain sysid of proposed owner */
180         if (dbowner)
181                 datdba = get_usesysid(dbowner); /* will ereport if no such user */
182         else
183                 datdba = GetUserId();
184
185         if (datdba == GetUserId())
186         {
187                 /* creating database for self: can be superuser or createdb */
188                 if (!superuser() && !have_createdb_privilege())
189                         ereport(ERROR,
190                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
191                                          errmsg("permission denied to create database")));
192         }
193         else
194         {
195                 /* creating database for someone else: must be superuser */
196                 /* note that the someone else need not have any permissions */
197                 if (!superuser())
198                         ereport(ERROR,
199                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
200                                          errmsg("must be superuser to create database for another user")));
201         }
202
203         /*
204          * Check for db name conflict.  There is a race condition here, since
205          * another backend could create the same DB name before we commit.
206          * However, holding an exclusive lock on pg_database for the whole
207          * time we are copying the source database doesn't seem like a good
208          * idea, so accept possibility of race to create.  We will check again
209          * after we grab the exclusive lock.
210          */
211         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
212                 ereport(ERROR,
213                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
214                                  errmsg("database \"%s\" already exists", dbname)));
215
216         /*
217          * Lookup database (template) to be cloned.
218          */
219         if (!dbtemplate)
220                 dbtemplate = "template1";               /* Default template database name */
221
222         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
223                                          &src_istemplate, &src_lastsysoid,
224                                          &src_vacuumxid, &src_frozenxid, &src_deftablespace))
225                 ereport(ERROR,
226                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
227                                  errmsg("template database \"%s\" does not exist", dbtemplate)));
228
229         /*
230          * Permission check: to copy a DB that's not marked datistemplate, you
231          * must be superuser or the owner thereof.
232          */
233         if (!src_istemplate)
234         {
235                 if (!superuser() && GetUserId() != src_owner)
236                         ereport(ERROR,
237                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
238                                          errmsg("permission denied to copy database \"%s\"",
239                                                         dbtemplate)));
240         }
241
242         /*
243          * The source DB can't have any active backends, except this one
244          * (exception is to allow CREATE DB while connected to template1).
245          * Otherwise we might copy inconsistent data.  This check is not
246          * bulletproof, since someone might connect while we are copying...
247          */
248         if (DatabaseHasActiveBackends(src_dboid, true))
249                 ereport(ERROR,
250                                 (errcode(ERRCODE_OBJECT_IN_USE),
251                 errmsg("source database \"%s\" is being accessed by other users",
252                            dbtemplate)));
253
254         /* If encoding is defaulted, use source's encoding */
255         if (encoding < 0)
256                 encoding = src_encoding;
257
258         /* Some encodings are client only */
259         if (!PG_VALID_BE_ENCODING(encoding))
260                 ereport(ERROR,
261                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
262                                  errmsg("invalid server encoding %d", encoding)));
263
264         /* Resolve default tablespace for new database */
265         if (dtablespacename && dtablespacename->arg)
266         {
267                 char       *tablespacename;
268         AclResult   aclresult;
269
270                 tablespacename = strVal(dtablespacename->arg);
271                 dst_deftablespace = get_tablespace_oid(tablespacename);
272                 if (!OidIsValid(dst_deftablespace))
273                         ereport(ERROR,
274                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
275                                          errmsg("tablespace \"%s\" does not exist",
276                                                         tablespacename)));
277                 /* check permissions */
278         aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
279                                                                                    ACL_CREATE);
280         if (aclresult != ACLCHECK_OK)
281             aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
282                            tablespacename);
283         }
284         else
285         {
286                 /* Use template database's default tablespace */
287                 dst_deftablespace = src_deftablespace;
288                 /* Note there is no additional permission check in this path */
289         }
290
291         /*
292          * Preassign OID for pg_database tuple, so that we can compute db
293          * path.
294          */
295         dboid = newoid();
296
297         /*
298          * Force dirty buffers out to disk, to ensure source database is
299          * up-to-date for the copy.  (We really only need to flush buffers for
300          * the source database...)
301          */
302         BufferSync(-1, -1);
303
304         /*
305          * Close virtual file descriptors so the kernel has more available for
306          * the system() calls below.
307          */
308         closeAllVfds();
309
310         /*
311          * Iterate through all tablespaces of the template database, and
312          * copy each one to the new database.
313          *
314          * If we are trying to change the default tablespace of the template,
315          * we require that the template not have any files in the new default
316          * tablespace.  This avoids the need to merge two subdirectories.
317          * This could probably be improved later.
318          */
319         rel = heap_openr(TableSpaceRelationName, AccessShareLock);
320         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
321         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
322         {
323                 Oid             srctablespace = HeapTupleGetOid(tuple);
324                 Oid             dsttablespace;
325                 char   *srcpath;
326                 char   *dstpath;
327                 struct stat st;
328
329                 /* No need to copy global tablespace */
330                 if (srctablespace == GLOBALTABLESPACE_OID)
331                         continue;
332
333                 srcpath = GetDatabasePath(src_dboid, srctablespace);
334
335                 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode))
336                 {
337                         /* Assume we can ignore it */
338                         pfree(srcpath);
339                         continue;
340                 }
341
342                 if (srctablespace == src_deftablespace)
343                         dsttablespace = dst_deftablespace;
344                 else
345                         dsttablespace = srctablespace;
346
347                 dstpath = GetDatabasePath(dboid, dsttablespace);
348
349                 if (stat(dstpath, &st) == 0 || errno != ENOENT)
350                 {
351                         remove_dbtablespaces(dboid);
352                         ereport(ERROR,
353                                         (errmsg("could not initialize database directory"),
354                                          errdetail("Directory \"%s\" already exists.", dstpath)));
355                 }
356
357 #ifndef WIN32
358                 /*
359                  * Copy this subdirectory to the new location
360                  *
361                  * XXX use of cp really makes this code pretty grotty, particularly
362                  * with respect to lack of ability to report errors well.  Someday
363                  * rewrite to do it for ourselves.
364                  */
365
366                 /* We might need to use cp -R one day for portability */
367                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
368                                  srcpath, dstpath);
369                 if (system(buf) != 0)
370                 {
371                         remove_dbtablespaces(dboid);
372                         ereport(ERROR,
373                                         (errmsg("could not initialize database directory"),
374                                          errdetail("Failing system command was: %s", buf),
375                                          errhint("Look in the postmaster's stderr log for more information.")));
376                 }
377 #else   /* WIN32 */
378                 if (copydir(srcpath, dstpath) != 0)
379                 {
380                         /* copydir should already have given details of its troubles */
381                         remove_dbtablespaces(dboid);
382                         ereport(ERROR,
383                                         (errmsg("could not initialize database directory")));
384                 }
385 #endif  /* WIN32 */
386         }
387         heap_endscan(scan);
388         heap_close(rel, AccessShareLock);
389
390         /*
391          * Now OK to grab exclusive lock on pg_database.
392          */
393         pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
394
395         /* Check to see if someone else created same DB name meanwhile. */
396         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
397         {
398                 /* Don't hold lock while doing recursive remove */
399                 heap_close(pg_database_rel, AccessExclusiveLock);
400                 remove_dbtablespaces(dboid);
401                 ereport(ERROR,
402                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
403                                  errmsg("database \"%s\" already exists", dbname)));
404         }
405
406         /*
407          * Insert a new tuple into pg_database
408          */
409         pg_database_dsc = RelationGetDescr(pg_database_rel);
410
411         /* Form tuple */
412         MemSet(new_record, 0, sizeof(new_record));
413         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
414
415         new_record[Anum_pg_database_datname - 1] =
416                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
417         new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
418         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
419         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
420         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
421         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
422         new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
423         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
424         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
425
426         /*
427          * We deliberately set datconfig and datacl to defaults (NULL), rather
428          * than copying them from the template database.  Copying datacl would
429          * be a bad idea when the owner is not the same as the template's
430          * owner. It's more debatable whether datconfig should be copied.
431          */
432         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
433         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
434
435         tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
436
437         HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
438                                                                                  * selection */
439
440         simple_heap_insert(pg_database_rel, tuple);
441
442         /* Update indexes */
443         CatalogUpdateIndexes(pg_database_rel, tuple);
444
445         /* Close pg_database, but keep lock till commit */
446         heap_close(pg_database_rel, NoLock);
447
448         /*
449          * Force dirty buffers out to disk, so that newly-connecting backends
450          * will see the new database in pg_database right away.  (They'll see
451          * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
452          */
453         BufferSync(-1, -1);
454 }
455
456
457 /*
458  * DROP DATABASE
459  */
460 void
461 dropdb(const char *dbname)
462 {
463         int4            db_owner;
464         bool            db_istemplate;
465         Oid                     db_id;
466         Relation        pgdbrel;
467         SysScanDesc pgdbscan;
468         ScanKeyData key;
469         HeapTuple       tup;
470
471         PreventTransactionChain((void *) dbname, "DROP DATABASE");
472
473         AssertArg(dbname);
474
475         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
476                 ereport(ERROR,
477                                 (errcode(ERRCODE_OBJECT_IN_USE),
478                                  errmsg("cannot drop the currently open database")));
479
480         /*
481          * Obtain exclusive lock on pg_database.  We need this to ensure that
482          * no new backend starts up in the target database while we are
483          * deleting it.  (Actually, a new backend might still manage to start
484          * up, because it will read pg_database without any locking to
485          * discover the database's OID.  But it will detect its error in
486          * ReverifyMyDatabase and shut down before any serious damage is done.
487          * See postinit.c.)
488          */
489         pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
490
491         if (!get_db_info(dbname, &db_id, &db_owner, NULL,
492                                          &db_istemplate, NULL, NULL, NULL, NULL))
493                 ereport(ERROR,
494                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
495                                  errmsg("database \"%s\" does not exist", dbname)));
496
497         if (GetUserId() != db_owner && !superuser())
498                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
499                                            dbname);
500
501         /*
502          * Disallow dropping a DB that is marked istemplate.  This is just to
503          * prevent people from accidentally dropping template0 or template1;
504          * they can do so if they're really determined ...
505          */
506         if (db_istemplate)
507                 ereport(ERROR,
508                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
509                                  errmsg("cannot drop a template database")));
510
511         /*
512          * Check for active backends in the target database.
513          */
514         if (DatabaseHasActiveBackends(db_id, false))
515                 ereport(ERROR,
516                                 (errcode(ERRCODE_OBJECT_IN_USE),
517                            errmsg("database \"%s\" is being accessed by other users",
518                                           dbname)));
519
520         /*
521          * Find the database's tuple by OID (should be unique).
522          */
523         ScanKeyInit(&key,
524                                 ObjectIdAttributeNumber,
525                                 BTEqualStrategyNumber, F_OIDEQ,
526                                 ObjectIdGetDatum(db_id));
527
528         pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true,
529                                                                   SnapshotNow, 1, &key);
530
531         tup = systable_getnext(pgdbscan);
532         if (!HeapTupleIsValid(tup))
533         {
534                 /*
535                  * This error should never come up since the existence of the
536                  * database is checked earlier
537                  */
538                 elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
539                          dbname);
540         }
541
542         /* Remove the database's tuple from pg_database */
543         simple_heap_delete(pgdbrel, &tup->t_self);
544
545         systable_endscan(pgdbscan);
546
547         /*
548          * Delete any comments associated with the database
549          *
550          * NOTE: this is probably dead code since any such comments should have
551          * been in that database, not mine.
552          */
553         DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
554
555         /*
556          * Close pg_database, but keep exclusive lock till commit to ensure
557          * that any new backend scanning pg_database will see the tuple dead.
558          */
559         heap_close(pgdbrel, NoLock);
560
561         /*
562          * Drop pages for this database that are in the shared buffer cache.
563          * This is important to ensure that no remaining backend tries to
564          * write out a dirty buffer to the dead database later...
565          */
566         DropBuffers(db_id);
567
568         /*
569          * Also, clean out any entries in the shared free space map.
570          */
571         FreeSpaceMapForgetDatabase(db_id);
572
573         /*
574          * Remove all tablespace subdirs belonging to the database.
575          */
576         remove_dbtablespaces(db_id);
577
578         /*
579          * Force dirty buffers out to disk, so that newly-connecting backends
580          * will see the database tuple marked dead in pg_database right away.
581          * (They'll see an uncommitted deletion, but they don't care; see
582          * GetRawDatabaseInfo.)
583          */
584         BufferSync(-1, -1);
585 }
586
587
588 /*
589  * Rename database
590  */
591 void
592 RenameDatabase(const char *oldname, const char *newname)
593 {
594         HeapTuple       tup,
595                                 newtup;
596         Relation        rel;
597         SysScanDesc scan,
598                                 scan2;
599         ScanKeyData key,
600                                 key2;
601
602         /*
603          * Obtain AccessExclusiveLock so that no new session gets started
604          * while the rename is in progress.
605          */
606         rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
607
608         ScanKeyInit(&key,
609                                 Anum_pg_database_datname,
610                                 BTEqualStrategyNumber, F_NAMEEQ,
611                                 NameGetDatum(oldname));
612         scan = systable_beginscan(rel, DatabaseNameIndex, true,
613                                                           SnapshotNow, 1, &key);
614
615         tup = systable_getnext(scan);
616         if (!HeapTupleIsValid(tup))
617                 ereport(ERROR,
618                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
619                                  errmsg("database \"%s\" does not exist", oldname)));
620
621         /*
622          * XXX Client applications probably store the current database
623          * somewhere, so renaming it could cause confusion.  On the other
624          * hand, there may not be an actual problem besides a little
625          * confusion, so think about this and decide.
626          */
627         if (HeapTupleGetOid(tup) == MyDatabaseId)
628                 ereport(ERROR,
629                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
630                                  errmsg("current database may not be renamed")));
631
632         /*
633          * Make sure the database does not have active sessions.  Might not be
634          * necessary, but it's consistent with other database operations.
635          */
636         if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
637                 ereport(ERROR,
638                                 (errcode(ERRCODE_OBJECT_IN_USE),
639                            errmsg("database \"%s\" is being accessed by other users",
640                                           oldname)));
641
642         /* make sure the new name doesn't exist */
643         ScanKeyInit(&key2,
644                                 Anum_pg_database_datname,
645                                 BTEqualStrategyNumber, F_NAMEEQ,
646                                 NameGetDatum(newname));
647         scan2 = systable_beginscan(rel, DatabaseNameIndex, true,
648                                                            SnapshotNow, 1, &key2);
649         if (HeapTupleIsValid(systable_getnext(scan2)))
650                 ereport(ERROR,
651                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
652                                  errmsg("database \"%s\" already exists", newname)));
653         systable_endscan(scan2);
654
655         /* must be owner */
656         if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
657                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
658                                            oldname);
659
660         /* must have createdb */
661         if (!have_createdb_privilege())
662                 ereport(ERROR,
663                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
664                                  errmsg("permission denied to rename database")));
665
666         /* rename */
667         newtup = heap_copytuple(tup);
668         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
669         simple_heap_update(rel, &newtup->t_self, newtup);
670         CatalogUpdateIndexes(rel, newtup);
671
672         systable_endscan(scan);
673         heap_close(rel, NoLock);
674
675         /*
676          * Force dirty buffers out to disk, so that newly-connecting backends
677          * will see the renamed database in pg_database right away.  (They'll
678          * see an uncommitted tuple, but they don't care; see
679          * GetRawDatabaseInfo.)
680          */
681         BufferSync(-1, -1);
682 }
683
684
685 /*
686  * ALTER DATABASE name SET ...
687  */
688 void
689 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
690 {
691         char       *valuestr;
692         HeapTuple       tuple,
693                                 newtuple;
694         Relation        rel;
695         ScanKeyData scankey;
696         SysScanDesc scan;
697         Datum           repl_val[Natts_pg_database];
698         char            repl_null[Natts_pg_database];
699         char            repl_repl[Natts_pg_database];
700
701         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
702
703         rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
704         ScanKeyInit(&scankey,
705                                 Anum_pg_database_datname,
706                                 BTEqualStrategyNumber, F_NAMEEQ,
707                                 NameGetDatum(stmt->dbname));
708         scan = systable_beginscan(rel, DatabaseNameIndex, true,
709                                                           SnapshotNow, 1, &scankey);
710         tuple = systable_getnext(scan);
711         if (!HeapTupleIsValid(tuple))
712                 ereport(ERROR,
713                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
714                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
715
716         if (!(superuser()
717                 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
718                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
719                                            stmt->dbname);
720
721         MemSet(repl_repl, ' ', sizeof(repl_repl));
722         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
723
724         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
725         {
726                 /* RESET ALL */
727                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
728                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
729         }
730         else
731         {
732                 Datum           datum;
733                 bool            isnull;
734                 ArrayType  *a;
735
736                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
737
738                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
739                                                          RelationGetDescr(rel), &isnull);
740
741                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
742
743                 if (valuestr)
744                         a = GUCArrayAdd(a, stmt->variable, valuestr);
745                 else
746                         a = GUCArrayDelete(a, stmt->variable);
747
748                 if (a)
749                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
750                 else
751                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
752         }
753
754         newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
755         simple_heap_update(rel, &tuple->t_self, newtuple);
756
757         /* Update indexes */
758         CatalogUpdateIndexes(rel, newtuple);
759
760         systable_endscan(scan);
761         heap_close(rel, NoLock);
762 }
763
764
765 /*
766  * ALTER DATABASE name OWNER TO newowner
767  */
768 void
769 AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
770 {
771         HeapTuple       tuple;
772         Relation        rel;
773         ScanKeyData scankey;
774         SysScanDesc scan;
775         Form_pg_database        datForm;
776
777         rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
778         ScanKeyInit(&scankey,
779                                 Anum_pg_database_datname,
780                                 BTEqualStrategyNumber, F_NAMEEQ,
781                                 NameGetDatum(dbname));
782         scan = systable_beginscan(rel, DatabaseNameIndex, true,
783                                                           SnapshotNow, 1, &scankey);
784         tuple = systable_getnext(scan);
785         if (!HeapTupleIsValid(tuple))
786                 ereport(ERROR,
787                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
788                                  errmsg("database \"%s\" does not exist", dbname)));
789
790         datForm = (Form_pg_database) GETSTRUCT(tuple);
791
792         /* 
793          * If the new owner is the same as the existing owner, consider the
794          * command to have succeeded.  This is to be consistent with other objects.
795          */
796         if (datForm->datdba != newOwnerSysId)
797         {
798                 Datum           repl_val[Natts_pg_database];
799                 char            repl_null[Natts_pg_database];
800                 char            repl_repl[Natts_pg_database];
801                 Acl             *newAcl;
802                 Datum           aclDatum;
803                 bool            isNull;
804                 HeapTuple       newtuple;
805
806                 /* changing owner's database for someone else: must be superuser */
807                 /* note that the someone else need not have any permissions */
808                 if (!superuser())
809                         ereport(ERROR,
810                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
811                                          errmsg("must be superuser to change owner")));
812
813                 memset(repl_null, ' ', sizeof(repl_null));
814                 memset(repl_repl, ' ', sizeof(repl_repl));
815
816                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
817                 repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
818
819                 /*
820                  * Determine the modified ACL for the new owner.  This is only
821                  * necessary when the ACL is non-null.
822                  */
823                 aclDatum = heap_getattr(tuple,
824                                                         Anum_pg_database_datacl,
825                                                         RelationGetDescr(rel),
826                                                         &isNull);
827                 if (!isNull)
828                 {
829                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
830                                                                  datForm->datdba, newOwnerSysId);
831                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
832                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
833                 }
834
835                 newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
836                 simple_heap_update(rel, &newtuple->t_self, newtuple);
837                 CatalogUpdateIndexes(rel, newtuple);
838
839                 heap_freetuple(newtuple);
840         }
841
842         systable_endscan(scan);
843         heap_close(rel, NoLock);
844 }
845
846
847 /*
848  * Helper functions
849  */
850
851 static bool
852 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
853                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
854                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
855                         Oid *dbTablespace)
856 {
857         Relation        relation;
858         ScanKeyData scanKey;
859         SysScanDesc scan;
860         HeapTuple       tuple;
861         bool            gottuple;
862
863         AssertArg(name);
864
865         /* Caller may wish to grab a better lock on pg_database beforehand... */
866         relation = heap_openr(DatabaseRelationName, AccessShareLock);
867
868         ScanKeyInit(&scanKey,
869                                 Anum_pg_database_datname,
870                                 BTEqualStrategyNumber, F_NAMEEQ,
871                                 NameGetDatum(name));
872
873         scan = systable_beginscan(relation, DatabaseNameIndex, true,
874                                                           SnapshotNow, 1, &scanKey);
875
876         tuple = systable_getnext(scan);
877
878         gottuple = HeapTupleIsValid(tuple);
879         if (gottuple)
880         {
881                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
882
883                 /* oid of the database */
884                 if (dbIdP)
885                         *dbIdP = HeapTupleGetOid(tuple);
886                 /* sysid of the owner */
887                 if (ownerIdP)
888                         *ownerIdP = dbform->datdba;
889                 /* character encoding */
890                 if (encodingP)
891                         *encodingP = dbform->encoding;
892                 /* allowed as template? */
893                 if (dbIsTemplateP)
894                         *dbIsTemplateP = dbform->datistemplate;
895                 /* last system OID used in database */
896                 if (dbLastSysOidP)
897                         *dbLastSysOidP = dbform->datlastsysoid;
898                 /* limit of vacuumed XIDs */
899                 if (dbVacuumXidP)
900                         *dbVacuumXidP = dbform->datvacuumxid;
901                 /* limit of frozen XIDs */
902                 if (dbFrozenXidP)
903                         *dbFrozenXidP = dbform->datfrozenxid;
904                 /* default tablespace for this database */
905                 if (dbTablespace)
906                         *dbTablespace = dbform->dattablespace;
907         }
908
909         systable_endscan(scan);
910         heap_close(relation, AccessShareLock);
911
912         return gottuple;
913 }
914
915 static bool
916 have_createdb_privilege(void)
917 {
918         HeapTuple       utup;
919         bool            retval;
920
921         utup = SearchSysCache(SHADOWSYSID,
922                                                   Int32GetDatum(GetUserId()),
923                                                   0, 0, 0);
924
925         if (!HeapTupleIsValid(utup))
926                 retval = false;
927         else
928                 retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
929
930         ReleaseSysCache(utup);
931
932         return retval;
933 }
934
935 /*
936  * Remove tablespace directories
937  *
938  * We don't know what tablespaces db_id is using, so iterate through all
939  * tablespaces removing <tablespace>/db_id
940  */
941 static void
942 remove_dbtablespaces(Oid db_id)
943 {
944         Relation rel;
945         HeapScanDesc scan;
946         HeapTuple tuple;
947
948         rel = heap_openr(TableSpaceRelationName, AccessShareLock);
949         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
950         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
951         {
952                 Oid             dsttablespace = HeapTupleGetOid(tuple);
953                 char   *dstpath;
954                 struct stat st;
955
956                 /* Don't mess with the global tablespace */
957                 if (dsttablespace == GLOBALTABLESPACE_OID)
958                         continue;
959
960                 dstpath = GetDatabasePath(db_id, dsttablespace);
961
962                 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
963                 {
964                         /* Assume we can ignore it */
965                         pfree(dstpath);
966                         continue;
967                 }
968
969                 if (!rmtree(dstpath, true))
970                 {
971                         ereport(WARNING,
972                                 (errmsg("could not remove database directory \"%s\"",
973                                                 dstpath),
974                                  errhint("Look in the postmaster's stderr log for more information.")));
975                 }
976
977                 pfree(dstpath);
978         }
979
980         heap_endscan(scan);
981         heap_close(rel, AccessShareLock);
982 }
983
984
985 /*
986  * get_database_oid - given a database name, look up the OID
987  *
988  * Returns InvalidOid if database name not found.
989  *
990  * This is not actually used in this file, but is exported for use elsewhere.
991  */
992 Oid
993 get_database_oid(const char *dbname)
994 {
995         Relation        pg_database;
996         ScanKeyData entry[1];
997         SysScanDesc scan;
998         HeapTuple       dbtuple;
999         Oid                     oid;
1000
1001         /* There's no syscache for pg_database, so must look the hard way */
1002         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1003         ScanKeyInit(&entry[0],
1004                                 Anum_pg_database_datname,
1005                                 BTEqualStrategyNumber, F_NAMEEQ,
1006                                 CStringGetDatum(dbname));
1007         scan = systable_beginscan(pg_database, DatabaseNameIndex, true,
1008                                                           SnapshotNow, 1, entry);
1009
1010         dbtuple = systable_getnext(scan);
1011
1012         /* We assume that there can be at most one matching tuple */
1013         if (HeapTupleIsValid(dbtuple))
1014                 oid = HeapTupleGetOid(dbtuple);
1015         else
1016                 oid = InvalidOid;
1017
1018         systable_endscan(scan);
1019         heap_close(pg_database, AccessShareLock);
1020
1021         return oid;
1022 }
1023
1024
1025 /*
1026  * get_database_name - given a database OID, look up the name
1027  *
1028  * Returns a palloc'd string, or NULL if no such database.
1029  *
1030  * This is not actually used in this file, but is exported for use elsewhere.
1031  */
1032 char *
1033 get_database_name(Oid dbid)
1034 {
1035         Relation        pg_database;
1036         ScanKeyData entry[1];
1037         SysScanDesc scan;
1038         HeapTuple       dbtuple;
1039         char       *result;
1040
1041         /* There's no syscache for pg_database, so must look the hard way */
1042         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1043         ScanKeyInit(&entry[0],
1044                                 ObjectIdAttributeNumber,
1045                                 BTEqualStrategyNumber, F_OIDEQ,
1046                                 ObjectIdGetDatum(dbid));
1047         scan = systable_beginscan(pg_database, DatabaseOidIndex, true,
1048                                                           SnapshotNow, 1, entry);
1049
1050         dbtuple = systable_getnext(scan);
1051
1052         /* We assume that there can be at most one matching tuple */
1053         if (HeapTupleIsValid(dbtuple))
1054                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1055         else
1056                 result = NULL;
1057
1058         systable_endscan(scan);
1059         heap_close(pg_database, AccessShareLock);
1060
1061         return result;
1062 }