]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Fix ALTER DATABASE RENAME to allow the operation if user is a superuser
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands take ExclusiveLock on
7  * pg_database to ensure that no two proceed in parallel.  We must use
8  * at least this level of locking to ensure that no two backends try to
9  * write the flat-file copy of pg_database at once.  We avoid using
10  * AccessExclusiveLock since there's no need to lock out ordinary readers
11  * of pg_database.
12  *
13  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
14  * Portions Copyright (c) 1994, Regents of the University of California
15  *
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.153 2005/03/12 21:11:50 tgl Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27
28 #include "access/genam.h"
29 #include "access/heapam.h"
30 #include "catalog/catname.h"
31 #include "catalog/catalog.h"
32 #include "catalog/pg_database.h"
33 #include "catalog/pg_shadow.h"
34 #include "catalog/pg_tablespace.h"
35 #include "catalog/indexing.h"
36 #include "commands/comment.h"
37 #include "commands/dbcommands.h"
38 #include "commands/tablespace.h"
39 #include "mb/pg_wchar.h"
40 #include "miscadmin.h"
41 #include "postmaster/bgwriter.h"
42 #include "storage/fd.h"
43 #include "storage/freespace.h"
44 #include "storage/sinval.h"
45 #include "utils/acl.h"
46 #include "utils/array.h"
47 #include "utils/builtins.h"
48 #include "utils/flatfiles.h"
49 #include "utils/fmgroids.h"
50 #include "utils/guc.h"
51 #include "utils/lsyscache.h"
52 #include "utils/syscache.h"
53
54
55 /* non-export function prototypes */
56 static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
57                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
58                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
59                         Oid *dbTablespace);
60 static bool have_createdb_privilege(void);
61 static void remove_dbtablespaces(Oid db_id);
62
63
64 /*
65  * CREATE DATABASE
66  */
67 void
68 createdb(const CreatedbStmt *stmt)
69 {
70         HeapScanDesc scan;
71         Relation        rel;
72         Oid                     src_dboid;
73         AclId           src_owner;
74         int                     src_encoding;
75         bool            src_istemplate;
76         Oid                     src_lastsysoid;
77         TransactionId src_vacuumxid;
78         TransactionId src_frozenxid;
79         Oid                     src_deftablespace;
80         Oid                     dst_deftablespace;
81         Relation        pg_database_rel;
82         HeapTuple       tuple;
83         TupleDesc       pg_database_dsc;
84         Datum           new_record[Natts_pg_database];
85         char            new_record_nulls[Natts_pg_database];
86         Oid                     dboid;
87         AclId           datdba;
88         ListCell   *option;
89         DefElem    *dtablespacename = NULL;
90         DefElem    *downer = NULL;
91         DefElem    *dtemplate = NULL;
92         DefElem    *dencoding = NULL;
93         char       *dbname = stmt->dbname;
94         char       *dbowner = NULL;
95         char       *dbtemplate = NULL;
96         int                     encoding = -1;
97
98 #ifndef WIN32
99         char            buf[2 * MAXPGPATH + 100];
100 #endif
101
102         /* don't call this in a transaction block */
103         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
104
105         /* Extract options from the statement node tree */
106         foreach(option, stmt->options)
107         {
108                 DefElem    *defel = (DefElem *) lfirst(option);
109
110                 if (strcmp(defel->defname, "tablespace") == 0)
111                 {
112                         if (dtablespacename)
113                                 ereport(ERROR,
114                                                 (errcode(ERRCODE_SYNTAX_ERROR),
115                                                  errmsg("conflicting or redundant options")));
116                         dtablespacename = defel;
117                 }
118                 else if (strcmp(defel->defname, "owner") == 0)
119                 {
120                         if (downer)
121                                 ereport(ERROR,
122                                                 (errcode(ERRCODE_SYNTAX_ERROR),
123                                                  errmsg("conflicting or redundant options")));
124                         downer = defel;
125                 }
126                 else if (strcmp(defel->defname, "template") == 0)
127                 {
128                         if (dtemplate)
129                                 ereport(ERROR,
130                                                 (errcode(ERRCODE_SYNTAX_ERROR),
131                                                  errmsg("conflicting or redundant options")));
132                         dtemplate = defel;
133                 }
134                 else if (strcmp(defel->defname, "encoding") == 0)
135                 {
136                         if (dencoding)
137                                 ereport(ERROR,
138                                                 (errcode(ERRCODE_SYNTAX_ERROR),
139                                                  errmsg("conflicting or redundant options")));
140                         dencoding = defel;
141                 }
142                 else if (strcmp(defel->defname, "location") == 0)
143                 {
144                         ereport(WARNING,
145                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
146                                          errmsg("LOCATION is not supported anymore"),
147                                          errhint("Consider using tablespaces instead.")));
148                 }
149                 else
150                         elog(ERROR, "option \"%s\" not recognized",
151                                  defel->defname);
152         }
153
154         if (downer && downer->arg)
155                 dbowner = strVal(downer->arg);
156         if (dtemplate && dtemplate->arg)
157                 dbtemplate = strVal(dtemplate->arg);
158         if (dencoding && dencoding->arg)
159         {
160                 const char *encoding_name;
161
162                 if (IsA(dencoding->arg, Integer))
163                 {
164                         encoding = intVal(dencoding->arg);
165                         encoding_name = pg_encoding_to_char(encoding);
166                         if (strcmp(encoding_name, "") == 0 ||
167                                 pg_valid_server_encoding(encoding_name) < 0)
168                                 ereport(ERROR,
169                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
170                                                  errmsg("%d is not a valid encoding code",
171                                                                 encoding)));
172                 }
173                 else if (IsA(dencoding->arg, String))
174                 {
175                         encoding_name = strVal(dencoding->arg);
176                         if (pg_valid_server_encoding(encoding_name) < 0)
177                                 ereport(ERROR,
178                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
179                                                  errmsg("%s is not a valid encoding name",
180                                                                 encoding_name)));
181                         encoding = pg_char_to_encoding(encoding_name);
182                 }
183                 else
184                         elog(ERROR, "unrecognized node type: %d",
185                                  nodeTag(dencoding->arg));
186         }
187
188         /* obtain sysid of proposed owner */
189         if (dbowner)
190                 datdba = get_usesysid(dbowner); /* will ereport if no such user */
191         else
192                 datdba = GetUserId();
193
194         if (datdba == GetUserId())
195         {
196                 /* creating database for self: can be superuser or createdb */
197                 if (!superuser() && !have_createdb_privilege())
198                         ereport(ERROR,
199                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
200                                          errmsg("permission denied to create database")));
201         }
202         else
203         {
204                 /* creating database for someone else: must be superuser */
205                 /* note that the someone else need not have any permissions */
206                 if (!superuser())
207                         ereport(ERROR,
208                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
209                                          errmsg("must be superuser to create database for another user")));
210         }
211
212         /*
213          * Check for db name conflict.  There is a race condition here, since
214          * another backend could create the same DB name before we commit.
215          * However, holding an exclusive lock on pg_database for the whole
216          * time we are copying the source database doesn't seem like a good
217          * idea, so accept possibility of race to create.  We will check again
218          * after we grab the exclusive lock.
219          */
220         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
221                 ereport(ERROR,
222                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
223                                  errmsg("database \"%s\" already exists", dbname)));
224
225         /*
226          * Lookup database (template) to be cloned.
227          */
228         if (!dbtemplate)
229                 dbtemplate = "template1";               /* Default template database name */
230
231         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
232                                          &src_istemplate, &src_lastsysoid,
233                                          &src_vacuumxid, &src_frozenxid, &src_deftablespace))
234                 ereport(ERROR,
235                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
236                  errmsg("template database \"%s\" does not exist", dbtemplate)));
237
238         /*
239          * Permission check: to copy a DB that's not marked datistemplate, you
240          * must be superuser or the owner thereof.
241          */
242         if (!src_istemplate)
243         {
244                 if (!superuser() && GetUserId() != src_owner)
245                         ereport(ERROR,
246                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
247                                          errmsg("permission denied to copy database \"%s\"",
248                                                         dbtemplate)));
249         }
250
251         /*
252          * The source DB can't have any active backends, except this one
253          * (exception is to allow CREATE DB while connected to template1).
254          * Otherwise we might copy inconsistent data.  This check is not
255          * bulletproof, since someone might connect while we are copying...
256          */
257         if (DatabaseHasActiveBackends(src_dboid, true))
258                 ereport(ERROR,
259                                 (errcode(ERRCODE_OBJECT_IN_USE),
260                 errmsg("source database \"%s\" is being accessed by other users",
261                            dbtemplate)));
262
263         /* If encoding is defaulted, use source's encoding */
264         if (encoding < 0)
265                 encoding = src_encoding;
266
267         /* Some encodings are client only */
268         if (!PG_VALID_BE_ENCODING(encoding))
269                 ereport(ERROR,
270                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
271                                  errmsg("invalid server encoding %d", encoding)));
272
273         /* Resolve default tablespace for new database */
274         if (dtablespacename && dtablespacename->arg)
275         {
276                 char       *tablespacename;
277                 AclResult       aclresult;
278
279                 tablespacename = strVal(dtablespacename->arg);
280                 dst_deftablespace = get_tablespace_oid(tablespacename);
281                 if (!OidIsValid(dst_deftablespace))
282                         ereport(ERROR,
283                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
284                                          errmsg("tablespace \"%s\" does not exist",
285                                                         tablespacename)));
286                 /* check permissions */
287                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
288                                                                                    ACL_CREATE);
289                 if (aclresult != ACLCHECK_OK)
290                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
291                                                    tablespacename);
292
293                 /*
294                  * If we are trying to change the default tablespace of the template,
295                  * we require that the template not have any files in the new default
296                  * tablespace.  This is necessary because otherwise the copied
297                  * database would contain pg_class rows that refer to its default
298                  * tablespace both explicitly (by OID) and implicitly (as zero), which
299                  * would cause problems.  For example another CREATE DATABASE using
300                  * the copied database as template, and trying to change its default
301                  * tablespace again, would yield outright incorrect results (it would
302                  * improperly move tables to the new default tablespace that should
303                  * stay in the same tablespace).
304                  */
305                 if (dst_deftablespace != src_deftablespace)
306                 {
307                         char       *srcpath;
308                         struct stat st;
309
310                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
311
312                         if (stat(srcpath, &st) == 0 &&
313                                 S_ISDIR(st.st_mode) &&
314                                 !directory_is_empty(srcpath))
315                                 ereport(ERROR,
316                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
317                                                  errmsg("cannot assign new default tablespace \"%s\"",
318                                                                 tablespacename),
319                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
320                                                                    dbtemplate)));
321                         pfree(srcpath);
322                 }
323         }
324         else
325         {
326                 /* Use template database's default tablespace */
327                 dst_deftablespace = src_deftablespace;
328                 /* Note there is no additional permission check in this path */
329         }
330
331         /*
332          * Preassign OID for pg_database tuple, so that we can compute db
333          * path.
334          */
335         dboid = newoid();
336
337         /*
338          * Force dirty buffers out to disk, to ensure source database is
339          * up-to-date for the copy.  (We really only need to flush buffers for
340          * the source database, but bufmgr.c provides no API for that.)
341          */
342         BufferSync();
343
344         /*
345          * Close virtual file descriptors so the kernel has more available for
346          * the system() calls below.
347          */
348         closeAllVfds();
349
350         /*
351          * Iterate through all tablespaces of the template database, and copy
352          * each one to the new database.
353          */
354         rel = heap_openr(TableSpaceRelationName, AccessShareLock);
355         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
356         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
357         {
358                 Oid                     srctablespace = HeapTupleGetOid(tuple);
359                 Oid                     dsttablespace;
360                 char       *srcpath;
361                 char       *dstpath;
362                 struct stat st;
363
364                 /* No need to copy global tablespace */
365                 if (srctablespace == GLOBALTABLESPACE_OID)
366                         continue;
367
368                 srcpath = GetDatabasePath(src_dboid, srctablespace);
369
370                 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
371                         directory_is_empty(srcpath))
372                 {
373                         /* Assume we can ignore it */
374                         pfree(srcpath);
375                         continue;
376                 }
377
378                 if (srctablespace == src_deftablespace)
379                         dsttablespace = dst_deftablespace;
380                 else
381                         dsttablespace = srctablespace;
382
383                 dstpath = GetDatabasePath(dboid, dsttablespace);
384
385                 if (stat(dstpath, &st) == 0 || errno != ENOENT)
386                 {
387                         remove_dbtablespaces(dboid);
388                         ereport(ERROR,
389                                         (errmsg("could not initialize database directory"),
390                                          errdetail("Directory \"%s\" already exists.",
391                                                            dstpath)));
392                 }
393
394 #ifndef WIN32
395
396                 /*
397                  * Copy this subdirectory to the new location
398                  *
399                  * XXX use of cp really makes this code pretty grotty, particularly
400                  * with respect to lack of ability to report errors well.  Someday
401                  * rewrite to do it for ourselves.
402                  */
403
404                 /* We might need to use cp -R one day for portability */
405                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
406                                  srcpath, dstpath);
407                 if (system(buf) != 0)
408                 {
409                         remove_dbtablespaces(dboid);
410                         ereport(ERROR,
411                                         (errmsg("could not initialize database directory"),
412                                          errdetail("Failing system command was: %s", buf),
413                                          errhint("Look in the postmaster's stderr log for more information.")));
414                 }
415 #else                                                   /* WIN32 */
416                 if (copydir(srcpath, dstpath) != 0)
417                 {
418                         /* copydir should already have given details of its troubles */
419                         remove_dbtablespaces(dboid);
420                         ereport(ERROR,
421                                         (errmsg("could not initialize database directory")));
422                 }
423 #endif   /* WIN32 */
424
425                 /* Record the filesystem change in XLOG */
426                 {
427                         xl_dbase_create_rec xlrec;
428                         XLogRecData rdata[3];
429
430                         xlrec.db_id = dboid;
431                         rdata[0].buffer = InvalidBuffer;
432                         rdata[0].data = (char *) &xlrec;
433                         rdata[0].len = offsetof(xl_dbase_create_rec, src_path);
434                         rdata[0].next = &(rdata[1]);
435
436                         rdata[1].buffer = InvalidBuffer;
437                         rdata[1].data = (char *) srcpath;
438                         rdata[1].len = strlen(srcpath) + 1;
439                         rdata[1].next = &(rdata[2]);
440
441                         rdata[2].buffer = InvalidBuffer;
442                         rdata[2].data = (char *) dstpath;
443                         rdata[2].len = strlen(dstpath) + 1;
444                         rdata[2].next = NULL;
445
446                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
447                 }
448         }
449         heap_endscan(scan);
450         heap_close(rel, AccessShareLock);
451
452         /*
453          * Now OK to grab exclusive lock on pg_database.
454          */
455         pg_database_rel = heap_openr(DatabaseRelationName, ExclusiveLock);
456
457         /* Check to see if someone else created same DB name meanwhile. */
458         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
459         {
460                 /* Don't hold lock while doing recursive remove */
461                 heap_close(pg_database_rel, ExclusiveLock);
462                 remove_dbtablespaces(dboid);
463                 ereport(ERROR,
464                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
465                                  errmsg("database \"%s\" already exists", dbname)));
466         }
467
468         /*
469          * Insert a new tuple into pg_database
470          */
471         pg_database_dsc = RelationGetDescr(pg_database_rel);
472
473         /* Form tuple */
474         MemSet(new_record, 0, sizeof(new_record));
475         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
476
477         new_record[Anum_pg_database_datname - 1] =
478                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
479         new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
480         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
481         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
482         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
483         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
484         new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
485         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
486         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
487
488         /*
489          * We deliberately set datconfig and datacl to defaults (NULL), rather
490          * than copying them from the template database.  Copying datacl would
491          * be a bad idea when the owner is not the same as the template's
492          * owner. It's more debatable whether datconfig should be copied.
493          */
494         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
495         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
496
497         tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
498
499         HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
500                                                                                  * selection */
501
502         simple_heap_insert(pg_database_rel, tuple);
503
504         /* Update indexes */
505         CatalogUpdateIndexes(pg_database_rel, tuple);
506
507         /* Close pg_database, but keep exclusive lock till commit */
508         heap_close(pg_database_rel, NoLock);
509
510         /*
511          * Set flag to update flat database file at commit.
512          */
513         database_file_update_needed();
514 }
515
516
517 /*
518  * DROP DATABASE
519  */
520 void
521 dropdb(const char *dbname)
522 {
523         int4            db_owner;
524         bool            db_istemplate;
525         Oid                     db_id;
526         Relation        pgdbrel;
527         SysScanDesc pgdbscan;
528         ScanKeyData key;
529         HeapTuple       tup;
530
531         PreventTransactionChain((void *) dbname, "DROP DATABASE");
532
533         AssertArg(dbname);
534
535         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
536                 ereport(ERROR,
537                                 (errcode(ERRCODE_OBJECT_IN_USE),
538                                  errmsg("cannot drop the currently open database")));
539
540         /*
541          * Obtain exclusive lock on pg_database.  We need this to ensure that
542          * no new backend starts up in the target database while we are
543          * deleting it.  (Actually, a new backend might still manage to start
544          * up, because it isn't able to lock pg_database while starting.  But
545          * it will detect its error in ReverifyMyDatabase and shut down before
546          * any serious damage is done.  See postinit.c.)
547          *
548          * An ExclusiveLock, rather than AccessExclusiveLock, is sufficient
549          * since ReverifyMyDatabase takes RowShareLock.  This allows ordinary
550          * readers of pg_database to proceed in parallel.
551          */
552         pgdbrel = heap_openr(DatabaseRelationName, ExclusiveLock);
553
554         if (!get_db_info(dbname, &db_id, &db_owner, NULL,
555                                          &db_istemplate, NULL, NULL, NULL, NULL))
556                 ereport(ERROR,
557                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
558                                  errmsg("database \"%s\" does not exist", dbname)));
559
560         if (GetUserId() != db_owner && !superuser())
561                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
562                                            dbname);
563
564         /*
565          * Disallow dropping a DB that is marked istemplate.  This is just to
566          * prevent people from accidentally dropping template0 or template1;
567          * they can do so if they're really determined ...
568          */
569         if (db_istemplate)
570                 ereport(ERROR,
571                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
572                                  errmsg("cannot drop a template database")));
573
574         /*
575          * Check for active backends in the target database.
576          */
577         if (DatabaseHasActiveBackends(db_id, false))
578                 ereport(ERROR,
579                                 (errcode(ERRCODE_OBJECT_IN_USE),
580                            errmsg("database \"%s\" is being accessed by other users",
581                                           dbname)));
582
583         /*
584          * Find the database's tuple by OID (should be unique).
585          */
586         ScanKeyInit(&key,
587                                 ObjectIdAttributeNumber,
588                                 BTEqualStrategyNumber, F_OIDEQ,
589                                 ObjectIdGetDatum(db_id));
590
591         pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true,
592                                                                   SnapshotNow, 1, &key);
593
594         tup = systable_getnext(pgdbscan);
595         if (!HeapTupleIsValid(tup))
596         {
597                 /*
598                  * This error should never come up since the existence of the
599                  * database is checked earlier
600                  */
601                 elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
602                          dbname);
603         }
604
605         /* Remove the database's tuple from pg_database */
606         simple_heap_delete(pgdbrel, &tup->t_self);
607
608         systable_endscan(pgdbscan);
609
610         /*
611          * Delete any comments associated with the database
612          *
613          * NOTE: this is probably dead code since any such comments should have
614          * been in that database, not mine.
615          */
616         DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
617
618         /*
619          * Drop pages for this database that are in the shared buffer cache.
620          * This is important to ensure that no remaining backend tries to
621          * write out a dirty buffer to the dead database later...
622          */
623         DropBuffers(db_id);
624
625         /*
626          * Also, clean out any entries in the shared free space map.
627          */
628         FreeSpaceMapForgetDatabase(db_id);
629
630         /*
631          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
632          * open files, which would cause rmdir() to fail.
633          */
634 #ifdef WIN32
635         RequestCheckpoint(true);
636 #endif
637
638         /*
639          * Remove all tablespace subdirs belonging to the database.
640          */
641         remove_dbtablespaces(db_id);
642
643         /* Close pg_database, but keep exclusive lock till commit */
644         heap_close(pgdbrel, NoLock);
645
646         /*
647          * Set flag to update flat database file at commit.
648          */
649         database_file_update_needed();
650 }
651
652
653 /*
654  * Rename database
655  */
656 void
657 RenameDatabase(const char *oldname, const char *newname)
658 {
659         HeapTuple       tup,
660                                 newtup;
661         Relation        rel;
662         SysScanDesc scan,
663                                 scan2;
664         ScanKeyData key,
665                                 key2;
666
667         /*
668          * Obtain ExclusiveLock so that no new session gets started
669          * while the rename is in progress.
670          */
671         rel = heap_openr(DatabaseRelationName, ExclusiveLock);
672
673         ScanKeyInit(&key,
674                                 Anum_pg_database_datname,
675                                 BTEqualStrategyNumber, F_NAMEEQ,
676                                 NameGetDatum(oldname));
677         scan = systable_beginscan(rel, DatabaseNameIndex, true,
678                                                           SnapshotNow, 1, &key);
679
680         tup = systable_getnext(scan);
681         if (!HeapTupleIsValid(tup))
682                 ereport(ERROR,
683                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
684                                  errmsg("database \"%s\" does not exist", oldname)));
685
686         /*
687          * XXX Client applications probably store the current database
688          * somewhere, so renaming it could cause confusion.  On the other
689          * hand, there may not be an actual problem besides a little
690          * confusion, so think about this and decide.
691          */
692         if (HeapTupleGetOid(tup) == MyDatabaseId)
693                 ereport(ERROR,
694                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
695                                  errmsg("current database may not be renamed")));
696
697         /*
698          * Make sure the database does not have active sessions.  Might not be
699          * necessary, but it's consistent with other database operations.
700          */
701         if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
702                 ereport(ERROR,
703                                 (errcode(ERRCODE_OBJECT_IN_USE),
704                            errmsg("database \"%s\" is being accessed by other users",
705                                           oldname)));
706
707         /* make sure the new name doesn't exist */
708         ScanKeyInit(&key2,
709                                 Anum_pg_database_datname,
710                                 BTEqualStrategyNumber, F_NAMEEQ,
711                                 NameGetDatum(newname));
712         scan2 = systable_beginscan(rel, DatabaseNameIndex, true,
713                                                            SnapshotNow, 1, &key2);
714         if (HeapTupleIsValid(systable_getnext(scan2)))
715                 ereport(ERROR,
716                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
717                                  errmsg("database \"%s\" already exists", newname)));
718         systable_endscan(scan2);
719
720         /* must be owner */
721         if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
722                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
723                                            oldname);
724
725         /* must have createdb rights */
726         if (!superuser() && !have_createdb_privilege())
727                 ereport(ERROR,
728                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
729                                  errmsg("permission denied to rename database")));
730
731         /* rename */
732         newtup = heap_copytuple(tup);
733         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
734         simple_heap_update(rel, &newtup->t_self, newtup);
735         CatalogUpdateIndexes(rel, newtup);
736
737         systable_endscan(scan);
738
739         /* Close pg_database, but keep exclusive lock till commit */
740         heap_close(rel, NoLock);
741
742         /*
743          * Set flag to update flat database file at commit.
744          */
745         database_file_update_needed();
746 }
747
748
749 /*
750  * ALTER DATABASE name SET ...
751  */
752 void
753 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
754 {
755         char       *valuestr;
756         HeapTuple       tuple,
757                                 newtuple;
758         Relation        rel;
759         ScanKeyData scankey;
760         SysScanDesc scan;
761         Datum           repl_val[Natts_pg_database];
762         char            repl_null[Natts_pg_database];
763         char            repl_repl[Natts_pg_database];
764
765         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
766
767         /*
768          * We don't need ExclusiveLock since we aren't updating the
769          * flat file.
770          */
771         rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
772         ScanKeyInit(&scankey,
773                                 Anum_pg_database_datname,
774                                 BTEqualStrategyNumber, F_NAMEEQ,
775                                 NameGetDatum(stmt->dbname));
776         scan = systable_beginscan(rel, DatabaseNameIndex, true,
777                                                           SnapshotNow, 1, &scankey);
778         tuple = systable_getnext(scan);
779         if (!HeapTupleIsValid(tuple))
780                 ereport(ERROR,
781                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
782                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
783
784         if (!(superuser()
785                 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
786                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
787                                            stmt->dbname);
788
789         MemSet(repl_repl, ' ', sizeof(repl_repl));
790         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
791
792         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
793         {
794                 /* RESET ALL */
795                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
796                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
797         }
798         else
799         {
800                 Datum           datum;
801                 bool            isnull;
802                 ArrayType  *a;
803
804                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
805
806                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
807                                                          RelationGetDescr(rel), &isnull);
808
809                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
810
811                 if (valuestr)
812                         a = GUCArrayAdd(a, stmt->variable, valuestr);
813                 else
814                         a = GUCArrayDelete(a, stmt->variable);
815
816                 if (a)
817                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
818                 else
819                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
820         }
821
822         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
823         simple_heap_update(rel, &tuple->t_self, newtuple);
824
825         /* Update indexes */
826         CatalogUpdateIndexes(rel, newtuple);
827
828         systable_endscan(scan);
829
830         /* Close pg_database, but keep lock till commit */
831         heap_close(rel, NoLock);
832
833         /*
834          * We don't bother updating the flat file since ALTER DATABASE SET
835          * doesn't affect it.
836          */
837 }
838
839
840 /*
841  * ALTER DATABASE name OWNER TO newowner
842  */
843 void
844 AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
845 {
846         HeapTuple       tuple;
847         Relation        rel;
848         ScanKeyData scankey;
849         SysScanDesc scan;
850         Form_pg_database datForm;
851
852         /*
853          * We don't need ExclusiveLock since we aren't updating the
854          * flat file.
855          */
856         rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
857         ScanKeyInit(&scankey,
858                                 Anum_pg_database_datname,
859                                 BTEqualStrategyNumber, F_NAMEEQ,
860                                 NameGetDatum(dbname));
861         scan = systable_beginscan(rel, DatabaseNameIndex, true,
862                                                           SnapshotNow, 1, &scankey);
863         tuple = systable_getnext(scan);
864         if (!HeapTupleIsValid(tuple))
865                 ereport(ERROR,
866                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
867                                  errmsg("database \"%s\" does not exist", dbname)));
868
869         datForm = (Form_pg_database) GETSTRUCT(tuple);
870
871         /*
872          * If the new owner is the same as the existing owner, consider the
873          * command to have succeeded.  This is to be consistent with other
874          * objects.
875          */
876         if (datForm->datdba != newOwnerSysId)
877         {
878                 Datum           repl_val[Natts_pg_database];
879                 char            repl_null[Natts_pg_database];
880                 char            repl_repl[Natts_pg_database];
881                 Acl                *newAcl;
882                 Datum           aclDatum;
883                 bool            isNull;
884                 HeapTuple       newtuple;
885
886                 /* must be superuser to change ownership */
887                 if (!superuser())
888                         ereport(ERROR,
889                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
890                                          errmsg("must be superuser to change owner")));
891
892                 memset(repl_null, ' ', sizeof(repl_null));
893                 memset(repl_repl, ' ', sizeof(repl_repl));
894
895                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
896                 repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
897
898                 /*
899                  * Determine the modified ACL for the new owner.  This is only
900                  * necessary when the ACL is non-null.
901                  */
902                 aclDatum = heap_getattr(tuple,
903                                                                 Anum_pg_database_datacl,
904                                                                 RelationGetDescr(rel),
905                                                                 &isNull);
906                 if (!isNull)
907                 {
908                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
909                                                                  datForm->datdba, newOwnerSysId);
910                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
911                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
912                 }
913
914                 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
915                 simple_heap_update(rel, &newtuple->t_self, newtuple);
916                 CatalogUpdateIndexes(rel, newtuple);
917
918                 heap_freetuple(newtuple);
919         }
920
921         systable_endscan(scan);
922
923         /* Close pg_database, but keep lock till commit */
924         heap_close(rel, NoLock);
925
926         /*
927          * We don't bother updating the flat file since ALTER DATABASE OWNER
928          * doesn't affect it.
929          */
930 }
931
932
933 /*
934  * Helper functions
935  */
936
937 static bool
938 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
939                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
940                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
941                         Oid *dbTablespace)
942 {
943         Relation        relation;
944         ScanKeyData scanKey;
945         SysScanDesc scan;
946         HeapTuple       tuple;
947         bool            gottuple;
948
949         AssertArg(name);
950
951         /* Caller may wish to grab a better lock on pg_database beforehand... */
952         relation = heap_openr(DatabaseRelationName, AccessShareLock);
953
954         ScanKeyInit(&scanKey,
955                                 Anum_pg_database_datname,
956                                 BTEqualStrategyNumber, F_NAMEEQ,
957                                 NameGetDatum(name));
958
959         scan = systable_beginscan(relation, DatabaseNameIndex, true,
960                                                           SnapshotNow, 1, &scanKey);
961
962         tuple = systable_getnext(scan);
963
964         gottuple = HeapTupleIsValid(tuple);
965         if (gottuple)
966         {
967                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
968
969                 /* oid of the database */
970                 if (dbIdP)
971                         *dbIdP = HeapTupleGetOid(tuple);
972                 /* sysid of the owner */
973                 if (ownerIdP)
974                         *ownerIdP = dbform->datdba;
975                 /* character encoding */
976                 if (encodingP)
977                         *encodingP = dbform->encoding;
978                 /* allowed as template? */
979                 if (dbIsTemplateP)
980                         *dbIsTemplateP = dbform->datistemplate;
981                 /* last system OID used in database */
982                 if (dbLastSysOidP)
983                         *dbLastSysOidP = dbform->datlastsysoid;
984                 /* limit of vacuumed XIDs */
985                 if (dbVacuumXidP)
986                         *dbVacuumXidP = dbform->datvacuumxid;
987                 /* limit of frozen XIDs */
988                 if (dbFrozenXidP)
989                         *dbFrozenXidP = dbform->datfrozenxid;
990                 /* default tablespace for this database */
991                 if (dbTablespace)
992                         *dbTablespace = dbform->dattablespace;
993         }
994
995         systable_endscan(scan);
996         heap_close(relation, AccessShareLock);
997
998         return gottuple;
999 }
1000
1001 /* Check if current user has createdb privileges */
1002 static bool
1003 have_createdb_privilege(void)
1004 {
1005         bool            result = false;
1006         HeapTuple       utup;
1007
1008         utup = SearchSysCache(SHADOWSYSID,
1009                                                   Int32GetDatum(GetUserId()),
1010                                                   0, 0, 0);
1011         if (HeapTupleIsValid(utup))
1012         {
1013                 result = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
1014                 ReleaseSysCache(utup);
1015         }
1016         return result;
1017 }
1018
1019 /*
1020  * Remove tablespace directories
1021  *
1022  * We don't know what tablespaces db_id is using, so iterate through all
1023  * tablespaces removing <tablespace>/db_id
1024  */
1025 static void
1026 remove_dbtablespaces(Oid db_id)
1027 {
1028         Relation        rel;
1029         HeapScanDesc scan;
1030         HeapTuple       tuple;
1031
1032         rel = heap_openr(TableSpaceRelationName, AccessShareLock);
1033         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1034         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1035         {
1036                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1037                 char       *dstpath;
1038                 struct stat st;
1039
1040                 /* Don't mess with the global tablespace */
1041                 if (dsttablespace == GLOBALTABLESPACE_OID)
1042                         continue;
1043
1044                 dstpath = GetDatabasePath(db_id, dsttablespace);
1045
1046                 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1047                 {
1048                         /* Assume we can ignore it */
1049                         pfree(dstpath);
1050                         continue;
1051                 }
1052
1053                 if (!rmtree(dstpath, true))
1054                         ereport(WARNING,
1055                                         (errmsg("could not remove database directory \"%s\"",
1056                                                         dstpath)));
1057
1058                 /* Record the filesystem change in XLOG */
1059                 {
1060                         xl_dbase_drop_rec xlrec;
1061                         XLogRecData rdata[2];
1062
1063                         xlrec.db_id = db_id;
1064                         rdata[0].buffer = InvalidBuffer;
1065                         rdata[0].data = (char *) &xlrec;
1066                         rdata[0].len = offsetof(xl_dbase_drop_rec, dir_path);
1067                         rdata[0].next = &(rdata[1]);
1068
1069                         rdata[1].buffer = InvalidBuffer;
1070                         rdata[1].data = (char *) dstpath;
1071                         rdata[1].len = strlen(dstpath) + 1;
1072                         rdata[1].next = NULL;
1073
1074                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1075                 }
1076
1077                 pfree(dstpath);
1078         }
1079
1080         heap_endscan(scan);
1081         heap_close(rel, AccessShareLock);
1082 }
1083
1084
1085 /*
1086  * get_database_oid - given a database name, look up the OID
1087  *
1088  * Returns InvalidOid if database name not found.
1089  *
1090  * This is not actually used in this file, but is exported for use elsewhere.
1091  */
1092 Oid
1093 get_database_oid(const char *dbname)
1094 {
1095         Relation        pg_database;
1096         ScanKeyData entry[1];
1097         SysScanDesc scan;
1098         HeapTuple       dbtuple;
1099         Oid                     oid;
1100
1101         /* There's no syscache for pg_database, so must look the hard way */
1102         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1103         ScanKeyInit(&entry[0],
1104                                 Anum_pg_database_datname,
1105                                 BTEqualStrategyNumber, F_NAMEEQ,
1106                                 CStringGetDatum(dbname));
1107         scan = systable_beginscan(pg_database, DatabaseNameIndex, true,
1108                                                           SnapshotNow, 1, entry);
1109
1110         dbtuple = systable_getnext(scan);
1111
1112         /* We assume that there can be at most one matching tuple */
1113         if (HeapTupleIsValid(dbtuple))
1114                 oid = HeapTupleGetOid(dbtuple);
1115         else
1116                 oid = InvalidOid;
1117
1118         systable_endscan(scan);
1119         heap_close(pg_database, AccessShareLock);
1120
1121         return oid;
1122 }
1123
1124
1125 /*
1126  * get_database_name - given a database OID, look up the name
1127  *
1128  * Returns a palloc'd string, or NULL if no such database.
1129  *
1130  * This is not actually used in this file, but is exported for use elsewhere.
1131  */
1132 char *
1133 get_database_name(Oid dbid)
1134 {
1135         Relation        pg_database;
1136         ScanKeyData entry[1];
1137         SysScanDesc scan;
1138         HeapTuple       dbtuple;
1139         char       *result;
1140
1141         /* There's no syscache for pg_database, so must look the hard way */
1142         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1143         ScanKeyInit(&entry[0],
1144                                 ObjectIdAttributeNumber,
1145                                 BTEqualStrategyNumber, F_OIDEQ,
1146                                 ObjectIdGetDatum(dbid));
1147         scan = systable_beginscan(pg_database, DatabaseOidIndex, true,
1148                                                           SnapshotNow, 1, entry);
1149
1150         dbtuple = systable_getnext(scan);
1151
1152         /* We assume that there can be at most one matching tuple */
1153         if (HeapTupleIsValid(dbtuple))
1154                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1155         else
1156                 result = NULL;
1157
1158         systable_endscan(scan);
1159         heap_close(pg_database, AccessShareLock);
1160
1161         return result;
1162 }
1163
1164 /*
1165  * DATABASE resource manager's routines
1166  */
1167 void
1168 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1169 {
1170         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1171
1172         if (info == XLOG_DBASE_CREATE)
1173         {
1174                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1175                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1176                 struct stat st;
1177
1178 #ifndef WIN32
1179                 char            buf[2 * MAXPGPATH + 100];
1180 #endif
1181
1182                 /*
1183                  * Our theory for replaying a CREATE is to forcibly drop the
1184                  * target subdirectory if present, then re-copy the source data.
1185                  * This may be more work than needed, but it is simple to
1186                  * implement.
1187                  */
1188                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1189                 {
1190                         if (!rmtree(dst_path, true))
1191                                 ereport(WARNING,
1192                                         (errmsg("could not remove database directory \"%s\"",
1193                                                         dst_path)));
1194                 }
1195
1196                 /*
1197                  * Force dirty buffers out to disk, to ensure source database is
1198                  * up-to-date for the copy.  (We really only need to flush buffers for
1199                  * the source database, but bufmgr.c provides no API for that.)
1200                  */
1201                 BufferSync();
1202
1203 #ifndef WIN32
1204
1205                 /*
1206                  * Copy this subdirectory to the new location
1207                  *
1208                  * XXX use of cp really makes this code pretty grotty, particularly
1209                  * with respect to lack of ability to report errors well.  Someday
1210                  * rewrite to do it for ourselves.
1211                  */
1212
1213                 /* We might need to use cp -R one day for portability */
1214                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
1215                                  xlrec->src_path, dst_path);
1216                 if (system(buf) != 0)
1217                         ereport(ERROR,
1218                                         (errmsg("could not initialize database directory"),
1219                                          errdetail("Failing system command was: %s", buf),
1220                                          errhint("Look in the postmaster's stderr log for more information.")));
1221 #else                                                   /* WIN32 */
1222                 if (copydir(xlrec->src_path, dst_path) != 0)
1223                 {
1224                         /* copydir should already have given details of its troubles */
1225                         ereport(ERROR,
1226                                         (errmsg("could not initialize database directory")));
1227                 }
1228 #endif   /* WIN32 */
1229         }
1230         else if (info == XLOG_DBASE_DROP)
1231         {
1232                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1233
1234                 /*
1235                  * Drop pages for this database that are in the shared buffer
1236                  * cache
1237                  */
1238                 DropBuffers(xlrec->db_id);
1239
1240                 if (!rmtree(xlrec->dir_path, true))
1241                         ereport(WARNING,
1242                                         (errmsg("could not remove database directory \"%s\"",
1243                                                         xlrec->dir_path)));
1244         }
1245         else
1246                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1247 }
1248
1249 void
1250 dbase_undo(XLogRecPtr lsn, XLogRecord *record)
1251 {
1252         elog(PANIC, "dbase_undo: unimplemented");
1253 }
1254
1255 void
1256 dbase_desc(char *buf, uint8 xl_info, char *rec)
1257 {
1258         uint8           info = xl_info & ~XLR_INFO_MASK;
1259
1260         if (info == XLOG_DBASE_CREATE)
1261         {
1262                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1263                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1264
1265                 sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"",
1266                                 xlrec->db_id, xlrec->src_path, dst_path);
1267         }
1268         else if (info == XLOG_DBASE_DROP)
1269         {
1270                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1271
1272                 sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"",
1273                                 xlrec->db_id, xlrec->dir_path);
1274         }
1275         else
1276                 strcat(buf, "UNKNOWN");
1277 }