]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
On Windows, force a checkpoint just before dropping a database's physical files.
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  *
7  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.146 2004/10/28 00:39:58 tgl Exp $
13  *
14  *-------------------------------------------------------------------------
15  */
16 #include "postgres.h"
17
18 #include <fcntl.h>
19 #include <unistd.h>
20 #include <sys/stat.h>
21
22 #include "access/genam.h"
23 #include "access/heapam.h"
24 #include "catalog/catname.h"
25 #include "catalog/catalog.h"
26 #include "catalog/pg_database.h"
27 #include "catalog/pg_shadow.h"
28 #include "catalog/pg_tablespace.h"
29 #include "catalog/indexing.h"
30 #include "commands/comment.h"
31 #include "commands/dbcommands.h"
32 #include "commands/tablespace.h"
33 #include "mb/pg_wchar.h"
34 #include "miscadmin.h"
35 #include "postmaster/bgwriter.h"
36 #include "storage/fd.h"
37 #include "storage/freespace.h"
38 #include "storage/sinval.h"
39 #include "utils/acl.h"
40 #include "utils/array.h"
41 #include "utils/builtins.h"
42 #include "utils/fmgroids.h"
43 #include "utils/guc.h"
44 #include "utils/lsyscache.h"
45 #include "utils/syscache.h"
46
47
48 /* prototypes for functions private to this file (all declared static) */
49 static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
50                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
51                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
52                         Oid *dbTablespace);
53 static bool have_createdb_privilege(void);
54 static void remove_dbtablespaces(Oid db_id);
55
56
57 /*
58  * CREATE DATABASE: validate the statement's options and the caller's privileges, copy every non-empty tablespace directory of the template database (recording each copy in XLOG), then insert the new database's row into pg_database.
59  */
60 void
61 createdb(const CreatedbStmt *stmt)
62 {
63         HeapScanDesc scan;
64         Relation        rel;
65         Oid                     src_dboid;
66         AclId           src_owner;
67         int                     src_encoding;
68         bool            src_istemplate;
69         Oid                     src_lastsysoid;
70         TransactionId src_vacuumxid;
71         TransactionId src_frozenxid;
72         Oid                     src_deftablespace;
73         Oid                     dst_deftablespace;
74         Relation        pg_database_rel;
75         HeapTuple       tuple;
76         TupleDesc       pg_database_dsc;
77         Datum           new_record[Natts_pg_database];
78         char            new_record_nulls[Natts_pg_database];
79         Oid                     dboid;
80         AclId           datdba;
81         ListCell   *option;
82         DefElem    *dtablespacename = NULL;
83         DefElem    *downer = NULL;
84         DefElem    *dtemplate = NULL;
85         DefElem    *dencoding = NULL;
86         char       *dbname = stmt->dbname;
87         char       *dbowner = NULL;
88         char       *dbtemplate = NULL;
89         int                     encoding = -1;
90
91 #ifndef WIN32
92         char            buf[2 * MAXPGPATH + 100];
93 #endif
94
95         /* CREATE DATABASE must not be run inside a transaction block */
96         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
97
98         /* Extract options from the statement node tree, rejecting duplicates */
99         foreach(option, stmt->options)
100         {
101                 DefElem    *defel = (DefElem *) lfirst(option);
102
103                 if (strcmp(defel->defname, "tablespace") == 0)
104                 {
105                         if (dtablespacename)
106                                 ereport(ERROR,
107                                                 (errcode(ERRCODE_SYNTAX_ERROR),
108                                                  errmsg("conflicting or redundant options")));
109                         dtablespacename = defel;
110                 }
111                 else if (strcmp(defel->defname, "owner") == 0)
112                 {
113                         if (downer)
114                                 ereport(ERROR,
115                                                 (errcode(ERRCODE_SYNTAX_ERROR),
116                                                  errmsg("conflicting or redundant options")));
117                         downer = defel;
118                 }
119                 else if (strcmp(defel->defname, "template") == 0)
120                 {
121                         if (dtemplate)
122                                 ereport(ERROR,
123                                                 (errcode(ERRCODE_SYNTAX_ERROR),
124                                                  errmsg("conflicting or redundant options")));
125                         dtemplate = defel;
126                 }
127                 else if (strcmp(defel->defname, "encoding") == 0)
128                 {
129                         if (dencoding)
130                                 ereport(ERROR,
131                                                 (errcode(ERRCODE_SYNTAX_ERROR),
132                                                  errmsg("conflicting or redundant options")));
133                         dencoding = defel;
134                 }
135                 else if (strcmp(defel->defname, "location") == 0)
136                 {
137                         ereport(WARNING,
138                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
139                                          errmsg("LOCATION is not supported anymore"),
140                                          errhint("Consider using tablespaces instead.")));
141                 }
142                 else
143                         elog(ERROR, "option \"%s\" not recognized",
144                                  defel->defname);
145         }
146
147         if (downer && downer->arg)
148                 dbowner = strVal(downer->arg);
149         if (dtemplate && dtemplate->arg)
150                 dbtemplate = strVal(dtemplate->arg);
151         if (dencoding && dencoding->arg)
152         {
153                 const char *encoding_name;
154
155                 if (IsA(dencoding->arg, Integer))
156                 {
157                         encoding = intVal(dencoding->arg);
158                         encoding_name = pg_encoding_to_char(encoding);
159                         if (strcmp(encoding_name, "") == 0 ||
160                                 pg_valid_server_encoding(encoding_name) < 0)
161                                 ereport(ERROR,
162                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
163                                                  errmsg("%d is not a valid encoding code",
164                                                                 encoding)));
165                 }
166                 else if (IsA(dencoding->arg, String))
167                 {
168                         encoding_name = strVal(dencoding->arg);
169                         if (pg_valid_server_encoding(encoding_name) < 0)
170                                 ereport(ERROR,
171                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
172                                                  errmsg("%s is not a valid encoding name",
173                                                                 encoding_name)));
174                         encoding = pg_char_to_encoding(encoding_name);
175                 }
176                 else
177                         elog(ERROR, "unrecognized node type: %d",
178                                  nodeTag(dencoding->arg));
179         }
180
181         /* obtain sysid of proposed owner (defaults to the current user) */
182         if (dbowner)
183                 datdba = get_usesysid(dbowner); /* will ereport if no such user */
184         else
185                 datdba = GetUserId();
186
187         if (datdba == GetUserId())
188         {
189                 /* creating database for self: can be superuser or createdb */
190                 if (!superuser() && !have_createdb_privilege())
191                         ereport(ERROR,
192                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
193                                          errmsg("permission denied to create database")));
194         }
195         else
196         {
197                 /* creating database for someone else: must be superuser */
198                 /* note that the someone else need not have any permissions */
199                 if (!superuser())
200                         ereport(ERROR,
201                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
202                                          errmsg("must be superuser to create database for another user")));
203         }
204
205         /*
206          * Check for db name conflict.  There is a race condition here, since
207          * another backend could create the same DB name before we commit.
208          * However, holding an exclusive lock on pg_database for the whole
209          * time we are copying the source database doesn't seem like a good
210          * idea, so accept possibility of race to create.  We will check again
211          * after we grab the exclusive lock.
212          */
213         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
214                 ereport(ERROR,
215                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
216                                  errmsg("database \"%s\" already exists", dbname)));
217
218         /*
219          * Lookup database (template) to be cloned.
220          */
221         if (!dbtemplate)
222                 dbtemplate = "template1";               /* Default template database name */
223
224         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
225                                          &src_istemplate, &src_lastsysoid,
226                                          &src_vacuumxid, &src_frozenxid, &src_deftablespace))
227                 ereport(ERROR,
228                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
229                  errmsg("template database \"%s\" does not exist", dbtemplate)));
230
231         /*
232          * Permission check: to copy a DB that's not marked datistemplate, you
233          * must be superuser or the owner thereof.
234          */
235         if (!src_istemplate)
236         {
237                 if (!superuser() && GetUserId() != src_owner)
238                         ereport(ERROR,
239                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
240                                          errmsg("permission denied to copy database \"%s\"",
241                                                         dbtemplate)));
242         }
243
244         /*
245          * The source DB can't have any active backends, except this one
246          * (exception is to allow CREATE DB while connected to template1).
247          * Otherwise we might copy inconsistent data.  This check is not
248          * bulletproof, since someone might connect while we are copying...
249          */
250         if (DatabaseHasActiveBackends(src_dboid, true))
251                 ereport(ERROR,
252                                 (errcode(ERRCODE_OBJECT_IN_USE),
253                 errmsg("source database \"%s\" is being accessed by other users",
254                            dbtemplate)));
255
256         /* If encoding is defaulted, use source's encoding */
257         if (encoding < 0)
258                 encoding = src_encoding;
259
260         /* Some encodings are client-only and cannot be used for a database */
261         if (!PG_VALID_BE_ENCODING(encoding))
262                 ereport(ERROR,
263                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
264                                  errmsg("invalid server encoding %d", encoding)));
265
266         /* Resolve default tablespace for new database */
267         if (dtablespacename && dtablespacename->arg)
268         {
269                 char       *tablespacename;
270                 AclResult       aclresult;
271
272                 tablespacename = strVal(dtablespacename->arg);
273                 dst_deftablespace = get_tablespace_oid(tablespacename);
274                 if (!OidIsValid(dst_deftablespace))
275                         ereport(ERROR,
276                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
277                                          errmsg("tablespace \"%s\" does not exist",
278                                                         tablespacename)));
279                 /* check that the current user has CREATE permission on the tablespace */
280                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
281                                                                                    ACL_CREATE);
282                 if (aclresult != ACLCHECK_OK)
283                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
284                                                    tablespacename);
285
286                 /*
287                  * If we are trying to change the default tablespace of the template,
288                  * we require that the template not have any files in the new default
289                  * tablespace.  This is necessary because otherwise the copied
290                  * database would contain pg_class rows that refer to its default
291                  * tablespace both explicitly (by OID) and implicitly (as zero), which
292                  * would cause problems.  For example another CREATE DATABASE using
293                  * the copied database as template, and trying to change its default
294                  * tablespace again, would yield outright incorrect results (it would
295                  * improperly move tables to the new default tablespace that should
296                  * stay in the same tablespace).
297                  */
298                 if (dst_deftablespace != src_deftablespace)
299                 {
300                         char       *srcpath;
301                         struct stat st;
302
303                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
304
305                         if (stat(srcpath, &st) == 0 &&
306                                 S_ISDIR(st.st_mode) &&
307                                 !directory_is_empty(srcpath))
308                                 ereport(ERROR,
309                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
310                                                  errmsg("cannot assign new default tablespace \"%s\"",
311                                                                 tablespacename),
312                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
313                                                                    dbtemplate)));
314                         pfree(srcpath);
315                 }
316         }
317         else
318         {
319                 /* Use template database's default tablespace */
320                 dst_deftablespace = src_deftablespace;
321                 /* Note there is no additional permission check in this path */
322         }
323
324         /*
325          * Preassign OID for pg_database tuple, so that we can compute db
326          * path.
327          */
328         dboid = newoid();
329
330         /*
331          * Force dirty buffers out to disk, to ensure source database is
332          * up-to-date for the copy.  (We really only need to flush buffers for
333          * the source database...)
334          */
335         BufferSync(-1, -1);
336
337         /*
338          * Close virtual file descriptors so the kernel has more available for
339          * the copy step below (system("cp -r ...") here, copydir() on WIN32).
340          */
341         closeAllVfds();
342
343         /*
344          * Iterate through all tablespaces of the template database, and copy
345          * each one to the new database.
346          */
347         rel = heap_openr(TableSpaceRelationName, AccessShareLock);
348         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
349         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
350         {
351                 Oid                     srctablespace = HeapTupleGetOid(tuple);
352                 Oid                     dsttablespace;
353                 char       *srcpath;
354                 char       *dstpath;
355                 struct stat st;
356
357                 /* No need to copy global tablespace */
358                 if (srctablespace == GLOBALTABLESPACE_OID)
359                         continue;
360
361                 srcpath = GetDatabasePath(src_dboid, srctablespace);
362
363                 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
364                         directory_is_empty(srcpath))
365                 {
366                         /* Missing, not a directory, or empty: assume we can ignore it */
367                         pfree(srcpath);
368                         continue;
369                 }
370
371                 if (srctablespace == src_deftablespace)
372                         dsttablespace = dst_deftablespace;
373                 else
374                         dsttablespace = srctablespace;
375
376                 dstpath = GetDatabasePath(dboid, dsttablespace);
377
378                 if (stat(dstpath, &st) == 0 || errno != ENOENT) /* any outcome other than "no such file" is treated as a conflict */
379                 {
380                         remove_dbtablespaces(dboid);
381                         ereport(ERROR,
382                                         (errmsg("could not initialize database directory"),
383                                          errdetail("Directory \"%s\" already exists.",
384                                                            dstpath)));
385                 }
386
387 #ifndef WIN32
388
389                 /*
390                  * Copy this subdirectory to the new location
391                  *
392                  * XXX use of cp really makes this code pretty grotty, particularly
393                  * with respect to lack of ability to report errors well.  Someday
394                  * rewrite to do it for ourselves.
395                  */
396
397                 /* We might need to use cp -R one day for portability */
398                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
399                                  srcpath, dstpath);
400                 if (system(buf) != 0)
401                 {
402                         remove_dbtablespaces(dboid);
403                         ereport(ERROR,
404                                         (errmsg("could not initialize database directory"),
405                                          errdetail("Failing system command was: %s", buf),
406                                          errhint("Look in the postmaster's stderr log for more information.")));
407                 }
408 #else                                                   /* WIN32 */
409                 if (copydir(srcpath, dstpath) != 0)
410                 {
411                         /* copydir should already have given details of its troubles */
412                         remove_dbtablespaces(dboid);
413                         ereport(ERROR,
414                                         (errmsg("could not initialize database directory")));
415                 }
416 #endif   /* WIN32 */
417
418                 /* Record the filesystem change in XLOG */
419                 {
420                         xl_dbase_create_rec xlrec;
421                         XLogRecData rdata[3];
422
423                         xlrec.db_id = dboid;
424                         rdata[0].buffer = InvalidBuffer;
425                         rdata[0].data = (char *) &xlrec;
426                         rdata[0].len = offsetof(xl_dbase_create_rec, src_path); /* struct fields before src_path only; the two path strings are chained below */
427                         rdata[0].next = &(rdata[1]);
428
429                         rdata[1].buffer = InvalidBuffer;
430                         rdata[1].data = (char *) srcpath;
431                         rdata[1].len = strlen(srcpath) + 1;
432                         rdata[1].next = &(rdata[2]);
433
434                         rdata[2].buffer = InvalidBuffer;
435                         rdata[2].data = (char *) dstpath;
436                         rdata[2].len = strlen(dstpath) + 1;
437                         rdata[2].next = NULL;
438
439                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
440                 }
441         }
442         heap_endscan(scan);
443         heap_close(rel, AccessShareLock);
444
445         /*
446          * Now OK to grab exclusive lock on pg_database.
447          */
448         pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
449
450         /* Check to see if someone else created the same DB name meanwhile. */
451         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
452         {
453                 /* Don't hold lock while doing recursive remove */
454                 heap_close(pg_database_rel, AccessExclusiveLock);
455                 remove_dbtablespaces(dboid);
456                 ereport(ERROR,
457                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
458                                  errmsg("database \"%s\" already exists", dbname)));
459         }
460
461         /*
462          * Insert a new tuple into pg_database
463          */
464         pg_database_dsc = RelationGetDescr(pg_database_rel);
465
466         /* Form tuple */
467         MemSet(new_record, 0, sizeof(new_record));
468         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls)); /* start with every column marked ' '; the columns left NULL are flagged 'n' below */
469
470         new_record[Anum_pg_database_datname - 1] =
471                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
472         new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
473         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
474         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
475         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
476         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
477         new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
478         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
479         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
480
481         /*
482          * We deliberately set datconfig and datacl to defaults (NULL), rather
483          * than copying them from the template database.  Copying datacl would
484          * be a bad idea when the owner is not the same as the template's
485          * owner. It's more debatable whether datconfig should be copied.
486          */
487         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
488         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
489
490         tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
491
492         HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
493                                                                                  * selection */
494
495         simple_heap_insert(pg_database_rel, tuple);
496
497         /* Update indexes */
498         CatalogUpdateIndexes(pg_database_rel, tuple);
499
500         /* Close pg_database, but keep lock till commit */
501         heap_close(pg_database_rel, NoLock);
502
503         /*
504          * Force dirty buffers out to disk, so that newly-connecting backends
505          * will see the new database in pg_database right away.  (They'll see
506          * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
507          */
508         BufferSync(-1, -1);
509 }
510
511
512 /*
513  * DROP DATABASE: after permission and in-use checks, delete the named database's pg_database row, discard its shared buffers and free-space-map entries, and remove its tablespace subdirectories.
514  */
515 void
516 dropdb(const char *dbname)
517 {
518         int4            db_owner;
519         bool            db_istemplate;
520         Oid                     db_id;
521         Relation        pgdbrel;
522         SysScanDesc pgdbscan;
523         ScanKeyData key;
524         HeapTuple       tup;
525
526         PreventTransactionChain((void *) dbname, "DROP DATABASE"); /* must not be run inside a transaction block */
527
528         AssertArg(dbname);
529
530         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0) /* target is the database this backend is connected to */
531                 ereport(ERROR,
532                                 (errcode(ERRCODE_OBJECT_IN_USE),
533                                  errmsg("cannot drop the currently open database")));
534
535         /*
536          * Obtain exclusive lock on pg_database.  We need this to ensure that
537          * no new backend starts up in the target database while we are
538          * deleting it.  (Actually, a new backend might still manage to start
539          * up, because it will read pg_database without any locking to
540          * discover the database's OID.  But it will detect its error in
541          * ReverifyMyDatabase and shut down before any serious damage is done.
542          * See postinit.c.)
543          */
544         pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
545
546         if (!get_db_info(dbname, &db_id, &db_owner, NULL,
547                                          &db_istemplate, NULL, NULL, NULL, NULL))
548                 ereport(ERROR,
549                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
550                                  errmsg("database \"%s\" does not exist", dbname)));
551
552         if (GetUserId() != db_owner && !superuser()) /* only the owner or a superuser may drop */
553                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
554                                            dbname);
555
556         /*
557          * Disallow dropping a DB that is marked istemplate.  This is just to
558          * prevent people from accidentally dropping template0 or template1;
559          * they can do so if they're really determined ...
560          */
561         if (db_istemplate)
562                 ereport(ERROR,
563                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
564                                  errmsg("cannot drop a template database")));
565
566         /*
567          * Check for active backends in the target database.
568          */
569         if (DatabaseHasActiveBackends(db_id, false))
570                 ereport(ERROR,
571                                 (errcode(ERRCODE_OBJECT_IN_USE),
572                            errmsg("database \"%s\" is being accessed by other users",
573                                           dbname)));
574
575         /*
576          * Find the database's tuple by OID (should be unique).
577          */
578         ScanKeyInit(&key,
579                                 ObjectIdAttributeNumber,
580                                 BTEqualStrategyNumber, F_OIDEQ,
581                                 ObjectIdGetDatum(db_id));
582
583         pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndex, true,
584                                                                   SnapshotNow, 1, &key);
585
586         tup = systable_getnext(pgdbscan);
587         if (!HeapTupleIsValid(tup))
588         {
589                 /*
590                  * This error should never come up since the existence of the
591                  * database is checked earlier
592                  */
593                 elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
594                          dbname);
595         }
596
597         /* Remove the database's tuple from pg_database */
598         simple_heap_delete(pgdbrel, &tup->t_self);
599
600         systable_endscan(pgdbscan);
601
602         /*
603          * Delete any comments associated with the database
604          *
605          * NOTE: this is probably dead code since any such comments should have
606          * been in that database, not the one we are connected to.
607          */
608         DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
609
610         /*
611          * Close pg_database, but keep exclusive lock till commit to ensure
612          * that any new backend scanning pg_database will see the tuple dead.
613          */
614         heap_close(pgdbrel, NoLock);
615
616         /*
617          * Drop pages for this database that are in the shared buffer cache.
618          * This is important to ensure that no remaining backend tries to
619          * write out a dirty buffer to the dead database later...
620          */
621         DropBuffers(db_id);
622
623         /*
624          * Also, clean out any entries in the shared free space map.
625          */
626         FreeSpaceMapForgetDatabase(db_id);
627
628         /*
629          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
630          * open files, which would cause rmdir() to fail.
631          */
632 #ifdef WIN32
633         RequestCheckpoint(true);
634 #endif
635
636         /*
637          * Remove all tablespace subdirs belonging to the database.
638          */
639         remove_dbtablespaces(db_id);
640
641         /*
642          * Force dirty buffers out to disk, so that newly-connecting backends
643          * will see the database tuple marked dead in pg_database right away.
644          * (They'll see an uncommitted deletion, but they don't care; see
645          * GetRawDatabaseInfo.)
646          */
647         BufferSync(-1, -1);
648 }
649
650
651 /*
652  * Rename database
653  */
654 void
655 RenameDatabase(const char *oldname, const char *newname)
656 {
657         HeapTuple       tup,
658                                 newtup;
659         Relation        rel;
660         SysScanDesc scan,
661                                 scan2;
662         ScanKeyData key,
663                                 key2;
664
665         /*
666          * Obtain AccessExclusiveLock so that no new session gets started
667          * while the rename is in progress.
668          */
669         rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
670
671         ScanKeyInit(&key,
672                                 Anum_pg_database_datname,
673                                 BTEqualStrategyNumber, F_NAMEEQ,
674                                 NameGetDatum(oldname));
675         scan = systable_beginscan(rel, DatabaseNameIndex, true,
676                                                           SnapshotNow, 1, &key);
677
678         tup = systable_getnext(scan);
679         if (!HeapTupleIsValid(tup))
680                 ereport(ERROR,
681                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
682                                  errmsg("database \"%s\" does not exist", oldname)));
683
684         /*
685          * XXX Client applications probably store the current database
686          * somewhere, so renaming it could cause confusion.  On the other
687          * hand, there may not be an actual problem besides a little
688          * confusion, so think about this and decide.
689          */
690         if (HeapTupleGetOid(tup) == MyDatabaseId)
691                 ereport(ERROR,
692                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
693                                  errmsg("current database may not be renamed")));
694
695         /*
696          * Make sure the database does not have active sessions.  Might not be
697          * necessary, but it's consistent with other database operations.
698          */
699         if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
700                 ereport(ERROR,
701                                 (errcode(ERRCODE_OBJECT_IN_USE),
702                            errmsg("database \"%s\" is being accessed by other users",
703                                           oldname)));
704
705         /* make sure the new name doesn't exist */
706         ScanKeyInit(&key2,
707                                 Anum_pg_database_datname,
708                                 BTEqualStrategyNumber, F_NAMEEQ,
709                                 NameGetDatum(newname));
710         scan2 = systable_beginscan(rel, DatabaseNameIndex, true,
711                                                            SnapshotNow, 1, &key2);
712         if (HeapTupleIsValid(systable_getnext(scan2)))
713                 ereport(ERROR,
714                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
715                                  errmsg("database \"%s\" already exists", newname)));
716         systable_endscan(scan2);
717
718         /* must be owner */
719         if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
720                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
721                                            oldname);
722
723         /* must have createdb */
724         if (!have_createdb_privilege())
725                 ereport(ERROR,
726                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
727                                  errmsg("permission denied to rename database")));
728
729         /* rename */
730         newtup = heap_copytuple(tup);
731         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
732         simple_heap_update(rel, &newtup->t_self, newtup);
733         CatalogUpdateIndexes(rel, newtup);
734
735         systable_endscan(scan);
736         heap_close(rel, NoLock);
737
738         /*
739          * Force dirty buffers out to disk, so that newly-connecting backends
740          * will see the renamed database in pg_database right away.  (They'll
741          * see an uncommitted tuple, but they don't care; see
742          * GetRawDatabaseInfo.)
743          */
744         BufferSync(-1, -1);
745 }
746
747
748 /*
749  * ALTER DATABASE name SET ...
750  */
751 void
752 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
753 {
754         char       *valuestr;
755         HeapTuple       tuple,
756                                 newtuple;
757         Relation        rel;
758         ScanKeyData scankey;
759         SysScanDesc scan;
760         Datum           repl_val[Natts_pg_database];
761         char            repl_null[Natts_pg_database];
762         char            repl_repl[Natts_pg_database];
763
764         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
765
766         rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
767         ScanKeyInit(&scankey,
768                                 Anum_pg_database_datname,
769                                 BTEqualStrategyNumber, F_NAMEEQ,
770                                 NameGetDatum(stmt->dbname));
771         scan = systable_beginscan(rel, DatabaseNameIndex, true,
772                                                           SnapshotNow, 1, &scankey);
773         tuple = systable_getnext(scan);
774         if (!HeapTupleIsValid(tuple))
775                 ereport(ERROR,
776                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
777                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
778
779         if (!(superuser()
780                 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
781                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
782                                            stmt->dbname);
783
784         MemSet(repl_repl, ' ', sizeof(repl_repl));
785         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
786
787         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
788         {
789                 /* RESET ALL */
790                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
791                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
792         }
793         else
794         {
795                 Datum           datum;
796                 bool            isnull;
797                 ArrayType  *a;
798
799                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
800
801                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
802                                                          RelationGetDescr(rel), &isnull);
803
804                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
805
806                 if (valuestr)
807                         a = GUCArrayAdd(a, stmt->variable, valuestr);
808                 else
809                         a = GUCArrayDelete(a, stmt->variable);
810
811                 if (a)
812                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
813                 else
814                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
815         }
816
817         newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
818         simple_heap_update(rel, &tuple->t_self, newtuple);
819
820         /* Update indexes */
821         CatalogUpdateIndexes(rel, newtuple);
822
823         systable_endscan(scan);
824         heap_close(rel, NoLock);
825 }
826
827
828 /*
829  * ALTER DATABASE name OWNER TO newowner
830  */
831 void
832 AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
833 {
834         HeapTuple       tuple;
835         Relation        rel;
836         ScanKeyData scankey;
837         SysScanDesc scan;
838         Form_pg_database datForm;
839
840         rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
841         ScanKeyInit(&scankey,
842                                 Anum_pg_database_datname,
843                                 BTEqualStrategyNumber, F_NAMEEQ,
844                                 NameGetDatum(dbname));
845         scan = systable_beginscan(rel, DatabaseNameIndex, true,
846                                                           SnapshotNow, 1, &scankey);
847         tuple = systable_getnext(scan);
848         if (!HeapTupleIsValid(tuple))
849                 ereport(ERROR,
850                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
851                                  errmsg("database \"%s\" does not exist", dbname)));
852
853         datForm = (Form_pg_database) GETSTRUCT(tuple);
854
855         /*
856          * If the new owner is the same as the existing owner, consider the
857          * command to have succeeded.  This is to be consistent with other
858          * objects.
859          */
860         if (datForm->datdba != newOwnerSysId)
861         {
862                 Datum           repl_val[Natts_pg_database];
863                 char            repl_null[Natts_pg_database];
864                 char            repl_repl[Natts_pg_database];
865                 Acl                *newAcl;
866                 Datum           aclDatum;
867                 bool            isNull;
868                 HeapTuple       newtuple;
869
870                 /* changing owner's database for someone else: must be superuser */
871                 /* note that the someone else need not have any permissions */
872                 if (!superuser())
873                         ereport(ERROR,
874                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
875                                          errmsg("must be superuser to change owner")));
876
877                 memset(repl_null, ' ', sizeof(repl_null));
878                 memset(repl_repl, ' ', sizeof(repl_repl));
879
880                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
881                 repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
882
883                 /*
884                  * Determine the modified ACL for the new owner.  This is only
885                  * necessary when the ACL is non-null.
886                  */
887                 aclDatum = heap_getattr(tuple,
888                                                                 Anum_pg_database_datacl,
889                                                                 RelationGetDescr(rel),
890                                                                 &isNull);
891                 if (!isNull)
892                 {
893                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
894                                                                  datForm->datdba, newOwnerSysId);
895                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
896                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
897                 }
898
899                 newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
900                 simple_heap_update(rel, &newtuple->t_self, newtuple);
901                 CatalogUpdateIndexes(rel, newtuple);
902
903                 heap_freetuple(newtuple);
904         }
905
906         systable_endscan(scan);
907         heap_close(rel, NoLock);
908 }
909
910
911 /*
912  * Helper functions
913  */
914
915 static bool
916 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
917                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
918                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
919                         Oid *dbTablespace)
920 {
921         Relation        relation;
922         ScanKeyData scanKey;
923         SysScanDesc scan;
924         HeapTuple       tuple;
925         bool            gottuple;
926
927         AssertArg(name);
928
929         /* Caller may wish to grab a better lock on pg_database beforehand... */
930         relation = heap_openr(DatabaseRelationName, AccessShareLock);
931
932         ScanKeyInit(&scanKey,
933                                 Anum_pg_database_datname,
934                                 BTEqualStrategyNumber, F_NAMEEQ,
935                                 NameGetDatum(name));
936
937         scan = systable_beginscan(relation, DatabaseNameIndex, true,
938                                                           SnapshotNow, 1, &scanKey);
939
940         tuple = systable_getnext(scan);
941
942         gottuple = HeapTupleIsValid(tuple);
943         if (gottuple)
944         {
945                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
946
947                 /* oid of the database */
948                 if (dbIdP)
949                         *dbIdP = HeapTupleGetOid(tuple);
950                 /* sysid of the owner */
951                 if (ownerIdP)
952                         *ownerIdP = dbform->datdba;
953                 /* character encoding */
954                 if (encodingP)
955                         *encodingP = dbform->encoding;
956                 /* allowed as template? */
957                 if (dbIsTemplateP)
958                         *dbIsTemplateP = dbform->datistemplate;
959                 /* last system OID used in database */
960                 if (dbLastSysOidP)
961                         *dbLastSysOidP = dbform->datlastsysoid;
962                 /* limit of vacuumed XIDs */
963                 if (dbVacuumXidP)
964                         *dbVacuumXidP = dbform->datvacuumxid;
965                 /* limit of frozen XIDs */
966                 if (dbFrozenXidP)
967                         *dbFrozenXidP = dbform->datfrozenxid;
968                 /* default tablespace for this database */
969                 if (dbTablespace)
970                         *dbTablespace = dbform->dattablespace;
971         }
972
973         systable_endscan(scan);
974         heap_close(relation, AccessShareLock);
975
976         return gottuple;
977 }
978
979 static bool
980 have_createdb_privilege(void)
981 {
982         HeapTuple       utup;
983         bool            retval;
984
985         utup = SearchSysCache(SHADOWSYSID,
986                                                   Int32GetDatum(GetUserId()),
987                                                   0, 0, 0);
988
989         if (!HeapTupleIsValid(utup))
990                 retval = false;
991         else
992                 retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
993
994         ReleaseSysCache(utup);
995
996         return retval;
997 }
998
999 /*
1000  * Remove tablespace directories
1001  *
1002  * We don't know what tablespaces db_id is using, so iterate through all
1003  * tablespaces removing <tablespace>/db_id
1004  */
1005 static void
1006 remove_dbtablespaces(Oid db_id)
1007 {
1008         Relation        rel;
1009         HeapScanDesc scan;
1010         HeapTuple       tuple;
1011
1012         rel = heap_openr(TableSpaceRelationName, AccessShareLock);
1013         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1014         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1015         {
1016                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1017                 char       *dstpath;
1018                 struct stat st;
1019
1020                 /* Don't mess with the global tablespace */
1021                 if (dsttablespace == GLOBALTABLESPACE_OID)
1022                         continue;
1023
1024                 dstpath = GetDatabasePath(db_id, dsttablespace);
1025
1026                 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1027                 {
1028                         /* Assume we can ignore it */
1029                         pfree(dstpath);
1030                         continue;
1031                 }
1032
1033                 if (!rmtree(dstpath, true))
1034                         ereport(WARNING,
1035                                         (errmsg("could not remove database directory \"%s\"",
1036                                                         dstpath)));
1037
1038                 /* Record the filesystem change in XLOG */
1039                 {
1040                         xl_dbase_drop_rec xlrec;
1041                         XLogRecData rdata[2];
1042
1043                         xlrec.db_id = db_id;
1044                         rdata[0].buffer = InvalidBuffer;
1045                         rdata[0].data = (char *) &xlrec;
1046                         rdata[0].len = offsetof(xl_dbase_drop_rec, dir_path);
1047                         rdata[0].next = &(rdata[1]);
1048
1049                         rdata[1].buffer = InvalidBuffer;
1050                         rdata[1].data = (char *) dstpath;
1051                         rdata[1].len = strlen(dstpath) + 1;
1052                         rdata[1].next = NULL;
1053
1054                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1055                 }
1056
1057                 pfree(dstpath);
1058         }
1059
1060         heap_endscan(scan);
1061         heap_close(rel, AccessShareLock);
1062 }
1063
1064
1065 /*
1066  * get_database_oid - given a database name, look up the OID
1067  *
1068  * Returns InvalidOid if database name not found.
1069  *
1070  * This is not actually used in this file, but is exported for use elsewhere.
1071  */
1072 Oid
1073 get_database_oid(const char *dbname)
1074 {
1075         Relation        pg_database;
1076         ScanKeyData entry[1];
1077         SysScanDesc scan;
1078         HeapTuple       dbtuple;
1079         Oid                     oid;
1080
1081         /* There's no syscache for pg_database, so must look the hard way */
1082         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1083         ScanKeyInit(&entry[0],
1084                                 Anum_pg_database_datname,
1085                                 BTEqualStrategyNumber, F_NAMEEQ,
1086                                 CStringGetDatum(dbname));
1087         scan = systable_beginscan(pg_database, DatabaseNameIndex, true,
1088                                                           SnapshotNow, 1, entry);
1089
1090         dbtuple = systable_getnext(scan);
1091
1092         /* We assume that there can be at most one matching tuple */
1093         if (HeapTupleIsValid(dbtuple))
1094                 oid = HeapTupleGetOid(dbtuple);
1095         else
1096                 oid = InvalidOid;
1097
1098         systable_endscan(scan);
1099         heap_close(pg_database, AccessShareLock);
1100
1101         return oid;
1102 }
1103
1104
1105 /*
1106  * get_database_name - given a database OID, look up the name
1107  *
1108  * Returns a palloc'd string, or NULL if no such database.
1109  *
1110  * This is not actually used in this file, but is exported for use elsewhere.
1111  */
1112 char *
1113 get_database_name(Oid dbid)
1114 {
1115         Relation        pg_database;
1116         ScanKeyData entry[1];
1117         SysScanDesc scan;
1118         HeapTuple       dbtuple;
1119         char       *result;
1120
1121         /* There's no syscache for pg_database, so must look the hard way */
1122         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
1123         ScanKeyInit(&entry[0],
1124                                 ObjectIdAttributeNumber,
1125                                 BTEqualStrategyNumber, F_OIDEQ,
1126                                 ObjectIdGetDatum(dbid));
1127         scan = systable_beginscan(pg_database, DatabaseOidIndex, true,
1128                                                           SnapshotNow, 1, entry);
1129
1130         dbtuple = systable_getnext(scan);
1131
1132         /* We assume that there can be at most one matching tuple */
1133         if (HeapTupleIsValid(dbtuple))
1134                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1135         else
1136                 result = NULL;
1137
1138         systable_endscan(scan);
1139         heap_close(pg_database, AccessShareLock);
1140
1141         return result;
1142 }
1143
1144 /*
1145  * DATABASE resource manager's routines
1146  */
1147 void
1148 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1149 {
1150         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1151
1152         if (info == XLOG_DBASE_CREATE)
1153         {
1154                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1155                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1156                 struct stat st;
1157
1158 #ifndef WIN32
1159                 char            buf[2 * MAXPGPATH + 100];
1160 #endif
1161
1162                 /*
1163                  * Our theory for replaying a CREATE is to forcibly drop the
1164                  * target subdirectory if present, then re-copy the source data.
1165                  * This may be more work than needed, but it is simple to
1166                  * implement.
1167                  */
1168                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1169                 {
1170                         if (!rmtree(dst_path, true))
1171                                 ereport(WARNING,
1172                                         (errmsg("could not remove database directory \"%s\"",
1173                                                         dst_path)));
1174                 }
1175
1176                 /*
1177                  * Force dirty buffers out to disk, to ensure source database is
1178                  * up-to-date for the copy.  (We really only need to flush buffers for
1179                  * the source database...)
1180                  */
1181                 BufferSync(-1, -1);
1182
1183 #ifndef WIN32
1184
1185                 /*
1186                  * Copy this subdirectory to the new location
1187                  *
1188                  * XXX use of cp really makes this code pretty grotty, particularly
1189                  * with respect to lack of ability to report errors well.  Someday
1190                  * rewrite to do it for ourselves.
1191                  */
1192
1193                 /* We might need to use cp -R one day for portability */
1194                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
1195                                  xlrec->src_path, dst_path);
1196                 if (system(buf) != 0)
1197                         ereport(ERROR,
1198                                         (errmsg("could not initialize database directory"),
1199                                          errdetail("Failing system command was: %s", buf),
1200                                          errhint("Look in the postmaster's stderr log for more information.")));
1201 #else                                                   /* WIN32 */
1202                 if (copydir(xlrec->src_path, dst_path) != 0)
1203                 {
1204                         /* copydir should already have given details of its troubles */
1205                         ereport(ERROR,
1206                                         (errmsg("could not initialize database directory")));
1207                 }
1208 #endif   /* WIN32 */
1209         }
1210         else if (info == XLOG_DBASE_DROP)
1211         {
1212                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1213
1214                 /*
1215                  * Drop pages for this database that are in the shared buffer
1216                  * cache
1217                  */
1218                 DropBuffers(xlrec->db_id);
1219
1220                 if (!rmtree(xlrec->dir_path, true))
1221                         ereport(WARNING,
1222                                         (errmsg("could not remove database directory \"%s\"",
1223                                                         xlrec->dir_path)));
1224         }
1225         else
1226                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1227 }
1228
1229 void
1230 dbase_undo(XLogRecPtr lsn, XLogRecord *record)
1231 {
1232         elog(PANIC, "dbase_undo: unimplemented");
1233 }
1234
1235 void
1236 dbase_desc(char *buf, uint8 xl_info, char *rec)
1237 {
1238         uint8           info = xl_info & ~XLR_INFO_MASK;
1239
1240         if (info == XLOG_DBASE_CREATE)
1241         {
1242                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1243                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1244
1245                 sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"",
1246                                 xlrec->db_id, xlrec->src_path, dst_path);
1247         }
1248         else if (info == XLOG_DBASE_DROP)
1249         {
1250                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1251
1252                 sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"",
1253                                 xlrec->db_id, xlrec->dir_path);
1254         }
1255         else
1256                 strcat(buf, "UNKNOWN");
1257 }