]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
6d0fa6a13d453b6cd93cadca0c874db103a40c79
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  *
7  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.147 2004/11/18 01:14:26 tgl Exp $
13  *
14  *-------------------------------------------------------------------------
15  */
16 #include "postgres.h"
17
18 #include <fcntl.h>
19 #include <unistd.h>
20 #include <sys/stat.h>
21
22 #include "access/genam.h"
23 #include "access/heapam.h"
24 #include "catalog/catname.h"
25 #include "catalog/catalog.h"
26 #include "catalog/pg_database.h"
27 #include "catalog/pg_shadow.h"
28 #include "catalog/pg_tablespace.h"
29 #include "catalog/indexing.h"
30 #include "commands/comment.h"
31 #include "commands/dbcommands.h"
32 #include "commands/tablespace.h"
33 #include "mb/pg_wchar.h"
34 #include "miscadmin.h"
35 #include "postmaster/bgwriter.h"
36 #include "storage/fd.h"
37 #include "storage/freespace.h"
38 #include "storage/sinval.h"
39 #include "utils/acl.h"
40 #include "utils/array.h"
41 #include "utils/builtins.h"
42 #include "utils/fmgroids.h"
43 #include "utils/guc.h"
44 #include "utils/lsyscache.h"
45 #include "utils/syscache.h"
46
47
48 /* non-export function prototypes */
49 static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
50                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
51                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
52                         Oid *dbTablespace);
53 static bool have_createdb_privilege(void);
54 static void remove_dbtablespaces(Oid db_id);
55
56
57 /*
58  * CREATE DATABASE
59  */
60 void
61 createdb(const CreatedbStmt *stmt)
62 {
63         HeapScanDesc scan;
64         Relation        rel;
65         Oid                     src_dboid;
66         AclId           src_owner;
67         int                     src_encoding;
68         bool            src_istemplate;
69         Oid                     src_lastsysoid;
70         TransactionId src_vacuumxid;
71         TransactionId src_frozenxid;
72         Oid                     src_deftablespace;
73         Oid                     dst_deftablespace;
74         Relation        pg_database_rel;
75         HeapTuple       tuple;
76         TupleDesc       pg_database_dsc;
77         Datum           new_record[Natts_pg_database];
78         char            new_record_nulls[Natts_pg_database];
79         Oid                     dboid;
80         AclId           datdba;
81         ListCell   *option;
82         DefElem    *dtablespacename = NULL;
83         DefElem    *downer = NULL;
84         DefElem    *dtemplate = NULL;
85         DefElem    *dencoding = NULL;
86         char       *dbname = stmt->dbname;
87         char       *dbowner = NULL;
88         char       *dbtemplate = NULL;
89         int                     encoding = -1;
90
91 #ifndef WIN32
92         char            buf[2 * MAXPGPATH + 100];
93 #endif
94
95         /* don't call this in a transaction block */
96         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
97
98         /* Extract options from the statement node tree */
99         foreach(option, stmt->options)
100         {
101                 DefElem    *defel = (DefElem *) lfirst(option);
102
103                 if (strcmp(defel->defname, "tablespace") == 0)
104                 {
105                         if (dtablespacename)
106                                 ereport(ERROR,
107                                                 (errcode(ERRCODE_SYNTAX_ERROR),
108                                                  errmsg("conflicting or redundant options")));
109                         dtablespacename = defel;
110                 }
111                 else if (strcmp(defel->defname, "owner") == 0)
112                 {
113                         if (downer)
114                                 ereport(ERROR,
115                                                 (errcode(ERRCODE_SYNTAX_ERROR),
116                                                  errmsg("conflicting or redundant options")));
117                         downer = defel;
118                 }
119                 else if (strcmp(defel->defname, "template") == 0)
120                 {
121                         if (dtemplate)
122                                 ereport(ERROR,
123                                                 (errcode(ERRCODE_SYNTAX_ERROR),
124                                                  errmsg("conflicting or redundant options")));
125                         dtemplate = defel;
126                 }
127                 else if (strcmp(defel->defname, "encoding") == 0)
128                 {
129                         if (dencoding)
130                                 ereport(ERROR,
131                                                 (errcode(ERRCODE_SYNTAX_ERROR),
132                                                  errmsg("conflicting or redundant options")));
133                         dencoding = defel;
134                 }
135                 else if (strcmp(defel->defname, "location") == 0)
136                 {
137                         ereport(WARNING,
138                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
139                                          errmsg("LOCATION is not supported anymore"),
140                                          errhint("Consider using tablespaces instead.")));
141                 }
142                 else
143                         elog(ERROR, "option \"%s\" not recognized",
144                                  defel->defname);
145         }
146
147         if (downer && downer->arg)
148                 dbowner = strVal(downer->arg);
149         if (dtemplate && dtemplate->arg)
150                 dbtemplate = strVal(dtemplate->arg);
151         if (dencoding && dencoding->arg)
152         {
153                 const char *encoding_name;
154
155                 if (IsA(dencoding->arg, Integer))
156                 {
157                         encoding = intVal(dencoding->arg);
158                         encoding_name = pg_encoding_to_char(encoding);
159                         if (strcmp(encoding_name, "") == 0 ||
160                                 pg_valid_server_encoding(encoding_name) < 0)
161                                 ereport(ERROR,
162                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
163                                                  errmsg("%d is not a valid encoding code",
164                                                                 encoding)));
165                 }
166                 else if (IsA(dencoding->arg, String))
167                 {
168                         encoding_name = strVal(dencoding->arg);
169                         if (pg_valid_server_encoding(encoding_name) < 0)
170                                 ereport(ERROR,
171                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
172                                                  errmsg("%s is not a valid encoding name",
173                                                                 encoding_name)));
174                         encoding = pg_char_to_encoding(encoding_name);
175                 }
176                 else
177                         elog(ERROR, "unrecognized node type: %d",
178                                  nodeTag(dencoding->arg));
179         }
180
181         /* obtain sysid of proposed owner */
182         if (dbowner)
183                 datdba = get_usesysid(dbowner); /* will ereport if no such user */
184         else
185                 datdba = GetUserId();
186
187         if (datdba == GetUserId())
188         {
189                 /* creating database for self: can be superuser or createdb */
190                 if (!superuser() && !have_createdb_privilege())
191                         ereport(ERROR,
192                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
193                                          errmsg("permission denied to create database")));
194         }
195         else
196         {
197                 /* creating database for someone else: must be superuser */
198                 /* note that the someone else need not have any permissions */
199                 if (!superuser())
200                         ereport(ERROR,
201                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
202                                          errmsg("must be superuser to create database for another user")));
203         }
204
205         /*
206          * Check for db name conflict.  There is a race condition here, since
207          * another backend could create the same DB name before we commit.
208          * However, holding an exclusive lock on pg_database for the whole
209          * time we are copying the source database doesn't seem like a good
210          * idea, so accept possibility of race to create.  We will check again
211          * after we grab the exclusive lock.
212          */
213         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
214                 ereport(ERROR,
215                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
216                                  errmsg("database \"%s\" already exists", dbname)));
217
218         /*
219          * Lookup database (template) to be cloned.
220          */
221         if (!dbtemplate)
222                 dbtemplate = "template1";               /* Default template database name */
223
224         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
225                                          &src_istemplate, &src_lastsysoid,
226                                          &src_vacuumxid, &src_frozenxid, &src_deftablespace))
227                 ereport(ERROR,
228                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
229                  errmsg("template database \"%s\" does not exist", dbtemplate)));
230
231         /*
232          * Permission check: to copy a DB that's not marked datistemplate, you
233          * must be superuser or the owner thereof.
234          */
235         if (!src_istemplate)
236         {
237                 if (!superuser() && GetUserId() != src_owner)
238                         ereport(ERROR,
239                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
240                                          errmsg("permission denied to copy database \"%s\"",
241                                                         dbtemplate)));
242         }
243
244         /*
245          * The source DB can't have any active backends, except this one
246          * (exception is to allow CREATE DB while connected to template1).
247          * Otherwise we might copy inconsistent data.  This check is not
248          * bulletproof, since someone might connect while we are copying...
249          */
250         if (DatabaseHasActiveBackends(src_dboid, true))
251                 ereport(ERROR,
252                                 (errcode(ERRCODE_OBJECT_IN_USE),
253                 errmsg("source database \"%s\" is being accessed by other users",
254                            dbtemplate)));
255
256         /* If encoding is defaulted, use source's encoding */
257         if (encoding < 0)
258                 encoding = src_encoding;
259
260         /* Some encodings are client only */
261         if (!PG_VALID_BE_ENCODING(encoding))
262                 ereport(ERROR,
263                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
264                                  errmsg("invalid server encoding %d", encoding)));
265
266         /* Resolve default tablespace for new database */
267         if (dtablespacename && dtablespacename->arg)
268         {
269                 char       *tablespacename;
270                 AclResult       aclresult;
271
272                 tablespacename = strVal(dtablespacename->arg);
273                 dst_deftablespace = get_tablespace_oid(tablespacename);
274                 if (!OidIsValid(dst_deftablespace))
275                         ereport(ERROR,
276                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
277                                          errmsg("tablespace \"%s\" does not exist",
278                                                         tablespacename)));
279                 /* check permissions */
280                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
281                                                                                    ACL_CREATE);
282                 if (aclresult != ACLCHECK_OK)
283                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
284                                                    tablespacename);
285
286                 /*
287                  * If we are trying to change the default tablespace of the template,
288                  * we require that the template not have any files in the new default
289                  * tablespace.  This is necessary because otherwise the copied
290                  * database would contain pg_class rows that refer to its default
291                  * tablespace both explicitly (by OID) and implicitly (as zero), which
292                  * would cause problems.  For example another CREATE DATABASE using
293                  * the copied database as template, and trying to change its default
294                  * tablespace again, would yield outright incorrect results (it would
295                  * improperly move tables to the new default tablespace that should
296                  * stay in the same tablespace).
297                  */
298                 if (dst_deftablespace != src_deftablespace)
299                 {
300                         char       *srcpath;
301                         struct stat st;
302
303                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
304
305                         if (stat(srcpath, &st) == 0 &&
306                                 S_ISDIR(st.st_mode) &&
307                                 !directory_is_empty(srcpath))
308                                 ereport(ERROR,
309                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
310                                                  errmsg("cannot assign new default tablespace \"%s\"",
311                                                                 tablespacename),
312                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
313                                                                    dbtemplate)));
314                         pfree(srcpath);
315                 }
316         }
317         else
318         {
319                 /* Use template database's default tablespace */
320                 dst_deftablespace = src_deftablespace;
321                 /* Note there is no additional permission check in this path */
322         }
323
324         /*
325          * Preassign OID for pg_database tuple, so that we can compute db
326          * path.
327          */
328         dboid = newoid();
329
330         /*
331          * Force dirty buffers out to disk, to ensure source database is
332          * up-to-date for the copy.  (We really only need to flush buffers for
333          * the source database, but bufmgr.c provides no API for that.)
334          */
335         BufferSync(-1, -1);
336
337         /*
338          * Close virtual file descriptors so the kernel has more available for
339          * the system() calls below.
340          */
341         closeAllVfds();
342
343         /*
344          * Iterate through all tablespaces of the template database, and copy
345          * each one to the new database.
346          */
347         rel = heap_openr(TableSpaceRelationName, AccessShareLock);
348         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
349         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
350         {
351                 Oid                     srctablespace = HeapTupleGetOid(tuple);
352                 Oid                     dsttablespace;
353                 char       *srcpath;
354                 char       *dstpath;
355                 struct stat st;
356
357                 /* No need to copy global tablespace */
358                 if (srctablespace == GLOBALTABLESPACE_OID)
359                         continue;
360
361                 srcpath = GetDatabasePath(src_dboid, srctablespace);
362
363                 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
364                         directory_is_empty(srcpath))
365                 {
366                         /* Assume we can ignore it */
367                         pfree(srcpath);
368                         continue;
369                 }
370
371                 if (srctablespace == src_deftablespace)
372                         dsttablespace = dst_deftablespace;
373                 else
374                         dsttablespace = srctablespace;
375
376                 dstpath = GetDatabasePath(dboid, dsttablespace);
377
378                 if (stat(dstpath, &st) == 0 || errno != ENOENT)
379                 {
380                         remove_dbtablespaces(dboid);
381                         ereport(ERROR,
382                                         (errmsg("could not initialize database directory"),
383                                          errdetail("Directory \"%s\" already exists.",
384                                                            dstpath)));
385                 }
386
387 #ifndef WIN32
388
389                 /*
390                  * Copy this subdirectory to the new location
391                  *
392                  * XXX use of cp really makes this code pretty grotty, particularly
393                  * with respect to lack of ability to report errors well.  Someday
394                  * rewrite to do it for ourselves.
395                  */
396
397                 /* We might need to use cp -R one day for portability */
398                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
399                                  srcpath, dstpath);
400                 if (system(buf) != 0)
401                 {
402                         remove_dbtablespaces(dboid);
403                         ereport(ERROR,
404                                         (errmsg("could not initialize database directory"),
405                                          errdetail("Failing system command was: %s", buf),
406                                          errhint("Look in the postmaster's stderr log for more information.")));
407                 }
408 #else                                                   /* WIN32 */
409                 if (copydir(srcpath, dstpath) != 0)
410                 {
411                         /* copydir should already have given details of its troubles */
412                         remove_dbtablespaces(dboid);
413                         ereport(ERROR,
414                                         (errmsg("could not initialize database directory")));
415                 }
416 #endif   /* WIN32 */
417
418                 /* Record the filesystem change in XLOG */
419                 {
420                         xl_dbase_create_rec xlrec;
421                         XLogRecData rdata[3];
422
423                         xlrec.db_id = dboid;
424                         rdata[0].buffer = InvalidBuffer;
425                         rdata[0].data = (char *) &xlrec;
426                         rdata[0].len = offsetof(xl_dbase_create_rec, src_path);
427                         rdata[0].next = &(rdata[1]);
428
429                         rdata[1].buffer = InvalidBuffer;
430                         rdata[1].data = (char *) srcpath;
431                         rdata[1].len = strlen(srcpath) + 1;
432                         rdata[1].next = &(rdata[2]);
433
434                         rdata[2].buffer = InvalidBuffer;
435                         rdata[2].data = (char *) dstpath;
436                         rdata[2].len = strlen(dstpath) + 1;
437                         rdata[2].next = NULL;
438
439                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
440                 }
441         }
442         heap_endscan(scan);
443         heap_close(rel, AccessShareLock);
444
445         /*
446          * Now OK to grab exclusive lock on pg_database.
447          */
448         pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
449
450         /* Check to see if someone else created same DB name meanwhile. */
451         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
452         {
453                 /* Don't hold lock while doing recursive remove */
454                 heap_close(pg_database_rel, AccessExclusiveLock);
455                 remove_dbtablespaces(dboid);
456                 ereport(ERROR,
457                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
458                                  errmsg("database \"%s\" already exists", dbname)));
459         }
460
461         /*
462          * Insert a new tuple into pg_database
463          */
464         pg_database_dsc = RelationGetDescr(pg_database_rel);
465
466         /* Form tuple */
467         MemSet(new_record, 0, sizeof(new_record));
468         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
469
470         new_record[Anum_pg_database_datname - 1] =
471                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
472         new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
473         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
474         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
475         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
476         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
477         new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
478         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
479         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
480
481         /*
482          * We deliberately set datconfig and datacl to defaults (NULL), rather
483          * than copying them from the template database.  Copying datacl would
484          * be a bad idea when the owner is not the same as the template's
485          * owner. It's more debatable whether datconfig should be copied.
486          */
487         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
488         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
489
490         tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
491
492         HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
493                                                                                  * selection */
494
495         simple_heap_insert(pg_database_rel, tuple);
496
497         /* Update indexes */
498         CatalogUpdateIndexes(pg_database_rel, tuple);
499
500         /*
501          * Force dirty buffers out to disk, so that newly-connecting backends
502          * will see the new database in pg_database right away.  (They'll see
503          * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
504          */
505         FlushRelationBuffers(pg_database_rel, MaxBlockNumber);
506
507         /* Close pg_database, but keep exclusive lock till commit */
508         heap_close(pg_database_rel, NoLock);
509 }
510
511
512 /*
513  * DROP DATABASE
514  */
515 void
516 dropdb(const char *dbname)
517 {
518         int4            db_owner;
519         bool            db_istemplate;
520         Oid                     db_id;
521         Relation        pgdbrel;
522         SysScanDesc pgdbscan;
523         ScanKeyData key;
524         HeapTuple       tup;
525
526         PreventTransactionChain((void *) dbname, "DROP DATABASE");
527
528         AssertArg(dbname);
529
530         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
531                 ereport(ERROR,
532                                 (errcode(ERRCODE_OBJECT_IN_USE),
533                                  errmsg("cannot drop the currently open database")));
534
535         /*
536          * Obtain exclusive lock on pg_database.  We need this to ensure that
537          * no new backend starts up in the target database while we are
538          * deleting it.  (Actually, a new backend might still manage to start
539          * up, because it will read pg_database without any locking to
540          * discover the database's OID.  But it will detect its error in
541          * ReverifyMyDatabase and shut down before any serious damage is done.
542          * See postinit.c.)
543          */
544         pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
545
546         if (!get_db_info(dbname, &db_id, &db_owner, NULL,
547                                          &db_istemplate, NULL, NULL, NULL, NULL))
548                 ereport(ERROR,
549                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
550                                  errmsg("database \"%s\" does not exist", dbname)));
551
552         if (GetUserId() != db_owner && !superuser())
553                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
554                                            dbname);
555
556         /*
557          * Disallow dropping a DB that is marked istemplate.  This is just to
558          * prevent people from accidentally dropping template0 or template1;
559          * they can do so if they're really determined ...
560          */
561         if (db_istemplate)
562                 ereport(ERROR,
563                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
564                                  errmsg("cannot drop a template database")));
565
566         /*
567          * Check for active backends in the target database.
568          */
569         if (DatabaseHasActiveBackends(db_id, false))
570                 ereport(ERROR,
571                                 (errcode(ERRCODE_OBJECT_IN_USE),
572                            errmsg("database \"%s\" is being accessed by other users",
573                                           dbname)));
574
575         /*
576          * Find the database's tuple by OID (should be unique).
577          */
578         ScanKeyInit(&key,
579                                 ObjectIdAttributeNumber,
580                                 BTEqualStrategyNumber, F_OIDEQ,
581                                 ObjectIdGetDatum(db_id));
582
583         pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true,
584                                                                   SnapshotNow, 1, &key);
585
586         tup = systable_getnext(pgdbscan);
587         if (!HeapTupleIsValid(tup))
588         {
589                 /*
590                  * This error should never come up since the existence of the
591                  * database is checked earlier
592                  */
593                 elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
594                          dbname);
595         }
596
597         /* Remove the database's tuple from pg_database */
598         simple_heap_delete(pgdbrel, &tup->t_self);
599
600         systable_endscan(pgdbscan);
601
602         /*
603          * Delete any comments associated with the database
604          *
605          * NOTE: this is probably dead code since any such comments should have
606          * been in that database, not mine.
607          */
608         DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
609
610         /*
611          * Drop pages for this database that are in the shared buffer cache.
612          * This is important to ensure that no remaining backend tries to
613          * write out a dirty buffer to the dead database later...
614          */
615         DropBuffers(db_id);
616
617         /*
618          * Also, clean out any entries in the shared free space map.
619          */
620         FreeSpaceMapForgetDatabase(db_id);
621
622         /*
623          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
624          * open files, which would cause rmdir() to fail.
625          */
626 #ifdef WIN32
627         RequestCheckpoint(true);
628 #endif
629
630         /*
631          * Remove all tablespace subdirs belonging to the database.
632          */
633         remove_dbtablespaces(db_id);
634
635         /*
636          * Force dirty buffers out to disk, so that newly-connecting backends
637          * will see the database tuple marked dead in pg_database right away.
638          * (They'll see an uncommitted deletion, but they don't care; see
639          * GetRawDatabaseInfo.)
640          */
641         FlushRelationBuffers(pgdbrel, MaxBlockNumber);
642
643         /* Close pg_database, but keep exclusive lock till commit */
644         heap_close(pgdbrel, NoLock);
645 }
646
647
648 /*
649  * Rename database
650  */
651 void
652 RenameDatabase(const char *oldname, const char *newname)
653 {
654         HeapTuple       tup,
655                                 newtup;
656         Relation        rel;
657         SysScanDesc scan,
658                                 scan2;
659         ScanKeyData key,
660                                 key2;
661
662         /*
663          * Obtain AccessExclusiveLock so that no new session gets started
664          * while the rename is in progress.
665          */
666         rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
667
668         ScanKeyInit(&key,
669                                 Anum_pg_database_datname,
670                                 BTEqualStrategyNumber, F_NAMEEQ,
671                                 NameGetDatum(oldname));
672         scan = systable_beginscan(rel, DatabaseNameIndex, true,
673                                                           SnapshotNow, 1, &key);
674
675         tup = systable_getnext(scan);
676         if (!HeapTupleIsValid(tup))
677                 ereport(ERROR,
678                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
679                                  errmsg("database \"%s\" does not exist", oldname)));
680
681         /*
682          * XXX Client applications probably store the current database
683          * somewhere, so renaming it could cause confusion.  On the other
684          * hand, there may not be an actual problem besides a little
685          * confusion, so think about this and decide.
686          */
687         if (HeapTupleGetOid(tup) == MyDatabaseId)
688                 ereport(ERROR,
689                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
690                                  errmsg("current database may not be renamed")));
691
692         /*
693          * Make sure the database does not have active sessions.  Might not be
694          * necessary, but it's consistent with other database operations.
695          */
696         if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
697                 ereport(ERROR,
698                                 (errcode(ERRCODE_OBJECT_IN_USE),
699                            errmsg("database \"%s\" is being accessed by other users",
700                                           oldname)));
701
702         /* make sure the new name doesn't exist */
703         ScanKeyInit(&key2,
704                                 Anum_pg_database_datname,
705                                 BTEqualStrategyNumber, F_NAMEEQ,
706                                 NameGetDatum(newname));
707         scan2 = systable_beginscan(rel, DatabaseNameIndex, true,
708                                                            SnapshotNow, 1, &key2);
709         if (HeapTupleIsValid(systable_getnext(scan2)))
710                 ereport(ERROR,
711                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
712                                  errmsg("database \"%s\" already exists", newname)));
713         systable_endscan(scan2);
714
715         /* must be owner */
716         if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
717                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
718                                            oldname);
719
720         /* must have createdb */
721         if (!have_createdb_privilege())
722                 ereport(ERROR,
723                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
724                                  errmsg("permission denied to rename database")));
725
726         /* rename */
727         newtup = heap_copytuple(tup);
728         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
729         simple_heap_update(rel, &newtup->t_self, newtup);
730         CatalogUpdateIndexes(rel, newtup);
731
732         systable_endscan(scan);
733
734         /*
735          * Force dirty buffers out to disk, so that newly-connecting backends
736          * will see the renamed database in pg_database right away.  (They'll
737          * see an uncommitted tuple, but they don't care; see
738          * GetRawDatabaseInfo.)
739          */
740         FlushRelationBuffers(rel, MaxBlockNumber);
741
742         /* Close pg_database, but keep exclusive lock till commit */
743         heap_close(rel, NoLock);
744 }
745
746
747 /*
748  * ALTER DATABASE name SET ...
749  */
750 void
751 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
752 {
753         char       *valuestr;
754         HeapTuple       tuple,
755                                 newtuple;
756         Relation        rel;
757         ScanKeyData scankey;
758         SysScanDesc scan;
759         Datum           repl_val[Natts_pg_database];
760         char            repl_null[Natts_pg_database];
761         char            repl_repl[Natts_pg_database];
762
763         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
764
765         /*
766          * We need AccessExclusiveLock so we can safely do FlushRelationBuffers.
767          */
768         rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
769         ScanKeyInit(&scankey,
770                                 Anum_pg_database_datname,
771                                 BTEqualStrategyNumber, F_NAMEEQ,
772                                 NameGetDatum(stmt->dbname));
773         scan = systable_beginscan(rel, DatabaseNameIndex, true,
774                                                           SnapshotNow, 1, &scankey);
775         tuple = systable_getnext(scan);
776         if (!HeapTupleIsValid(tuple))
777                 ereport(ERROR,
778                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
779                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
780
781         if (!(superuser()
782                 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
783                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
784                                            stmt->dbname);
785
786         MemSet(repl_repl, ' ', sizeof(repl_repl));
787         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
788
789         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
790         {
791                 /* RESET ALL */
792                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
793                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
794         }
795         else
796         {
797                 Datum           datum;
798                 bool            isnull;
799                 ArrayType  *a;
800
801                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
802
803                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
804                                                          RelationGetDescr(rel), &isnull);
805
806                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
807
808                 if (valuestr)
809                         a = GUCArrayAdd(a, stmt->variable, valuestr);
810                 else
811                         a = GUCArrayDelete(a, stmt->variable);
812
813                 if (a)
814                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
815                 else
816                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
817         }
818
819         newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
820         simple_heap_update(rel, &tuple->t_self, newtuple);
821
822         /* Update indexes */
823         CatalogUpdateIndexes(rel, newtuple);
824
825         systable_endscan(scan);
826
827         /*
828          * Force dirty buffers out to disk, so that newly-connecting backends
829          * will see the altered row in pg_database right away.  (They'll
830          * see an uncommitted tuple, but they don't care; see
831          * GetRawDatabaseInfo.)
832          */
833         FlushRelationBuffers(rel, MaxBlockNumber);
834
835         /* Close pg_database, but keep exclusive lock till commit */
836         heap_close(rel, NoLock);
837 }
838
839
840 /*
841  * ALTER DATABASE name OWNER TO newowner
842  */
843 void
844 AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
845 {
846         HeapTuple       tuple;
847         Relation        rel;
848         ScanKeyData scankey;
849         SysScanDesc scan;
850         Form_pg_database datForm;
851
852         /*
853          * We need AccessExclusiveLock so we can safely do FlushRelationBuffers.
854          */
855         rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
856         ScanKeyInit(&scankey,
857                                 Anum_pg_database_datname,
858                                 BTEqualStrategyNumber, F_NAMEEQ,
859                                 NameGetDatum(dbname));
860         scan = systable_beginscan(rel, DatabaseNameIndex, true,
861                                                           SnapshotNow, 1, &scankey);
862         tuple = systable_getnext(scan);
863         if (!HeapTupleIsValid(tuple))
864                 ereport(ERROR,
865                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
866                                  errmsg("database \"%s\" does not exist", dbname)));
867
868         datForm = (Form_pg_database) GETSTRUCT(tuple);
869
870         /*
871          * If the new owner is the same as the existing owner, consider the
872          * command to have succeeded.  This is to be consistent with other
873          * objects.
874          */
875         if (datForm->datdba != newOwnerSysId)
876         {
877                 Datum           repl_val[Natts_pg_database];
878                 char            repl_null[Natts_pg_database];
879                 char            repl_repl[Natts_pg_database];
880                 Acl                *newAcl;
881                 Datum           aclDatum;
882                 bool            isNull;
883                 HeapTuple       newtuple;
884
885                 /* changing owner's database for someone else: must be superuser */
886                 /* note that the someone else need not have any permissions */
887                 if (!superuser())
888                         ereport(ERROR,
889                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
890                                          errmsg("must be superuser to change owner")));
891
892                 memset(repl_null, ' ', sizeof(repl_null));
893                 memset(repl_repl, ' ', sizeof(repl_repl));
894
895                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
896                 repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
897
898                 /*
899                  * Determine the modified ACL for the new owner.  This is only
900                  * necessary when the ACL is non-null.
901                  */
902                 aclDatum = heap_getattr(tuple,
903                                                                 Anum_pg_database_datacl,
904                                                                 RelationGetDescr(rel),
905                                                                 &isNull);
906                 if (!isNull)
907                 {
908                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
909                                                                  datForm->datdba, newOwnerSysId);
910                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
911                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
912                 }
913
914                 newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
915                 simple_heap_update(rel, &newtuple->t_self, newtuple);
916                 CatalogUpdateIndexes(rel, newtuple);
917
918                 heap_freetuple(newtuple);
919
920                 /* must release buffer pins before FlushRelationBuffers */
921                 systable_endscan(scan);
922
923                 /*
924                  * Force dirty buffers out to disk, so that newly-connecting backends
925                  * will see the altered row in pg_database right away.  (They'll
926                  * see an uncommitted tuple, but they don't care; see
927                  * GetRawDatabaseInfo.)
928                  */
929                 FlushRelationBuffers(rel, MaxBlockNumber);
930         }
931         else
932                 systable_endscan(scan);
933
934         /* Close pg_database, but keep exclusive lock till commit */
935         heap_close(rel, NoLock);
936 }
937
938
939 /*
940  * Helper functions
941  */
942
943 static bool
944 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
945                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
946                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
947                         Oid *dbTablespace)
948 {
949         Relation        relation;
950         ScanKeyData scanKey;
951         SysScanDesc scan;
952         HeapTuple       tuple;
953         bool            gottuple;
954
955         AssertArg(name);
956
957         /* Caller may wish to grab a better lock on pg_database beforehand... */
958         relation = heap_openr(DatabaseRelationName, AccessShareLock);
959
960         ScanKeyInit(&scanKey,
961                                 Anum_pg_database_datname,
962                                 BTEqualStrategyNumber, F_NAMEEQ,
963                                 NameGetDatum(name));
964
965         scan = systable_beginscan(relation, DatabaseNameIndex, true,
966                                                           SnapshotNow, 1, &scanKey);
967
968         tuple = systable_getnext(scan);
969
970         gottuple = HeapTupleIsValid(tuple);
971         if (gottuple)
972         {
973                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
974
975                 /* oid of the database */
976                 if (dbIdP)
977                         *dbIdP = HeapTupleGetOid(tuple);
978                 /* sysid of the owner */
979                 if (ownerIdP)
980                         *ownerIdP = dbform->datdba;
981                 /* character encoding */
982                 if (encodingP)
983                         *encodingP = dbform->encoding;
984                 /* allowed as template? */
985                 if (dbIsTemplateP)
986                         *dbIsTemplateP = dbform->datistemplate;
987                 /* last system OID used in database */
988                 if (dbLastSysOidP)
989                         *dbLastSysOidP = dbform->datlastsysoid;
990                 /* limit of vacuumed XIDs */
991                 if (dbVacuumXidP)
992                         *dbVacuumXidP = dbform->datvacuumxid;
993                 /* limit of frozen XIDs */
994                 if (dbFrozenXidP)
995                         *dbFrozenXidP = dbform->datfrozenxid;
996                 /* default tablespace for this database */
997                 if (dbTablespace)
998                         *dbTablespace = dbform->dattablespace;
999         }
1000
1001         systable_endscan(scan);
1002         heap_close(relation, AccessShareLock);
1003
1004         return gottuple;
1005 }
1006
1007 static bool
1008 have_createdb_privilege(void)
1009 {
1010         HeapTuple       utup;
1011         bool            retval;
1012
1013         utup = SearchSysCache(SHADOWSYSID,
1014                                                   Int32GetDatum(GetUserId()),
1015                                                   0, 0, 0);
1016
1017         if (!HeapTupleIsValid(utup))
1018                 retval = false;
1019         else
1020                 retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
1021
1022         ReleaseSysCache(utup);
1023
1024         return retval;
1025 }
1026
1027 /*
1028  * Remove tablespace directories
1029  *
1030  * We don't know what tablespaces db_id is using, so iterate through all
1031  * tablespaces removing <tablespace>/db_id
1032  */
1033 static void
1034 remove_dbtablespaces(Oid db_id)
1035 {
1036         Relation        rel;
1037         HeapScanDesc scan;
1038         HeapTuple       tuple;
1039
1040         rel = heap_openr(TableSpaceRelationName, AccessShareLock);
1041         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1042         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1043         {
1044                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1045                 char       *dstpath;
1046                 struct stat st;
1047
1048                 /* Don't mess with the global tablespace */
1049                 if (dsttablespace == GLOBALTABLESPACE_OID)
1050                         continue;
1051
1052                 dstpath = GetDatabasePath(db_id, dsttablespace);
1053
1054                 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1055                 {
1056                         /* Assume we can ignore it */
1057                         pfree(dstpath);
1058                         continue;
1059                 }
1060
1061                 if (!rmtree(dstpath, true))
1062                         ereport(WARNING,
1063                                         (errmsg("could not remove database directory \"%s\"",
1064                                                         dstpath)));
1065
1066                 /* Record the filesystem change in XLOG */
1067                 {
1068                         xl_dbase_drop_rec xlrec;
1069                         XLogRecData rdata[2];
1070
1071                         xlrec.db_id = db_id;
1072                         rdata[0].buffer = InvalidBuffer;
1073                         rdata[0].data = (char *) &xlrec;
1074                         rdata[0].len = offsetof(xl_dbase_drop_rec, dir_path);
1075                         rdata[0].next = &(rdata[1]);
1076
1077                         rdata[1].buffer = InvalidBuffer;
1078                         rdata[1].data = (char *) dstpath;
1079                         rdata[1].len = strlen(dstpath) + 1;
1080                         rdata[1].next = NULL;
1081
1082                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1083                 }
1084
1085                 pfree(dstpath);
1086         }
1087
1088         heap_endscan(scan);
1089         heap_close(rel, AccessShareLock);
1090 }
1091
1092
1093 /*
1094  * get_database_oid - given a database name, look up the OID
1095  *
1096  * Returns InvalidOid if database name not found.
1097  *
1098  * This is not actually used in this file, but is exported for use elsewhere.
1099  */
1100 Oid
1101 get_database_oid(const char *dbname)
1102 {
1103         Relation        pg_database;
1104         ScanKeyData entry[1];
1105         SysScanDesc scan;
1106         HeapTuple       dbtuple;
1107         Oid                     oid;
1108
1109         /* There's no syscache for pg_database, so must look the hard way */
1110         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1111         ScanKeyInit(&entry[0],
1112                                 Anum_pg_database_datname,
1113                                 BTEqualStrategyNumber, F_NAMEEQ,
1114                                 CStringGetDatum(dbname));
1115         scan = systable_beginscan(pg_database, DatabaseNameIndex, true,
1116                                                           SnapshotNow, 1, entry);
1117
1118         dbtuple = systable_getnext(scan);
1119
1120         /* We assume that there can be at most one matching tuple */
1121         if (HeapTupleIsValid(dbtuple))
1122                 oid = HeapTupleGetOid(dbtuple);
1123         else
1124                 oid = InvalidOid;
1125
1126         systable_endscan(scan);
1127         heap_close(pg_database, AccessShareLock);
1128
1129         return oid;
1130 }
1131
1132
1133 /*
1134  * get_database_name - given a database OID, look up the name
1135  *
1136  * Returns a palloc'd string, or NULL if no such database.
1137  *
1138  * This is not actually used in this file, but is exported for use elsewhere.
1139  */
1140 char *
1141 get_database_name(Oid dbid)
1142 {
1143         Relation        pg_database;
1144         ScanKeyData entry[1];
1145         SysScanDesc scan;
1146         HeapTuple       dbtuple;
1147         char       *result;
1148
1149         /* There's no syscache for pg_database, so must look the hard way */
1150         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1151         ScanKeyInit(&entry[0],
1152                                 ObjectIdAttributeNumber,
1153                                 BTEqualStrategyNumber, F_OIDEQ,
1154                                 ObjectIdGetDatum(dbid));
1155         scan = systable_beginscan(pg_database, DatabaseOidIndex, true,
1156                                                           SnapshotNow, 1, entry);
1157
1158         dbtuple = systable_getnext(scan);
1159
1160         /* We assume that there can be at most one matching tuple */
1161         if (HeapTupleIsValid(dbtuple))
1162                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1163         else
1164                 result = NULL;
1165
1166         systable_endscan(scan);
1167         heap_close(pg_database, AccessShareLock);
1168
1169         return result;
1170 }
1171
1172 /*
1173  * DATABASE resource manager's routines
1174  */
1175 void
1176 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1177 {
1178         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1179
1180         if (info == XLOG_DBASE_CREATE)
1181         {
1182                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1183                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1184                 struct stat st;
1185
1186 #ifndef WIN32
1187                 char            buf[2 * MAXPGPATH + 100];
1188 #endif
1189
1190                 /*
1191                  * Our theory for replaying a CREATE is to forcibly drop the
1192                  * target subdirectory if present, then re-copy the source data.
1193                  * This may be more work than needed, but it is simple to
1194                  * implement.
1195                  */
1196                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1197                 {
1198                         if (!rmtree(dst_path, true))
1199                                 ereport(WARNING,
1200                                         (errmsg("could not remove database directory \"%s\"",
1201                                                         dst_path)));
1202                 }
1203
1204                 /*
1205                  * Force dirty buffers out to disk, to ensure source database is
1206                  * up-to-date for the copy.  (We really only need to flush buffers for
1207                  * the source database, but bufmgr.c provides no API for that.)
1208                  */
1209                 BufferSync(-1, -1);
1210
1211 #ifndef WIN32
1212
1213                 /*
1214                  * Copy this subdirectory to the new location
1215                  *
1216                  * XXX use of cp really makes this code pretty grotty, particularly
1217                  * with respect to lack of ability to report errors well.  Someday
1218                  * rewrite to do it for ourselves.
1219                  */
1220
1221                 /* We might need to use cp -R one day for portability */
1222                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
1223                                  xlrec->src_path, dst_path);
1224                 if (system(buf) != 0)
1225                         ereport(ERROR,
1226                                         (errmsg("could not initialize database directory"),
1227                                          errdetail("Failing system command was: %s", buf),
1228                                          errhint("Look in the postmaster's stderr log for more information.")));
1229 #else                                                   /* WIN32 */
1230                 if (copydir(xlrec->src_path, dst_path) != 0)
1231                 {
1232                         /* copydir should already have given details of its troubles */
1233                         ereport(ERROR,
1234                                         (errmsg("could not initialize database directory")));
1235                 }
1236 #endif   /* WIN32 */
1237         }
1238         else if (info == XLOG_DBASE_DROP)
1239         {
1240                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1241
1242                 /*
1243                  * Drop pages for this database that are in the shared buffer
1244                  * cache
1245                  */
1246                 DropBuffers(xlrec->db_id);
1247
1248                 if (!rmtree(xlrec->dir_path, true))
1249                         ereport(WARNING,
1250                                         (errmsg("could not remove database directory \"%s\"",
1251                                                         xlrec->dir_path)));
1252         }
1253         else
1254                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1255 }
1256
1257 void
1258 dbase_undo(XLogRecPtr lsn, XLogRecord *record)
1259 {
1260         elog(PANIC, "dbase_undo: unimplemented");
1261 }
1262
1263 void
1264 dbase_desc(char *buf, uint8 xl_info, char *rec)
1265 {
1266         uint8           info = xl_info & ~XLR_INFO_MASK;
1267
1268         if (info == XLOG_DBASE_CREATE)
1269         {
1270                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1271                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1272
1273                 sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"",
1274                                 xlrec->db_id, xlrec->src_path, dst_path);
1275         }
1276         else if (info == XLOG_DBASE_DROP)
1277         {
1278                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1279
1280                 sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"",
1281                                 xlrec->db_id, xlrec->dir_path);
1282         }
1283         else
1284                 strcat(buf, "UNKNOWN");
1285 }