]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Dept. of second thoughts: it'd be a good idea to flush buffers
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  *
7  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.144 2004/08/30 03:50:24 tgl Exp $
13  *
14  *-------------------------------------------------------------------------
15  */
16 #include "postgres.h"
17
18 #include <fcntl.h>
19 #include <unistd.h>
20 #include <sys/stat.h>
21
22 #include "access/genam.h"
23 #include "access/heapam.h"
24 #include "catalog/catname.h"
25 #include "catalog/catalog.h"
26 #include "catalog/pg_database.h"
27 #include "catalog/pg_shadow.h"
28 #include "catalog/pg_tablespace.h"
29 #include "catalog/indexing.h"
30 #include "commands/comment.h"
31 #include "commands/dbcommands.h"
32 #include "commands/tablespace.h"
33 #include "mb/pg_wchar.h"
34 #include "miscadmin.h"
35 #include "storage/fd.h"
36 #include "storage/freespace.h"
37 #include "storage/sinval.h"
38 #include "utils/acl.h"
39 #include "utils/array.h"
40 #include "utils/builtins.h"
41 #include "utils/fmgroids.h"
42 #include "utils/guc.h"
43 #include "utils/lsyscache.h"
44 #include "utils/syscache.h"
45
46
47 /* non-export function prototypes */
48 static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
49                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
50                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
51                         Oid *dbTablespace);
52 static bool have_createdb_privilege(void);
53 static void remove_dbtablespaces(Oid db_id);
54
55
56 /*
57  * CREATE DATABASE
58  */
59 void
60 createdb(const CreatedbStmt *stmt)
61 {
62         HeapScanDesc scan;
63         Relation        rel;
64         Oid                     src_dboid;
65         AclId           src_owner;
66         int                     src_encoding;
67         bool            src_istemplate;
68         Oid                     src_lastsysoid;
69         TransactionId src_vacuumxid;
70         TransactionId src_frozenxid;
71         Oid                     src_deftablespace;
72         Oid                     dst_deftablespace;
73         Relation        pg_database_rel;
74         HeapTuple       tuple;
75         TupleDesc       pg_database_dsc;
76         Datum           new_record[Natts_pg_database];
77         char            new_record_nulls[Natts_pg_database];
78         Oid                     dboid;
79         AclId           datdba;
80         ListCell   *option;
81         DefElem    *dtablespacename = NULL;
82         DefElem    *downer = NULL;
83         DefElem    *dtemplate = NULL;
84         DefElem    *dencoding = NULL;
85         char       *dbname = stmt->dbname;
86         char       *dbowner = NULL;
87         char       *dbtemplate = NULL;
88         int                     encoding = -1;
89
90 #ifndef WIN32
91         char            buf[2 * MAXPGPATH + 100];
92 #endif
93
94         /* don't call this in a transaction block */
95         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
96
97         /* Extract options from the statement node tree */
98         foreach(option, stmt->options)
99         {
100                 DefElem    *defel = (DefElem *) lfirst(option);
101
102                 if (strcmp(defel->defname, "tablespace") == 0)
103                 {
104                         if (dtablespacename)
105                                 ereport(ERROR,
106                                                 (errcode(ERRCODE_SYNTAX_ERROR),
107                                                  errmsg("conflicting or redundant options")));
108                         dtablespacename = defel;
109                 }
110                 else if (strcmp(defel->defname, "owner") == 0)
111                 {
112                         if (downer)
113                                 ereport(ERROR,
114                                                 (errcode(ERRCODE_SYNTAX_ERROR),
115                                                  errmsg("conflicting or redundant options")));
116                         downer = defel;
117                 }
118                 else if (strcmp(defel->defname, "template") == 0)
119                 {
120                         if (dtemplate)
121                                 ereport(ERROR,
122                                                 (errcode(ERRCODE_SYNTAX_ERROR),
123                                                  errmsg("conflicting or redundant options")));
124                         dtemplate = defel;
125                 }
126                 else if (strcmp(defel->defname, "encoding") == 0)
127                 {
128                         if (dencoding)
129                                 ereport(ERROR,
130                                                 (errcode(ERRCODE_SYNTAX_ERROR),
131                                                  errmsg("conflicting or redundant options")));
132                         dencoding = defel;
133                 }
134                 else if (strcmp(defel->defname, "location") == 0)
135                 {
136                         ereport(WARNING,
137                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
138                                          errmsg("LOCATION is not supported anymore"),
139                                          errhint("Consider using tablespaces instead.")));
140                 }
141                 else
142                         elog(ERROR, "option \"%s\" not recognized",
143                                  defel->defname);
144         }
145
146         if (downer && downer->arg)
147                 dbowner = strVal(downer->arg);
148         if (dtemplate && dtemplate->arg)
149                 dbtemplate = strVal(dtemplate->arg);
150         if (dencoding && dencoding->arg)
151         {
152                 const char *encoding_name;
153
154                 if (IsA(dencoding->arg, Integer))
155                 {
156                         encoding = intVal(dencoding->arg);
157                         encoding_name = pg_encoding_to_char(encoding);
158                         if (strcmp(encoding_name, "") == 0 ||
159                                 pg_valid_server_encoding(encoding_name) < 0)
160                                 ereport(ERROR,
161                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
162                                                  errmsg("%d is not a valid encoding code",
163                                                                 encoding)));
164                 }
165                 else if (IsA(dencoding->arg, String))
166                 {
167                         encoding_name = strVal(dencoding->arg);
168                         if (pg_valid_server_encoding(encoding_name) < 0)
169                                 ereport(ERROR,
170                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
171                                                  errmsg("%s is not a valid encoding name",
172                                                                 encoding_name)));
173                         encoding = pg_char_to_encoding(encoding_name);
174                 }
175                 else
176                         elog(ERROR, "unrecognized node type: %d",
177                                  nodeTag(dencoding->arg));
178         }
179
180         /* obtain sysid of proposed owner */
181         if (dbowner)
182                 datdba = get_usesysid(dbowner); /* will ereport if no such user */
183         else
184                 datdba = GetUserId();
185
186         if (datdba == GetUserId())
187         {
188                 /* creating database for self: can be superuser or createdb */
189                 if (!superuser() && !have_createdb_privilege())
190                         ereport(ERROR,
191                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
192                                          errmsg("permission denied to create database")));
193         }
194         else
195         {
196                 /* creating database for someone else: must be superuser */
197                 /* note that the someone else need not have any permissions */
198                 if (!superuser())
199                         ereport(ERROR,
200                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
201                                          errmsg("must be superuser to create database for another user")));
202         }
203
204         /*
205          * Check for db name conflict.  There is a race condition here, since
206          * another backend could create the same DB name before we commit.
207          * However, holding an exclusive lock on pg_database for the whole
208          * time we are copying the source database doesn't seem like a good
209          * idea, so accept possibility of race to create.  We will check again
210          * after we grab the exclusive lock.
211          */
212         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
213                 ereport(ERROR,
214                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
215                                  errmsg("database \"%s\" already exists", dbname)));
216
217         /*
218          * Lookup database (template) to be cloned.
219          */
220         if (!dbtemplate)
221                 dbtemplate = "template1";               /* Default template database name */
222
223         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
224                                          &src_istemplate, &src_lastsysoid,
225                                          &src_vacuumxid, &src_frozenxid, &src_deftablespace))
226                 ereport(ERROR,
227                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
228                  errmsg("template database \"%s\" does not exist", dbtemplate)));
229
230         /*
231          * Permission check: to copy a DB that's not marked datistemplate, you
232          * must be superuser or the owner thereof.
233          */
234         if (!src_istemplate)
235         {
236                 if (!superuser() && GetUserId() != src_owner)
237                         ereport(ERROR,
238                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
239                                          errmsg("permission denied to copy database \"%s\"",
240                                                         dbtemplate)));
241         }
242
243         /*
244          * The source DB can't have any active backends, except this one
245          * (exception is to allow CREATE DB while connected to template1).
246          * Otherwise we might copy inconsistent data.  This check is not
247          * bulletproof, since someone might connect while we are copying...
248          */
249         if (DatabaseHasActiveBackends(src_dboid, true))
250                 ereport(ERROR,
251                                 (errcode(ERRCODE_OBJECT_IN_USE),
252                 errmsg("source database \"%s\" is being accessed by other users",
253                            dbtemplate)));
254
255         /* If encoding is defaulted, use source's encoding */
256         if (encoding < 0)
257                 encoding = src_encoding;
258
259         /* Some encodings are client only */
260         if (!PG_VALID_BE_ENCODING(encoding))
261                 ereport(ERROR,
262                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
263                                  errmsg("invalid server encoding %d", encoding)));
264
265         /* Resolve default tablespace for new database */
266         if (dtablespacename && dtablespacename->arg)
267         {
268                 char       *tablespacename;
269                 AclResult       aclresult;
270
271                 tablespacename = strVal(dtablespacename->arg);
272                 dst_deftablespace = get_tablespace_oid(tablespacename);
273                 if (!OidIsValid(dst_deftablespace))
274                         ereport(ERROR,
275                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
276                                          errmsg("tablespace \"%s\" does not exist",
277                                                         tablespacename)));
278                 /* check permissions */
279                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
280                                                                                    ACL_CREATE);
281                 if (aclresult != ACLCHECK_OK)
282                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
283                                                    tablespacename);
284         }
285         else
286         {
287                 /* Use template database's default tablespace */
288                 dst_deftablespace = src_deftablespace;
289                 /* Note there is no additional permission check in this path */
290         }
291
292         /*
293          * Preassign OID for pg_database tuple, so that we can compute db
294          * path.
295          */
296         dboid = newoid();
297
298         /*
299          * Force dirty buffers out to disk, to ensure source database is
300          * up-to-date for the copy.  (We really only need to flush buffers for
301          * the source database...)
302          */
303         BufferSync(-1, -1);
304
305         /*
306          * Close virtual file descriptors so the kernel has more available for
307          * the system() calls below.
308          */
309         closeAllVfds();
310
311         /*
312          * Iterate through all tablespaces of the template database, and copy
313          * each one to the new database.
314          *
315          * If we are trying to change the default tablespace of the template, we
316          * require that the template not have any files in the new default
317          * tablespace.  This avoids the need to merge two subdirectories. This
318          * could probably be improved later.
319          */
320         rel = heap_openr(TableSpaceRelationName, AccessShareLock);
321         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
322         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
323         {
324                 Oid                     srctablespace = HeapTupleGetOid(tuple);
325                 Oid                     dsttablespace;
326                 char       *srcpath;
327                 char       *dstpath;
328                 struct stat st;
329
330                 /* No need to copy global tablespace */
331                 if (srctablespace == GLOBALTABLESPACE_OID)
332                         continue;
333
334                 srcpath = GetDatabasePath(src_dboid, srctablespace);
335
336                 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode))
337                 {
338                         /* Assume we can ignore it */
339                         pfree(srcpath);
340                         continue;
341                 }
342
343                 if (srctablespace == src_deftablespace)
344                         dsttablespace = dst_deftablespace;
345                 else
346                         dsttablespace = srctablespace;
347
348                 dstpath = GetDatabasePath(dboid, dsttablespace);
349
350                 if (stat(dstpath, &st) == 0 || errno != ENOENT)
351                 {
352                         remove_dbtablespaces(dboid);
353                         ereport(ERROR,
354                                         (errmsg("could not initialize database directory"),
355                                 errdetail("Directory \"%s\" already exists.", dstpath)));
356                 }
357
358 #ifndef WIN32
359
360                 /*
361                  * Copy this subdirectory to the new location
362                  *
363                  * XXX use of cp really makes this code pretty grotty, particularly
364                  * with respect to lack of ability to report errors well.  Someday
365                  * rewrite to do it for ourselves.
366                  */
367
368                 /* We might need to use cp -R one day for portability */
369                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
370                                  srcpath, dstpath);
371                 if (system(buf) != 0)
372                 {
373                         remove_dbtablespaces(dboid);
374                         ereport(ERROR,
375                                         (errmsg("could not initialize database directory"),
376                                          errdetail("Failing system command was: %s", buf),
377                                          errhint("Look in the postmaster's stderr log for more information.")));
378                 }
379 #else                                                   /* WIN32 */
380                 if (copydir(srcpath, dstpath) != 0)
381                 {
382                         /* copydir should already have given details of its troubles */
383                         remove_dbtablespaces(dboid);
384                         ereport(ERROR,
385                                         (errmsg("could not initialize database directory")));
386                 }
387 #endif   /* WIN32 */
388
389                 /* Record the filesystem change in XLOG */
390                 {
391                         xl_dbase_create_rec xlrec;
392                         XLogRecData rdata[3];
393
394                         xlrec.db_id = dboid;
395                         rdata[0].buffer = InvalidBuffer;
396                         rdata[0].data = (char *) &xlrec;
397                         rdata[0].len = offsetof(xl_dbase_create_rec, src_path);
398                         rdata[0].next = &(rdata[1]);
399
400                         rdata[1].buffer = InvalidBuffer;
401                         rdata[1].data = (char *) srcpath;
402                         rdata[1].len = strlen(srcpath) + 1;
403                         rdata[1].next = &(rdata[2]);
404
405                         rdata[2].buffer = InvalidBuffer;
406                         rdata[2].data = (char *) dstpath;
407                         rdata[2].len = strlen(dstpath) + 1;
408                         rdata[2].next = NULL;
409
410                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
411                 }
412         }
413         heap_endscan(scan);
414         heap_close(rel, AccessShareLock);
415
416         /*
417          * Now OK to grab exclusive lock on pg_database.
418          */
419         pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
420
421         /* Check to see if someone else created same DB name meanwhile. */
422         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
423         {
424                 /* Don't hold lock while doing recursive remove */
425                 heap_close(pg_database_rel, AccessExclusiveLock);
426                 remove_dbtablespaces(dboid);
427                 ereport(ERROR,
428                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
429                                  errmsg("database \"%s\" already exists", dbname)));
430         }
431
432         /*
433          * Insert a new tuple into pg_database
434          */
435         pg_database_dsc = RelationGetDescr(pg_database_rel);
436
437         /* Form tuple */
438         MemSet(new_record, 0, sizeof(new_record));
439         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
440
441         new_record[Anum_pg_database_datname - 1] =
442                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
443         new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
444         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
445         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
446         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
447         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
448         new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
449         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
450         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
451
452         /*
453          * We deliberately set datconfig and datacl to defaults (NULL), rather
454          * than copying them from the template database.  Copying datacl would
455          * be a bad idea when the owner is not the same as the template's
456          * owner. It's more debatable whether datconfig should be copied.
457          */
458         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
459         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
460
461         tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
462
463         HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
464                                                                                  * selection */
465
466         simple_heap_insert(pg_database_rel, tuple);
467
468         /* Update indexes */
469         CatalogUpdateIndexes(pg_database_rel, tuple);
470
471         /* Close pg_database, but keep lock till commit */
472         heap_close(pg_database_rel, NoLock);
473
474         /*
475          * Force dirty buffers out to disk, so that newly-connecting backends
476          * will see the new database in pg_database right away.  (They'll see
477          * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
478          */
479         BufferSync(-1, -1);
480 }
481
482
483 /*
484  * DROP DATABASE
485  */
486 void
487 dropdb(const char *dbname)
488 {
489         int4            db_owner;
490         bool            db_istemplate;
491         Oid                     db_id;
492         Relation        pgdbrel;
493         SysScanDesc pgdbscan;
494         ScanKeyData key;
495         HeapTuple       tup;
496
497         PreventTransactionChain((void *) dbname, "DROP DATABASE");
498
499         AssertArg(dbname);
500
501         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
502                 ereport(ERROR,
503                                 (errcode(ERRCODE_OBJECT_IN_USE),
504                                  errmsg("cannot drop the currently open database")));
505
506         /*
507          * Obtain exclusive lock on pg_database.  We need this to ensure that
508          * no new backend starts up in the target database while we are
509          * deleting it.  (Actually, a new backend might still manage to start
510          * up, because it will read pg_database without any locking to
511          * discover the database's OID.  But it will detect its error in
512          * ReverifyMyDatabase and shut down before any serious damage is done.
513          * See postinit.c.)
514          */
515         pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
516
517         if (!get_db_info(dbname, &db_id, &db_owner, NULL,
518                                          &db_istemplate, NULL, NULL, NULL, NULL))
519                 ereport(ERROR,
520                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
521                                  errmsg("database \"%s\" does not exist", dbname)));
522
523         if (GetUserId() != db_owner && !superuser())
524                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
525                                            dbname);
526
527         /*
528          * Disallow dropping a DB that is marked istemplate.  This is just to
529          * prevent people from accidentally dropping template0 or template1;
530          * they can do so if they're really determined ...
531          */
532         if (db_istemplate)
533                 ereport(ERROR,
534                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
535                                  errmsg("cannot drop a template database")));
536
537         /*
538          * Check for active backends in the target database.
539          */
540         if (DatabaseHasActiveBackends(db_id, false))
541                 ereport(ERROR,
542                                 (errcode(ERRCODE_OBJECT_IN_USE),
543                            errmsg("database \"%s\" is being accessed by other users",
544                                           dbname)));
545
546         /*
547          * Find the database's tuple by OID (should be unique).
548          */
549         ScanKeyInit(&key,
550                                 ObjectIdAttributeNumber,
551                                 BTEqualStrategyNumber, F_OIDEQ,
552                                 ObjectIdGetDatum(db_id));
553
554         pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true,
555                                                                   SnapshotNow, 1, &key);
556
557         tup = systable_getnext(pgdbscan);
558         if (!HeapTupleIsValid(tup))
559         {
560                 /*
561                  * This error should never come up since the existence of the
562                  * database is checked earlier
563                  */
564                 elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
565                          dbname);
566         }
567
568         /* Remove the database's tuple from pg_database */
569         simple_heap_delete(pgdbrel, &tup->t_self);
570
571         systable_endscan(pgdbscan);
572
573         /*
574          * Delete any comments associated with the database
575          *
576          * NOTE: this is probably dead code since any such comments should have
577          * been in that database, not mine.
578          */
579         DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
580
581         /*
582          * Close pg_database, but keep exclusive lock till commit to ensure
583          * that any new backend scanning pg_database will see the tuple dead.
584          */
585         heap_close(pgdbrel, NoLock);
586
587         /*
588          * Drop pages for this database that are in the shared buffer cache.
589          * This is important to ensure that no remaining backend tries to
590          * write out a dirty buffer to the dead database later...
591          */
592         DropBuffers(db_id);
593
594         /*
595          * Also, clean out any entries in the shared free space map.
596          */
597         FreeSpaceMapForgetDatabase(db_id);
598
599         /*
600          * Remove all tablespace subdirs belonging to the database.
601          */
602         remove_dbtablespaces(db_id);
603
604         /*
605          * Force dirty buffers out to disk, so that newly-connecting backends
606          * will see the database tuple marked dead in pg_database right away.
607          * (They'll see an uncommitted deletion, but they don't care; see
608          * GetRawDatabaseInfo.)
609          */
610         BufferSync(-1, -1);
611 }
612
613
614 /*
615  * Rename database
616  */
617 void
618 RenameDatabase(const char *oldname, const char *newname)
619 {
620         HeapTuple       tup,
621                                 newtup;
622         Relation        rel;
623         SysScanDesc scan,
624                                 scan2;
625         ScanKeyData key,
626                                 key2;
627
628         /*
629          * Obtain AccessExclusiveLock so that no new session gets started
630          * while the rename is in progress.
631          */
632         rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
633
634         ScanKeyInit(&key,
635                                 Anum_pg_database_datname,
636                                 BTEqualStrategyNumber, F_NAMEEQ,
637                                 NameGetDatum(oldname));
638         scan = systable_beginscan(rel, DatabaseNameIndex, true,
639                                                           SnapshotNow, 1, &key);
640
641         tup = systable_getnext(scan);
642         if (!HeapTupleIsValid(tup))
643                 ereport(ERROR,
644                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
645                                  errmsg("database \"%s\" does not exist", oldname)));
646
647         /*
648          * XXX Client applications probably store the current database
649          * somewhere, so renaming it could cause confusion.  On the other
650          * hand, there may not be an actual problem besides a little
651          * confusion, so think about this and decide.
652          */
653         if (HeapTupleGetOid(tup) == MyDatabaseId)
654                 ereport(ERROR,
655                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
656                                  errmsg("current database may not be renamed")));
657
658         /*
659          * Make sure the database does not have active sessions.  Might not be
660          * necessary, but it's consistent with other database operations.
661          */
662         if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
663                 ereport(ERROR,
664                                 (errcode(ERRCODE_OBJECT_IN_USE),
665                            errmsg("database \"%s\" is being accessed by other users",
666                                           oldname)));
667
668         /* make sure the new name doesn't exist */
669         ScanKeyInit(&key2,
670                                 Anum_pg_database_datname,
671                                 BTEqualStrategyNumber, F_NAMEEQ,
672                                 NameGetDatum(newname));
673         scan2 = systable_beginscan(rel, DatabaseNameIndex, true,
674                                                            SnapshotNow, 1, &key2);
675         if (HeapTupleIsValid(systable_getnext(scan2)))
676                 ereport(ERROR,
677                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
678                                  errmsg("database \"%s\" already exists", newname)));
679         systable_endscan(scan2);
680
681         /* must be owner */
682         if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
683                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
684                                            oldname);
685
686         /* must have createdb */
687         if (!have_createdb_privilege())
688                 ereport(ERROR,
689                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
690                                  errmsg("permission denied to rename database")));
691
692         /* rename */
693         newtup = heap_copytuple(tup);
694         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
695         simple_heap_update(rel, &newtup->t_self, newtup);
696         CatalogUpdateIndexes(rel, newtup);
697
698         systable_endscan(scan);
699         heap_close(rel, NoLock);
700
701         /*
702          * Force dirty buffers out to disk, so that newly-connecting backends
703          * will see the renamed database in pg_database right away.  (They'll
704          * see an uncommitted tuple, but they don't care; see
705          * GetRawDatabaseInfo.)
706          */
707         BufferSync(-1, -1);
708 }
709
710
711 /*
712  * ALTER DATABASE name SET ...
713  */
714 void
715 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
716 {
717         char       *valuestr;
718         HeapTuple       tuple,
719                                 newtuple;
720         Relation        rel;
721         ScanKeyData scankey;
722         SysScanDesc scan;
723         Datum           repl_val[Natts_pg_database];
724         char            repl_null[Natts_pg_database];
725         char            repl_repl[Natts_pg_database];
726
727         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
728
729         rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
730         ScanKeyInit(&scankey,
731                                 Anum_pg_database_datname,
732                                 BTEqualStrategyNumber, F_NAMEEQ,
733                                 NameGetDatum(stmt->dbname));
734         scan = systable_beginscan(rel, DatabaseNameIndex, true,
735                                                           SnapshotNow, 1, &scankey);
736         tuple = systable_getnext(scan);
737         if (!HeapTupleIsValid(tuple))
738                 ereport(ERROR,
739                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
740                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
741
742         if (!(superuser()
743                 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
744                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
745                                            stmt->dbname);
746
747         MemSet(repl_repl, ' ', sizeof(repl_repl));
748         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
749
750         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
751         {
752                 /* RESET ALL */
753                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
754                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
755         }
756         else
757         {
758                 Datum           datum;
759                 bool            isnull;
760                 ArrayType  *a;
761
762                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
763
764                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
765                                                          RelationGetDescr(rel), &isnull);
766
767                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
768
769                 if (valuestr)
770                         a = GUCArrayAdd(a, stmt->variable, valuestr);
771                 else
772                         a = GUCArrayDelete(a, stmt->variable);
773
774                 if (a)
775                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
776                 else
777                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
778         }
779
780         newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
781         simple_heap_update(rel, &tuple->t_self, newtuple);
782
783         /* Update indexes */
784         CatalogUpdateIndexes(rel, newtuple);
785
786         systable_endscan(scan);
787         heap_close(rel, NoLock);
788 }
789
790
791 /*
792  * ALTER DATABASE name OWNER TO newowner
793  */
794 void
795 AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
796 {
797         HeapTuple       tuple;
798         Relation        rel;
799         ScanKeyData scankey;
800         SysScanDesc scan;
801         Form_pg_database datForm;
802
803         rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
804         ScanKeyInit(&scankey,
805                                 Anum_pg_database_datname,
806                                 BTEqualStrategyNumber, F_NAMEEQ,
807                                 NameGetDatum(dbname));
808         scan = systable_beginscan(rel, DatabaseNameIndex, true,
809                                                           SnapshotNow, 1, &scankey);
810         tuple = systable_getnext(scan);
811         if (!HeapTupleIsValid(tuple))
812                 ereport(ERROR,
813                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
814                                  errmsg("database \"%s\" does not exist", dbname)));
815
816         datForm = (Form_pg_database) GETSTRUCT(tuple);
817
818         /*
819          * If the new owner is the same as the existing owner, consider the
820          * command to have succeeded.  This is to be consistent with other
821          * objects.
822          */
823         if (datForm->datdba != newOwnerSysId)
824         {
825                 Datum           repl_val[Natts_pg_database];
826                 char            repl_null[Natts_pg_database];
827                 char            repl_repl[Natts_pg_database];
828                 Acl                *newAcl;
829                 Datum           aclDatum;
830                 bool            isNull;
831                 HeapTuple       newtuple;
832
833                 /* changing owner's database for someone else: must be superuser */
834                 /* note that the someone else need not have any permissions */
835                 if (!superuser())
836                         ereport(ERROR,
837                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
838                                          errmsg("must be superuser to change owner")));
839
840                 memset(repl_null, ' ', sizeof(repl_null));
841                 memset(repl_repl, ' ', sizeof(repl_repl));
842
843                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
844                 repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
845
846                 /*
847                  * Determine the modified ACL for the new owner.  This is only
848                  * necessary when the ACL is non-null.
849                  */
850                 aclDatum = heap_getattr(tuple,
851                                                                 Anum_pg_database_datacl,
852                                                                 RelationGetDescr(rel),
853                                                                 &isNull);
854                 if (!isNull)
855                 {
856                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
857                                                                  datForm->datdba, newOwnerSysId);
858                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
859                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
860                 }
861
862                 newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
863                 simple_heap_update(rel, &newtuple->t_self, newtuple);
864                 CatalogUpdateIndexes(rel, newtuple);
865
866                 heap_freetuple(newtuple);
867         }
868
869         systable_endscan(scan);
870         heap_close(rel, NoLock);
871 }
872
873
874 /*
875  * Helper functions
876  */
877
878 static bool
879 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
880                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
881                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
882                         Oid *dbTablespace)
883 {
884         Relation        relation;
885         ScanKeyData scanKey;
886         SysScanDesc scan;
887         HeapTuple       tuple;
888         bool            gottuple;
889
890         AssertArg(name);
891
892         /* Caller may wish to grab a better lock on pg_database beforehand... */
893         relation = heap_openr(DatabaseRelationName, AccessShareLock);
894
895         ScanKeyInit(&scanKey,
896                                 Anum_pg_database_datname,
897                                 BTEqualStrategyNumber, F_NAMEEQ,
898                                 NameGetDatum(name));
899
900         scan = systable_beginscan(relation, DatabaseNameIndex, true,
901                                                           SnapshotNow, 1, &scanKey);
902
903         tuple = systable_getnext(scan);
904
905         gottuple = HeapTupleIsValid(tuple);
906         if (gottuple)
907         {
908                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
909
910                 /* oid of the database */
911                 if (dbIdP)
912                         *dbIdP = HeapTupleGetOid(tuple);
913                 /* sysid of the owner */
914                 if (ownerIdP)
915                         *ownerIdP = dbform->datdba;
916                 /* character encoding */
917                 if (encodingP)
918                         *encodingP = dbform->encoding;
919                 /* allowed as template? */
920                 if (dbIsTemplateP)
921                         *dbIsTemplateP = dbform->datistemplate;
922                 /* last system OID used in database */
923                 if (dbLastSysOidP)
924                         *dbLastSysOidP = dbform->datlastsysoid;
925                 /* limit of vacuumed XIDs */
926                 if (dbVacuumXidP)
927                         *dbVacuumXidP = dbform->datvacuumxid;
928                 /* limit of frozen XIDs */
929                 if (dbFrozenXidP)
930                         *dbFrozenXidP = dbform->datfrozenxid;
931                 /* default tablespace for this database */
932                 if (dbTablespace)
933                         *dbTablespace = dbform->dattablespace;
934         }
935
936         systable_endscan(scan);
937         heap_close(relation, AccessShareLock);
938
939         return gottuple;
940 }
941
942 static bool
943 have_createdb_privilege(void)
944 {
945         HeapTuple       utup;
946         bool            retval;
947
948         utup = SearchSysCache(SHADOWSYSID,
949                                                   Int32GetDatum(GetUserId()),
950                                                   0, 0, 0);
951
952         if (!HeapTupleIsValid(utup))
953                 retval = false;
954         else
955                 retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
956
957         ReleaseSysCache(utup);
958
959         return retval;
960 }
961
962 /*
963  * Remove tablespace directories
964  *
965  * We don't know what tablespaces db_id is using, so iterate through all
966  * tablespaces removing <tablespace>/db_id
967  */
968 static void
969 remove_dbtablespaces(Oid db_id)
970 {
971         Relation        rel;
972         HeapScanDesc scan;
973         HeapTuple       tuple;
974
975         rel = heap_openr(TableSpaceRelationName, AccessShareLock);
976         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
977         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
978         {
979                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
980                 char       *dstpath;
981                 struct stat st;
982
983                 /* Don't mess with the global tablespace */
984                 if (dsttablespace == GLOBALTABLESPACE_OID)
985                         continue;
986
987                 dstpath = GetDatabasePath(db_id, dsttablespace);
988
989                 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
990                 {
991                         /* Assume we can ignore it */
992                         pfree(dstpath);
993                         continue;
994                 }
995
996                 if (!rmtree(dstpath, true))
997                         ereport(WARNING,
998                                         (errmsg("could not remove database directory \"%s\"",
999                                                         dstpath)));
1000
1001                 /* Record the filesystem change in XLOG */
1002                 {
1003                         xl_dbase_drop_rec xlrec;
1004                         XLogRecData rdata[2];
1005
1006                         xlrec.db_id = db_id;
1007                         rdata[0].buffer = InvalidBuffer;
1008                         rdata[0].data = (char *) &xlrec;
1009                         rdata[0].len = offsetof(xl_dbase_drop_rec, dir_path);
1010                         rdata[0].next = &(rdata[1]);
1011
1012                         rdata[1].buffer = InvalidBuffer;
1013                         rdata[1].data = (char *) dstpath;
1014                         rdata[1].len = strlen(dstpath) + 1;
1015                         rdata[1].next = NULL;
1016
1017                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1018                 }
1019
1020                 pfree(dstpath);
1021         }
1022
1023         heap_endscan(scan);
1024         heap_close(rel, AccessShareLock);
1025 }
1026
1027
1028 /*
1029  * get_database_oid - given a database name, look up the OID
1030  *
1031  * Returns InvalidOid if database name not found.
1032  *
1033  * This is not actually used in this file, but is exported for use elsewhere.
1034  */
1035 Oid
1036 get_database_oid(const char *dbname)
1037 {
1038         Relation        pg_database;
1039         ScanKeyData entry[1];
1040         SysScanDesc scan;
1041         HeapTuple       dbtuple;
1042         Oid                     oid;
1043
1044         /* There's no syscache for pg_database, so must look the hard way */
1045         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1046         ScanKeyInit(&entry[0],
1047                                 Anum_pg_database_datname,
1048                                 BTEqualStrategyNumber, F_NAMEEQ,
1049                                 CStringGetDatum(dbname));
1050         scan = systable_beginscan(pg_database, DatabaseNameIndex, true,
1051                                                           SnapshotNow, 1, entry);
1052
1053         dbtuple = systable_getnext(scan);
1054
1055         /* We assume that there can be at most one matching tuple */
1056         if (HeapTupleIsValid(dbtuple))
1057                 oid = HeapTupleGetOid(dbtuple);
1058         else
1059                 oid = InvalidOid;
1060
1061         systable_endscan(scan);
1062         heap_close(pg_database, AccessShareLock);
1063
1064         return oid;
1065 }
1066
1067
1068 /*
1069  * get_database_name - given a database OID, look up the name
1070  *
1071  * Returns a palloc'd string, or NULL if no such database.
1072  *
1073  * This is not actually used in this file, but is exported for use elsewhere.
1074  */
1075 char *
1076 get_database_name(Oid dbid)
1077 {
1078         Relation        pg_database;
1079         ScanKeyData entry[1];
1080         SysScanDesc scan;
1081         HeapTuple       dbtuple;
1082         char       *result;
1083
1084         /* There's no syscache for pg_database, so must look the hard way */
1085         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1086         ScanKeyInit(&entry[0],
1087                                 ObjectIdAttributeNumber,
1088                                 BTEqualStrategyNumber, F_OIDEQ,
1089                                 ObjectIdGetDatum(dbid));
1090         scan = systable_beginscan(pg_database, DatabaseOidIndex, true,
1091                                                           SnapshotNow, 1, entry);
1092
1093         dbtuple = systable_getnext(scan);
1094
1095         /* We assume that there can be at most one matching tuple */
1096         if (HeapTupleIsValid(dbtuple))
1097                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1098         else
1099                 result = NULL;
1100
1101         systable_endscan(scan);
1102         heap_close(pg_database, AccessShareLock);
1103
1104         return result;
1105 }
1106
1107 /*
1108  * DATABASE resource manager's routines
1109  */
1110 void
1111 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1112 {
1113         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1114
1115         if (info == XLOG_DBASE_CREATE)
1116         {
1117                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1118                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1119                 struct stat st;
1120
1121 #ifndef WIN32
1122                 char            buf[2 * MAXPGPATH + 100];
1123 #endif
1124
1125                 /*
1126                  * Our theory for replaying a CREATE is to forcibly drop the
1127                  * target subdirectory if present, then re-copy the source data.
1128                  * This may be more work than needed, but it is simple to
1129                  * implement.
1130                  */
1131                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1132                 {
1133                         if (!rmtree(dst_path, true))
1134                                 ereport(WARNING,
1135                                         (errmsg("could not remove database directory \"%s\"",
1136                                                         dst_path)));
1137                 }
1138
1139                 /*
1140                  * Force dirty buffers out to disk, to ensure source database is
1141                  * up-to-date for the copy.  (We really only need to flush buffers for
1142                  * the source database...)
1143                  */
1144                 BufferSync(-1, -1);
1145
1146 #ifndef WIN32
1147
1148                 /*
1149                  * Copy this subdirectory to the new location
1150                  *
1151                  * XXX use of cp really makes this code pretty grotty, particularly
1152                  * with respect to lack of ability to report errors well.  Someday
1153                  * rewrite to do it for ourselves.
1154                  */
1155
1156                 /* We might need to use cp -R one day for portability */
1157                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
1158                                  xlrec->src_path, dst_path);
1159                 if (system(buf) != 0)
1160                         ereport(ERROR,
1161                                         (errmsg("could not initialize database directory"),
1162                                          errdetail("Failing system command was: %s", buf),
1163                                          errhint("Look in the postmaster's stderr log for more information.")));
1164 #else                                                   /* WIN32 */
1165                 if (copydir(xlrec->src_path, dst_path) != 0)
1166                 {
1167                         /* copydir should already have given details of its troubles */
1168                         ereport(ERROR,
1169                                         (errmsg("could not initialize database directory")));
1170                 }
1171 #endif   /* WIN32 */
1172         }
1173         else if (info == XLOG_DBASE_DROP)
1174         {
1175                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1176
1177                 /*
1178                  * Drop pages for this database that are in the shared buffer
1179                  * cache
1180                  */
1181                 DropBuffers(xlrec->db_id);
1182
1183                 if (!rmtree(xlrec->dir_path, true))
1184                         ereport(WARNING,
1185                                         (errmsg("could not remove database directory \"%s\"",
1186                                                         xlrec->dir_path)));
1187         }
1188         else
1189                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1190 }
1191
1192 void
1193 dbase_undo(XLogRecPtr lsn, XLogRecord *record)
1194 {
1195         elog(PANIC, "dbase_undo: unimplemented");
1196 }
1197
1198 void
1199 dbase_desc(char *buf, uint8 xl_info, char *rec)
1200 {
1201         uint8           info = xl_info & ~XLR_INFO_MASK;
1202
1203         if (info == XLOG_DBASE_CREATE)
1204         {
1205                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1206                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1207
1208                 sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"",
1209                                 xlrec->db_id, xlrec->src_path, dst_path);
1210         }
1211         else if (info == XLOG_DBASE_DROP)
1212         {
1213                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1214
1215                 sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"",
1216                                 xlrec->db_id, xlrec->dir_path);
1217         }
1218         else
1219                 strcat(buf, "UNKNOWN");
1220 }