granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Update copyrights in source tree to 2008.
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands use exclusive locks on
7  * the database objects (as expressed by LockSharedObject()) to avoid
8  * stepping on each other's toes.  Formerly we used table-level locks
9  * on pg_database, but that's too coarse-grained.
10  *
11  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.204 2008/01/01 19:45:48 momjian Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <fcntl.h>
23 #include <locale.h>
24 #include <unistd.h>
25 #include <sys/stat.h>
26
27 #include "access/genam.h"
28 #include "access/heapam.h"
29 #include "access/xact.h"
30 #include "catalog/catalog.h"
31 #include "catalog/dependency.h"
32 #include "catalog/indexing.h"
33 #include "catalog/pg_authid.h"
34 #include "catalog/pg_database.h"
35 #include "catalog/pg_tablespace.h"
36 #include "commands/comment.h"
37 #include "commands/dbcommands.h"
38 #include "commands/tablespace.h"
39 #include "mb/pg_wchar.h"
40 #include "miscadmin.h"
41 #include "pgstat.h"
42 #include "postmaster/bgwriter.h"
43 #include "storage/freespace.h"
44 #include "storage/procarray.h"
45 #include "storage/smgr.h"
46 #include "utils/acl.h"
47 #include "utils/builtins.h"
48 #include "utils/flatfiles.h"
49 #include "utils/fmgroids.h"
50 #include "utils/guc.h"
51 #include "utils/lsyscache.h"
52 #include "utils/syscache.h"
53
54
55 /* non-export function prototypes */
56 static bool get_db_info(const char *name, LOCKMODE lockmode,
57                         Oid *dbIdP, Oid *ownerIdP,
58                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
59                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
60                         Oid *dbTablespace);
61 static bool have_createdb_privilege(void);
62 static void remove_dbtablespaces(Oid db_id);
63 static bool check_db_file_conflict(Oid db_id);
64
65
66 /*
67  * CREATE DATABASE
68  */
69 void
70 createdb(const CreatedbStmt *stmt)
71 {
72         HeapScanDesc scan;
73         Relation        rel;
74         Oid                     src_dboid;
75         Oid                     src_owner;
76         int                     src_encoding;
77         bool            src_istemplate;
78         bool            src_allowconn;
79         Oid                     src_lastsysoid;
80         TransactionId src_frozenxid;
81         Oid                     src_deftablespace;
82         volatile Oid dst_deftablespace;
83         Relation        pg_database_rel;
84         HeapTuple       tuple;
85         Datum           new_record[Natts_pg_database];
86         char            new_record_nulls[Natts_pg_database];
87         Oid                     dboid;
88         Oid                     datdba;
89         ListCell   *option;
90         DefElem    *dtablespacename = NULL;
91         DefElem    *downer = NULL;
92         DefElem    *dtemplate = NULL;
93         DefElem    *dencoding = NULL;
94         DefElem    *dconnlimit = NULL;
95         char       *dbname = stmt->dbname;
96         char       *dbowner = NULL;
97         const char *dbtemplate = NULL;
98         int                     encoding = -1;
99         int                     dbconnlimit = -1;
100         int                     ctype_encoding;
101
102         /* Extract options from the statement node tree */
103         foreach(option, stmt->options)
104         {
105                 DefElem    *defel = (DefElem *) lfirst(option);
106
107                 if (strcmp(defel->defname, "tablespace") == 0)
108                 {
109                         if (dtablespacename)
110                                 ereport(ERROR,
111                                                 (errcode(ERRCODE_SYNTAX_ERROR),
112                                                  errmsg("conflicting or redundant options")));
113                         dtablespacename = defel;
114                 }
115                 else if (strcmp(defel->defname, "owner") == 0)
116                 {
117                         if (downer)
118                                 ereport(ERROR,
119                                                 (errcode(ERRCODE_SYNTAX_ERROR),
120                                                  errmsg("conflicting or redundant options")));
121                         downer = defel;
122                 }
123                 else if (strcmp(defel->defname, "template") == 0)
124                 {
125                         if (dtemplate)
126                                 ereport(ERROR,
127                                                 (errcode(ERRCODE_SYNTAX_ERROR),
128                                                  errmsg("conflicting or redundant options")));
129                         dtemplate = defel;
130                 }
131                 else if (strcmp(defel->defname, "encoding") == 0)
132                 {
133                         if (dencoding)
134                                 ereport(ERROR,
135                                                 (errcode(ERRCODE_SYNTAX_ERROR),
136                                                  errmsg("conflicting or redundant options")));
137                         dencoding = defel;
138                 }
139                 else if (strcmp(defel->defname, "connectionlimit") == 0)
140                 {
141                         if (dconnlimit)
142                                 ereport(ERROR,
143                                                 (errcode(ERRCODE_SYNTAX_ERROR),
144                                                  errmsg("conflicting or redundant options")));
145                         dconnlimit = defel;
146                 }
147                 else if (strcmp(defel->defname, "location") == 0)
148                 {
149                         ereport(WARNING,
150                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
151                                          errmsg("LOCATION is not supported anymore"),
152                                          errhint("Consider using tablespaces instead.")));
153                 }
154                 else
155                         elog(ERROR, "option \"%s\" not recognized",
156                                  defel->defname);
157         }
158
159         if (downer && downer->arg)
160                 dbowner = strVal(downer->arg);
161         if (dtemplate && dtemplate->arg)
162                 dbtemplate = strVal(dtemplate->arg);
163         if (dencoding && dencoding->arg)
164         {
165                 const char *encoding_name;
166
167                 if (IsA(dencoding->arg, Integer))
168                 {
169                         encoding = intVal(dencoding->arg);
170                         encoding_name = pg_encoding_to_char(encoding);
171                         if (strcmp(encoding_name, "") == 0 ||
172                                 pg_valid_server_encoding(encoding_name) < 0)
173                                 ereport(ERROR,
174                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
175                                                  errmsg("%d is not a valid encoding code",
176                                                                 encoding)));
177                 }
178                 else if (IsA(dencoding->arg, String))
179                 {
180                         encoding_name = strVal(dencoding->arg);
181                         encoding = pg_valid_server_encoding(encoding_name);
182                         if (encoding < 0)
183                                 ereport(ERROR,
184                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
185                                                  errmsg("%s is not a valid encoding name",
186                                                                 encoding_name)));
187                 }
188                 else
189                         elog(ERROR, "unrecognized node type: %d",
190                                  nodeTag(dencoding->arg));
191         }
192         if (dconnlimit && dconnlimit->arg)
193                 dbconnlimit = intVal(dconnlimit->arg);
194
195         /* obtain OID of proposed owner */
196         if (dbowner)
197                 datdba = get_roleid_checked(dbowner);
198         else
199                 datdba = GetUserId();
200
201         /*
202          * To create a database, must have createdb privilege and must be able to
203          * become the target role (this does not imply that the target role itself
204          * must have createdb privilege).  The latter provision guards against
205          * "giveaway" attacks.  Note that a superuser will always have both of
206          * these privileges a fortiori.
207          */
208         if (!have_createdb_privilege())
209                 ereport(ERROR,
210                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
211                                  errmsg("permission denied to create database")));
212
213         check_is_member_of_role(GetUserId(), datdba);
214
215         /*
216          * Lookup database (template) to be cloned, and obtain share lock on it.
217          * ShareLock allows two CREATE DATABASEs to work from the same template
218          * concurrently, while ensuring no one is busy dropping it in parallel
219          * (which would be Very Bad since we'd likely get an incomplete copy
220          * without knowing it).  This also prevents any new connections from being
221          * made to the source until we finish copying it, so we can be sure it
222          * won't change underneath us.
223          */
224         if (!dbtemplate)
225                 dbtemplate = "template1";               /* Default template database name */
226
227         if (!get_db_info(dbtemplate, ShareLock,
228                                          &src_dboid, &src_owner, &src_encoding,
229                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
230                                          &src_frozenxid, &src_deftablespace))
231                 ereport(ERROR,
232                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
233                                  errmsg("template database \"%s\" does not exist",
234                                                 dbtemplate)));
235
236         /*
237          * Permission check: to copy a DB that's not marked datistemplate, you
238          * must be superuser or the owner thereof.
239          */
240         if (!src_istemplate)
241         {
242                 if (!pg_database_ownercheck(src_dboid, GetUserId()))
243                         ereport(ERROR,
244                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
245                                          errmsg("permission denied to copy database \"%s\"",
246                                                         dbtemplate)));
247         }
248
249         /* If encoding is defaulted, use source's encoding */
250         if (encoding < 0)
251                 encoding = src_encoding;
252
253         /* Some encodings are client only */
254         if (!PG_VALID_BE_ENCODING(encoding))
255                 ereport(ERROR,
256                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
257                                  errmsg("invalid server encoding %d", encoding)));
258
259         /*
260          * Check whether encoding matches server locale settings.  We allow
261          * mismatch in three cases:
262          *
263          * 1. ctype_encoding = SQL_ASCII, which means either that the locale is
264          * C/POSIX which works with any encoding, or that we couldn't determine
265          * the locale's encoding and have to trust the user to get it right.
266          *
267          * 2. selected encoding is SQL_ASCII, but only if you're a superuser. This
268          * is risky but we have historically allowed it --- notably, the
269          * regression tests require it.
270          *
271          * 3. selected encoding is UTF8 and platform is win32. This is because
272          * UTF8 is a pseudo codepage that is supported in all locales since it's
273          * converted to UTF16 before being used.
274          *
275          * Note: if you change this policy, fix initdb to match.
276          */
277         ctype_encoding = pg_get_encoding_from_locale(NULL);
278
279         if (!(ctype_encoding == encoding ||
280                   ctype_encoding == PG_SQL_ASCII ||
281 #ifdef WIN32
282                   encoding == PG_UTF8 ||
283 #endif
284                   (encoding == PG_SQL_ASCII && superuser())))
285                 ereport(ERROR,
286                                 (errmsg("encoding %s does not match server's locale %s",
287                                                 pg_encoding_to_char(encoding),
288                                                 setlocale(LC_CTYPE, NULL)),
289                          errdetail("The server's LC_CTYPE setting requires encoding %s.",
290                                            pg_encoding_to_char(ctype_encoding))));
291
292         /* Resolve default tablespace for new database */
293         if (dtablespacename && dtablespacename->arg)
294         {
295                 char       *tablespacename;
296                 AclResult       aclresult;
297
298                 tablespacename = strVal(dtablespacename->arg);
299                 dst_deftablespace = get_tablespace_oid(tablespacename);
300                 if (!OidIsValid(dst_deftablespace))
301                         ereport(ERROR,
302                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
303                                          errmsg("tablespace \"%s\" does not exist",
304                                                         tablespacename)));
305                 /* check permissions */
306                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
307                                                                                    ACL_CREATE);
308                 if (aclresult != ACLCHECK_OK)
309                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
310                                                    tablespacename);
311
312                 /* pg_global must never be the default tablespace */
313                 if (dst_deftablespace == GLOBALTABLESPACE_OID)
314                         ereport(ERROR,
315                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
316                                   errmsg("pg_global cannot be used as default tablespace")));
317
318                 /*
319                  * If we are trying to change the default tablespace of the template,
320                  * we require that the template not have any files in the new default
321                  * tablespace.  This is necessary because otherwise the copied
322                  * database would contain pg_class rows that refer to its default
323                  * tablespace both explicitly (by OID) and implicitly (as zero), which
324                  * would cause problems.  For example another CREATE DATABASE using
325                  * the copied database as template, and trying to change its default
326                  * tablespace again, would yield outright incorrect results (it would
327                  * improperly move tables to the new default tablespace that should
328                  * stay in the same tablespace).
329                  */
330                 if (dst_deftablespace != src_deftablespace)
331                 {
332                         char       *srcpath;
333                         struct stat st;
334
335                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
336
337                         if (stat(srcpath, &st) == 0 &&
338                                 S_ISDIR(st.st_mode) &&
339                                 !directory_is_empty(srcpath))
340                                 ereport(ERROR,
341                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
342                                                  errmsg("cannot assign new default tablespace \"%s\"",
343                                                                 tablespacename),
344                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
345                                                                    dbtemplate)));
346                         pfree(srcpath);
347                 }
348         }
349         else
350         {
351                 /* Use template database's default tablespace */
352                 dst_deftablespace = src_deftablespace;
353                 /* Note there is no additional permission check in this path */
354         }
355
356         /*
357          * Check for db name conflict.  This is just to give a more friendly error
358          * message than "unique index violation".  There's a race condition but
359          * we're willing to accept the less friendly message in that case.
360          */
361         if (OidIsValid(get_database_oid(dbname)))
362                 ereport(ERROR,
363                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
364                                  errmsg("database \"%s\" already exists", dbname)));
365
366         /*
367          * The source DB can't have any active backends, except this one
368          * (exception is to allow CREATE DB while connected to template1).
369          * Otherwise we might copy inconsistent data.
370          *
371          * This should be last among the basic error checks, because it involves
372          * potential waiting; we may as well throw an error first if we're gonna
373          * throw one.
374          */
375         if (CheckOtherDBBackends(src_dboid))
376                 ereport(ERROR,
377                                 (errcode(ERRCODE_OBJECT_IN_USE),
378                         errmsg("source database \"%s\" is being accessed by other users",
379                                    dbtemplate)));
380
381         /*
382          * Select an OID for the new database, checking that it doesn't have a
383          * filename conflict with anything already existing in the tablespace
384          * directories.
385          */
386         pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
387
388         do
389         {
390                 dboid = GetNewOid(pg_database_rel);
391         } while (check_db_file_conflict(dboid));
392
393         /*
394          * Insert a new tuple into pg_database.  This establishes our ownership of
395          * the new database name (anyone else trying to insert the same name will
396          * block on the unique index, and fail after we commit).
397          */
398
399         /* Form tuple */
400         MemSet(new_record, 0, sizeof(new_record));
401         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
402
403         new_record[Anum_pg_database_datname - 1] =
404                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
405         new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
406         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
407         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
408         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
409         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
410         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
411         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
412         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
413
414         /*
415          * We deliberately set datconfig and datacl to defaults (NULL), rather
416          * than copying them from the template database.  Copying datacl would be
417          * a bad idea when the owner is not the same as the template's owner. It's
418          * more debatable whether datconfig should be copied.
419          */
420         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
421         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
422
423         tuple = heap_formtuple(RelationGetDescr(pg_database_rel),
424                                                    new_record, new_record_nulls);
425
426         HeapTupleSetOid(tuple, dboid);
427
428         simple_heap_insert(pg_database_rel, tuple);
429
430         /* Update indexes */
431         CatalogUpdateIndexes(pg_database_rel, tuple);
432
433         /*
434          * Now generate additional catalog entries associated with the new DB
435          */
436
437         /* Register owner dependency */
438         recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
439
440         /* Create pg_shdepend entries for objects within database */
441         copyTemplateDependencies(src_dboid, dboid);
442
443         /*
444          * Force dirty buffers out to disk, to ensure source database is
445          * up-to-date for the copy.
446          */
447         FlushDatabaseBuffers(src_dboid);
448
449         /*
450          * Once we start copying subdirectories, we need to be able to clean 'em
451          * up if we fail.  Establish a TRY block to make sure this happens. (This
452          * is not a 100% solution, because of the possibility of failure during
453          * transaction commit after we leave this routine, but it should handle
454          * most scenarios.)
455          */
456         PG_TRY();
457         {
458                 /*
459                  * Iterate through all tablespaces of the template database, and copy
460                  * each one to the new database.
461                  */
462                 rel = heap_open(TableSpaceRelationId, AccessShareLock);
463                 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
464                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
465                 {
466                         Oid                     srctablespace = HeapTupleGetOid(tuple);
467                         Oid                     dsttablespace;
468                         char       *srcpath;
469                         char       *dstpath;
470                         struct stat st;
471
472                         /* No need to copy global tablespace */
473                         if (srctablespace == GLOBALTABLESPACE_OID)
474                                 continue;
475
476                         srcpath = GetDatabasePath(src_dboid, srctablespace);
477
478                         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
479                                 directory_is_empty(srcpath))
480                         {
481                                 /* Assume we can ignore it */
482                                 pfree(srcpath);
483                                 continue;
484                         }
485
486                         if (srctablespace == src_deftablespace)
487                                 dsttablespace = dst_deftablespace;
488                         else
489                                 dsttablespace = srctablespace;
490
491                         dstpath = GetDatabasePath(dboid, dsttablespace);
492
493                         /*
494                          * Copy this subdirectory to the new location
495                          *
496                          * We don't need to copy subdirectories
497                          */
498                         copydir(srcpath, dstpath, false);
499
500                         /* Record the filesystem change in XLOG */
501                         {
502                                 xl_dbase_create_rec xlrec;
503                                 XLogRecData rdata[1];
504
505                                 xlrec.db_id = dboid;
506                                 xlrec.tablespace_id = dsttablespace;
507                                 xlrec.src_db_id = src_dboid;
508                                 xlrec.src_tablespace_id = srctablespace;
509
510                                 rdata[0].data = (char *) &xlrec;
511                                 rdata[0].len = sizeof(xl_dbase_create_rec);
512                                 rdata[0].buffer = InvalidBuffer;
513                                 rdata[0].next = NULL;
514
515                                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
516                         }
517                 }
518                 heap_endscan(scan);
519                 heap_close(rel, AccessShareLock);
520
521                 /*
522                  * We force a checkpoint before committing.  This effectively means
523                  * that committed XLOG_DBASE_CREATE operations will never need to be
524                  * replayed (at least not in ordinary crash recovery; we still have to
525                  * make the XLOG entry for the benefit of PITR operations). This
526                  * avoids two nasty scenarios:
527                  *
528                  * #1: When PITR is off, we don't XLOG the contents of newly created
529                  * indexes; therefore the drop-and-recreate-whole-directory behavior
530                  * of DBASE_CREATE replay would lose such indexes.
531                  *
532                  * #2: Since we have to recopy the source database during DBASE_CREATE
533                  * replay, we run the risk of copying changes in it that were
534                  * committed after the original CREATE DATABASE command but before the
535                  * system crash that led to the replay.  This is at least unexpected
536                  * and at worst could lead to inconsistencies, eg duplicate table
537                  * names.
538                  *
539                  * (Both of these were real bugs in releases 8.0 through 8.0.3.)
540                  *
541                  * In PITR replay, the first of these isn't an issue, and the second
542                  * is only a risk if the CREATE DATABASE and subsequent template
543                  * database change both occur while a base backup is being taken.
544                  * There doesn't seem to be much we can do about that except document
545                  * it as a limitation.
546                  *
547                  * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
548                  * we can avoid this.
549                  */
550                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
551
552                 /*
553                  * Close pg_database, but keep lock till commit (this is important to
554                  * prevent any risk of deadlock failure while updating flat file)
555                  */
556                 heap_close(pg_database_rel, NoLock);
557
558                 /*
559                  * Set flag to update flat database file at commit.  Note: this also
560                  * forces synchronous commit, which minimizes the window between
561                  * creation of the database files and commital of the transaction. If
562                  * we crash before committing, we'll have a DB that's taking up disk
563                  * space but is not in pg_database, which is not good.
564                  */
565                 database_file_update_needed();
566         }
567         PG_CATCH();
568         {
569                 /* Release lock on source database before doing recursive remove */
570                 UnlockSharedObject(DatabaseRelationId, src_dboid, 0,
571                                                    ShareLock);
572
573                 /* Throw away any successfully copied subdirectories */
574                 remove_dbtablespaces(dboid);
575
576                 PG_RE_THROW();
577         }
578         PG_END_TRY();
579 }
580
581
582 /*
583  * DROP DATABASE
584  */
585 void
586 dropdb(const char *dbname, bool missing_ok)
587 {
588         Oid                     db_id;
589         bool            db_istemplate;
590         Relation        pgdbrel;
591         HeapTuple       tup;
592
593         /*
594          * Look up the target database's OID, and get exclusive lock on it. We
595          * need this to ensure that no new backend starts up in the target
596          * database while we are deleting it (see postinit.c), and that no one is
597          * using it as a CREATE DATABASE template or trying to delete it for
598          * themselves.
599          */
600         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
601
602         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
603                                          &db_istemplate, NULL, NULL, NULL, NULL))
604         {
605                 if (!missing_ok)
606                 {
607                         ereport(ERROR,
608                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
609                                          errmsg("database \"%s\" does not exist", dbname)));
610                 }
611                 else
612                 {
613                         /* Close pg_database, release the lock, since we changed nothing */
614                         heap_close(pgdbrel, RowExclusiveLock);
615                         ereport(NOTICE,
616                                         (errmsg("database \"%s\" does not exist, skipping",
617                                                         dbname)));
618                         return;
619                 }
620         }
621
622         /*
623          * Permission checks
624          */
625         if (!pg_database_ownercheck(db_id, GetUserId()))
626                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
627                                            dbname);
628
629         /*
630          * Disallow dropping a DB that is marked istemplate.  This is just to
631          * prevent people from accidentally dropping template0 or template1; they
632          * can do so if they're really determined ...
633          */
634         if (db_istemplate)
635                 ereport(ERROR,
636                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
637                                  errmsg("cannot drop a template database")));
638
639         /* Obviously can't drop my own database */
640         if (db_id == MyDatabaseId)
641                 ereport(ERROR,
642                                 (errcode(ERRCODE_OBJECT_IN_USE),
643                                  errmsg("cannot drop the currently open database")));
644
645         /*
646          * Check for other backends in the target database.  (Because we hold the
647          * database lock, no new ones can start after this.)
648          *
649          * As in CREATE DATABASE, check this after other error conditions.
650          */
651         if (CheckOtherDBBackends(db_id))
652                 ereport(ERROR,
653                                 (errcode(ERRCODE_OBJECT_IN_USE),
654                                  errmsg("database \"%s\" is being accessed by other users",
655                                                 dbname)));
656
657         /*
658          * Remove the database's tuple from pg_database.
659          */
660         tup = SearchSysCache(DATABASEOID,
661                                                  ObjectIdGetDatum(db_id),
662                                                  0, 0, 0);
663         if (!HeapTupleIsValid(tup))
664                 elog(ERROR, "cache lookup failed for database %u", db_id);
665
666         simple_heap_delete(pgdbrel, &tup->t_self);
667
668         ReleaseSysCache(tup);
669
670         /*
671          * Delete any comments associated with the database.
672          */
673         DeleteSharedComments(db_id, DatabaseRelationId);
674
675         /*
676          * Remove shared dependency references for the database.
677          */
678         dropDatabaseDependencies(db_id);
679
680         /*
681          * Drop pages for this database that are in the shared buffer cache. This
682          * is important to ensure that no remaining backend tries to write out a
683          * dirty buffer to the dead database later...
684          */
685         DropDatabaseBuffers(db_id);
686
687         /*
688          * Also, clean out any entries in the shared free space map.
689          */
690         FreeSpaceMapForgetDatabase(db_id);
691
692         /*
693          * Tell the stats collector to forget it immediately, too.
694          */
695         pgstat_drop_database(db_id);
696
697         /*
698          * Tell bgwriter to forget any pending fsync requests for files in the
699          * database; else it'll fail at next checkpoint.
700          */
701         ForgetDatabaseFsyncRequests(db_id);
702
703         /*
704          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
705          * open files, which would cause rmdir() to fail.
706          */
707 #ifdef WIN32
708         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
709 #endif
710
711         /*
712          * Remove all tablespace subdirs belonging to the database.
713          */
714         remove_dbtablespaces(db_id);
715
716         /*
717          * Close pg_database, but keep lock till commit (this is important to
718          * prevent any risk of deadlock failure while updating flat file)
719          */
720         heap_close(pgdbrel, NoLock);
721
722         /*
723          * Set flag to update flat database file at commit.  Note: this also
724          * forces synchronous commit, which minimizes the window between removal
725          * of the database files and commital of the transaction. If we crash
726          * before committing, we'll have a DB that's gone on disk but still there
727          * according to pg_database, which is not good.
728          */
729         database_file_update_needed();
730 }
731
732
733 /*
734  * Rename database
735  */
736 void
737 RenameDatabase(const char *oldname, const char *newname)
738 {
739         Oid                     db_id;
740         HeapTuple       newtup;
741         Relation        rel;
742
743         /*
744          * Look up the target database's OID, and get exclusive lock on it. We
745          * need this for the same reasons as DROP DATABASE.
746          */
747         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
748
749         if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
750                                          NULL, NULL, NULL, NULL, NULL))
751                 ereport(ERROR,
752                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
753                                  errmsg("database \"%s\" does not exist", oldname)));
754
755         /* must be owner */
756         if (!pg_database_ownercheck(db_id, GetUserId()))
757                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
758                                            oldname);
759
760         /* must have createdb rights */
761         if (!have_createdb_privilege())
762                 ereport(ERROR,
763                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
764                                  errmsg("permission denied to rename database")));
765
766         /*
767          * Make sure the new name doesn't exist.  See notes for same error in
768          * CREATE DATABASE.
769          */
770         if (OidIsValid(get_database_oid(newname)))
771                 ereport(ERROR,
772                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
773                                  errmsg("database \"%s\" already exists", newname)));
774
775         /*
776          * XXX Client applications probably store the current database somewhere,
777          * so renaming it could cause confusion.  On the other hand, there may not
778          * be an actual problem besides a little confusion, so think about this
779          * and decide.
780          */
781         if (db_id == MyDatabaseId)
782                 ereport(ERROR,
783                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
784                                  errmsg("current database cannot be renamed")));
785
786         /*
787          * Make sure the database does not have active sessions.  This is the same
788          * concern as above, but applied to other sessions.
789          *
790          * As in CREATE DATABASE, check this after other error conditions.
791          */
792         if (CheckOtherDBBackends(db_id))
793                 ereport(ERROR,
794                                 (errcode(ERRCODE_OBJECT_IN_USE),
795                                  errmsg("database \"%s\" is being accessed by other users",
796                                                 oldname)));
797
798         /* rename */
799         newtup = SearchSysCacheCopy(DATABASEOID,
800                                                                 ObjectIdGetDatum(db_id),
801                                                                 0, 0, 0);
802         if (!HeapTupleIsValid(newtup))
803                 elog(ERROR, "cache lookup failed for database %u", db_id);
804         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
805         simple_heap_update(rel, &newtup->t_self, newtup);
806         CatalogUpdateIndexes(rel, newtup);
807
808         /*
809          * Close pg_database, but keep lock till commit (this is important to
810          * prevent any risk of deadlock failure while updating flat file)
811          */
812         heap_close(rel, NoLock);
813
814         /*
815          * Set flag to update flat database file at commit.
816          */
817         database_file_update_needed();
818 }
819
820
821 /*
822  * ALTER DATABASE name ...
823  */
824 void
825 AlterDatabase(AlterDatabaseStmt *stmt)
826 {
827         Relation        rel;
828         HeapTuple       tuple,
829                                 newtuple;
830         ScanKeyData scankey;
831         SysScanDesc scan;
832         ListCell   *option;
833         int                     connlimit = -1;
834         DefElem    *dconnlimit = NULL;
835         Datum           new_record[Natts_pg_database];
836         char            new_record_nulls[Natts_pg_database];
837         char            new_record_repl[Natts_pg_database];
838
839         /* Extract options from the statement node tree */
840         foreach(option, stmt->options)
841         {
842                 DefElem    *defel = (DefElem *) lfirst(option);
843
844                 if (strcmp(defel->defname, "connectionlimit") == 0)
845                 {
846                         if (dconnlimit)
847                                 ereport(ERROR,
848                                                 (errcode(ERRCODE_SYNTAX_ERROR),
849                                                  errmsg("conflicting or redundant options")));
850                         dconnlimit = defel;
851                 }
852                 else
853                         elog(ERROR, "option \"%s\" not recognized",
854                                  defel->defname);
855         }
856
857         if (dconnlimit)
858                 connlimit = intVal(dconnlimit->arg);
859
860         /*
861          * Get the old tuple.  We don't need a lock on the database per se,
862          * because we're not going to do anything that would mess up incoming
863          * connections.
864          */
865         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
866         ScanKeyInit(&scankey,
867                                 Anum_pg_database_datname,
868                                 BTEqualStrategyNumber, F_NAMEEQ,
869                                 NameGetDatum(stmt->dbname));
870         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
871                                                           SnapshotNow, 1, &scankey);
872         tuple = systable_getnext(scan);
873         if (!HeapTupleIsValid(tuple))
874                 ereport(ERROR,
875                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
876                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
877
878         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
879                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
880                                            stmt->dbname);
881
882         /*
883          * Build an updated tuple, perusing the information just obtained
884          */
885         MemSet(new_record, 0, sizeof(new_record));
886         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
887         MemSet(new_record_repl, ' ', sizeof(new_record_repl));
888
889         if (dconnlimit)
890         {
891                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
892                 new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r';
893         }
894
895         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record,
896                                                                 new_record_nulls, new_record_repl);
897         simple_heap_update(rel, &tuple->t_self, newtuple);
898
899         /* Update indexes */
900         CatalogUpdateIndexes(rel, newtuple);
901
902         systable_endscan(scan);
903
904         /* Close pg_database, but keep lock till commit */
905         heap_close(rel, NoLock);
906
907         /*
908          * We don't bother updating the flat file since the existing options for
909          * ALTER DATABASE don't affect it.
910          */
911 }
912
913
914 /*
915  * ALTER DATABASE name SET ...
916  */
917 void
918 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
919 {
920         char       *valuestr;
921         HeapTuple       tuple,
922                                 newtuple;
923         Relation        rel;
924         ScanKeyData scankey;
925         SysScanDesc scan;
926         Datum           repl_val[Natts_pg_database];
927         char            repl_null[Natts_pg_database];
928         char            repl_repl[Natts_pg_database];
929
930         valuestr = ExtractSetVariableArgs(stmt->setstmt);
931
932         /*
933          * Get the old tuple.  We don't need a lock on the database per se,
934          * because we're not going to do anything that would mess up incoming
935          * connections.
936          */
937         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
938         ScanKeyInit(&scankey,
939                                 Anum_pg_database_datname,
940                                 BTEqualStrategyNumber, F_NAMEEQ,
941                                 NameGetDatum(stmt->dbname));
942         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
943                                                           SnapshotNow, 1, &scankey);
944         tuple = systable_getnext(scan);
945         if (!HeapTupleIsValid(tuple))
946                 ereport(ERROR,
947                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
948                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
949
950         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
951                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
952                                            stmt->dbname);
953
954         memset(repl_repl, ' ', sizeof(repl_repl));
955         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
956
957         if (stmt->setstmt->kind == VAR_RESET_ALL)
958         {
959                 /* RESET ALL, so just set datconfig to null */
960                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
961                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
962         }
963         else
964         {
965                 Datum           datum;
966                 bool            isnull;
967                 ArrayType  *a;
968
969                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
970
971                 /* Extract old value of datconfig */
972                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
973                                                          RelationGetDescr(rel), &isnull);
974                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
975
976                 /* Update (valuestr is NULL in RESET cases) */
977                 if (valuestr)
978                         a = GUCArrayAdd(a, stmt->setstmt->name, valuestr);
979                 else
980                         a = GUCArrayDelete(a, stmt->setstmt->name);
981
982                 if (a)
983                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
984                 else
985                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
986         }
987
988         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel),
989                                                                 repl_val, repl_null, repl_repl);
990         simple_heap_update(rel, &tuple->t_self, newtuple);
991
992         /* Update indexes */
993         CatalogUpdateIndexes(rel, newtuple);
994
995         systable_endscan(scan);
996
997         /* Close pg_database, but keep lock till commit */
998         heap_close(rel, NoLock);
999
1000         /*
1001          * We don't bother updating the flat file since ALTER DATABASE SET doesn't
1002          * affect it.
1003          */
1004 }
1005
1006
1007 /*
1008  * ALTER DATABASE name OWNER TO newowner
1009  */
1010 void
1011 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1012 {
1013         HeapTuple       tuple;
1014         Relation        rel;
1015         ScanKeyData scankey;
1016         SysScanDesc scan;
1017         Form_pg_database datForm;
1018
1019         /*
1020          * Get the old tuple.  We don't need a lock on the database per se,
1021          * because we're not going to do anything that would mess up incoming
1022          * connections.
1023          */
1024         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1025         ScanKeyInit(&scankey,
1026                                 Anum_pg_database_datname,
1027                                 BTEqualStrategyNumber, F_NAMEEQ,
1028                                 NameGetDatum(dbname));
1029         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1030                                                           SnapshotNow, 1, &scankey);
1031         tuple = systable_getnext(scan);
1032         if (!HeapTupleIsValid(tuple))
1033                 ereport(ERROR,
1034                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1035                                  errmsg("database \"%s\" does not exist", dbname)));
1036
1037         datForm = (Form_pg_database) GETSTRUCT(tuple);
1038
1039         /*
1040          * If the new owner is the same as the existing owner, consider the
1041          * command to have succeeded.  This is to be consistent with other
1042          * objects.
1043          */
1044         if (datForm->datdba != newOwnerId)
1045         {
1046                 Datum           repl_val[Natts_pg_database];
1047                 char            repl_null[Natts_pg_database];
1048                 char            repl_repl[Natts_pg_database];
1049                 Acl                *newAcl;
1050                 Datum           aclDatum;
1051                 bool            isNull;
1052                 HeapTuple       newtuple;
1053
1054                 /* Otherwise, must be owner of the existing object */
1055                 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1056                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1057                                                    dbname);
1058
1059                 /* Must be able to become new owner */
1060                 check_is_member_of_role(GetUserId(), newOwnerId);
1061
1062                 /*
1063                  * must have createdb rights
1064                  *
1065                  * NOTE: This is different from other alter-owner checks in that the
1066                  * current user is checked for createdb privileges instead of the
1067                  * destination owner.  This is consistent with the CREATE case for
1068                  * databases.  Because superusers will always have this right, we need
1069                  * no special case for them.
1070                  */
1071                 if (!have_createdb_privilege())
1072                         ereport(ERROR,
1073                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1074                                    errmsg("permission denied to change owner of database")));
1075
1076                 memset(repl_null, ' ', sizeof(repl_null));
1077                 memset(repl_repl, ' ', sizeof(repl_repl));
1078
1079                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
1080                 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1081
1082                 /*
1083                  * Determine the modified ACL for the new owner.  This is only
1084                  * necessary when the ACL is non-null.
1085                  */
1086                 aclDatum = heap_getattr(tuple,
1087                                                                 Anum_pg_database_datacl,
1088                                                                 RelationGetDescr(rel),
1089                                                                 &isNull);
1090                 if (!isNull)
1091                 {
1092                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
1093                                                                  datForm->datdba, newOwnerId);
1094                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
1095                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1096                 }
1097
1098                 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1099                 simple_heap_update(rel, &newtuple->t_self, newtuple);
1100                 CatalogUpdateIndexes(rel, newtuple);
1101
1102                 heap_freetuple(newtuple);
1103
1104                 /* Update owner dependency reference */
1105                 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1106                                                                 newOwnerId);
1107         }
1108
1109         systable_endscan(scan);
1110
1111         /* Close pg_database, but keep lock till commit */
1112         heap_close(rel, NoLock);
1113
1114         /*
1115          * We don't bother updating the flat file since ALTER DATABASE OWNER
1116          * doesn't affect it.
1117          */
1118 }
1119
1120
1121 /*
1122  * Helper functions
1123  */
1124
1125 /*
1126  * Look up info about the database named "name".  If the database exists,
1127  * obtain the specified lock type on it, fill in any of the remaining
1128  * parameters that aren't NULL, and return TRUE.  If no such database,
1129  * return FALSE.
1130  */
1131 static bool
1132 get_db_info(const char *name, LOCKMODE lockmode,
1133                         Oid *dbIdP, Oid *ownerIdP,
1134                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1135                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1136                         Oid *dbTablespace)
1137 {
1138         bool            result = false;
1139         Relation        relation;
1140
1141         AssertArg(name);
1142
1143         /* Caller may wish to grab a better lock on pg_database beforehand... */
1144         relation = heap_open(DatabaseRelationId, AccessShareLock);
1145
1146         /*
1147          * Loop covers the rare case where the database is renamed before we can
1148          * lock it.  We try again just in case we can find a new one of the same
1149          * name.
1150          */
1151         for (;;)
1152         {
1153                 ScanKeyData scanKey;
1154                 SysScanDesc scan;
1155                 HeapTuple       tuple;
1156                 Oid                     dbOid;
1157
1158                 /*
1159                  * there's no syscache for database-indexed-by-name, so must do it the
1160                  * hard way
1161                  */
1162                 ScanKeyInit(&scanKey,
1163                                         Anum_pg_database_datname,
1164                                         BTEqualStrategyNumber, F_NAMEEQ,
1165                                         NameGetDatum(name));
1166
1167                 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1168                                                                   SnapshotNow, 1, &scanKey);
1169
1170                 tuple = systable_getnext(scan);
1171
1172                 if (!HeapTupleIsValid(tuple))
1173                 {
1174                         /* definitely no database of that name */
1175                         systable_endscan(scan);
1176                         break;
1177                 }
1178
1179                 dbOid = HeapTupleGetOid(tuple);
1180
1181                 systable_endscan(scan);
1182
1183                 /*
1184                  * Now that we have a database OID, we can try to lock the DB.
1185                  */
1186                 if (lockmode != NoLock)
1187                         LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1188
1189                 /*
1190                  * And now, re-fetch the tuple by OID.  If it's still there and still
1191                  * the same name, we win; else, drop the lock and loop back to try
1192                  * again.
1193                  */
1194                 tuple = SearchSysCache(DATABASEOID,
1195                                                            ObjectIdGetDatum(dbOid),
1196                                                            0, 0, 0);
1197                 if (HeapTupleIsValid(tuple))
1198                 {
1199                         Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1200
1201                         if (strcmp(name, NameStr(dbform->datname)) == 0)
1202                         {
1203                                 /* oid of the database */
1204                                 if (dbIdP)
1205                                         *dbIdP = dbOid;
1206                                 /* oid of the owner */
1207                                 if (ownerIdP)
1208                                         *ownerIdP = dbform->datdba;
1209                                 /* character encoding */
1210                                 if (encodingP)
1211                                         *encodingP = dbform->encoding;
1212                                 /* allowed as template? */
1213                                 if (dbIsTemplateP)
1214                                         *dbIsTemplateP = dbform->datistemplate;
1215                                 /* allowing connections? */
1216                                 if (dbAllowConnP)
1217                                         *dbAllowConnP = dbform->datallowconn;
1218                                 /* last system OID used in database */
1219                                 if (dbLastSysOidP)
1220                                         *dbLastSysOidP = dbform->datlastsysoid;
1221                                 /* limit of frozen XIDs */
1222                                 if (dbFrozenXidP)
1223                                         *dbFrozenXidP = dbform->datfrozenxid;
1224                                 /* default tablespace for this database */
1225                                 if (dbTablespace)
1226                                         *dbTablespace = dbform->dattablespace;
1227                                 ReleaseSysCache(tuple);
1228                                 result = true;
1229                                 break;
1230                         }
1231                         /* can only get here if it was just renamed */
1232                         ReleaseSysCache(tuple);
1233                 }
1234
1235                 if (lockmode != NoLock)
1236                         UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1237         }
1238
1239         heap_close(relation, AccessShareLock);
1240
1241         return result;
1242 }
1243
1244 /* Check if current user has createdb privileges */
1245 static bool
1246 have_createdb_privilege(void)
1247 {
1248         bool            result = false;
1249         HeapTuple       utup;
1250
1251         /* Superusers can always do everything */
1252         if (superuser())
1253                 return true;
1254
1255         utup = SearchSysCache(AUTHOID,
1256                                                   ObjectIdGetDatum(GetUserId()),
1257                                                   0, 0, 0);
1258         if (HeapTupleIsValid(utup))
1259         {
1260                 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1261                 ReleaseSysCache(utup);
1262         }
1263         return result;
1264 }
1265
1266 /*
1267  * Remove tablespace directories
1268  *
1269  * We don't know what tablespaces db_id is using, so iterate through all
1270  * tablespaces removing <tablespace>/db_id
1271  */
1272 static void
1273 remove_dbtablespaces(Oid db_id)
1274 {
1275         Relation        rel;
1276         HeapScanDesc scan;
1277         HeapTuple       tuple;
1278
1279         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1280         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1281         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1282         {
1283                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1284                 char       *dstpath;
1285                 struct stat st;
1286
1287                 /* Don't mess with the global tablespace */
1288                 if (dsttablespace == GLOBALTABLESPACE_OID)
1289                         continue;
1290
1291                 dstpath = GetDatabasePath(db_id, dsttablespace);
1292
1293                 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1294                 {
1295                         /* Assume we can ignore it */
1296                         pfree(dstpath);
1297                         continue;
1298                 }
1299
1300                 if (!rmtree(dstpath, true))
1301                         ereport(WARNING,
1302                                         (errmsg("could not remove database directory \"%s\"",
1303                                                         dstpath)));
1304
1305                 /* Record the filesystem change in XLOG */
1306                 {
1307                         xl_dbase_drop_rec xlrec;
1308                         XLogRecData rdata[1];
1309
1310                         xlrec.db_id = db_id;
1311                         xlrec.tablespace_id = dsttablespace;
1312
1313                         rdata[0].data = (char *) &xlrec;
1314                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1315                         rdata[0].buffer = InvalidBuffer;
1316                         rdata[0].next = NULL;
1317
1318                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1319                 }
1320
1321                 pfree(dstpath);
1322         }
1323
1324         heap_endscan(scan);
1325         heap_close(rel, AccessShareLock);
1326 }
1327
1328 /*
1329  * Check for existing files that conflict with a proposed new DB OID;
1330  * return TRUE if there are any
1331  *
1332  * If there were a subdirectory in any tablespace matching the proposed new
1333  * OID, we'd get a create failure due to the duplicate name ... and then we'd
1334  * try to remove that already-existing subdirectory during the cleanup in
1335  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
1336  * instead we make this extra check before settling on the OID of the new
1337  * database.  This exactly parallels what GetNewRelFileNode() does for table
1338  * relfilenode values.
1339  */
1340 static bool
1341 check_db_file_conflict(Oid db_id)
1342 {
1343         bool            result = false;
1344         Relation        rel;
1345         HeapScanDesc scan;
1346         HeapTuple       tuple;
1347
1348         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1349         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1350         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1351         {
1352                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1353                 char       *dstpath;
1354                 struct stat st;
1355
1356                 /* Don't mess with the global tablespace */
1357                 if (dsttablespace == GLOBALTABLESPACE_OID)
1358                         continue;
1359
1360                 dstpath = GetDatabasePath(db_id, dsttablespace);
1361
1362                 if (lstat(dstpath, &st) == 0)
1363                 {
1364                         /* Found a conflicting file (or directory, whatever) */
1365                         pfree(dstpath);
1366                         result = true;
1367                         break;
1368                 }
1369
1370                 pfree(dstpath);
1371         }
1372
1373         heap_endscan(scan);
1374         heap_close(rel, AccessShareLock);
1375         return result;
1376 }
1377
1378 /*
1379  * get_database_oid - given a database name, look up the OID
1380  *
1381  * Returns InvalidOid if database name not found.
1382  */
1383 Oid
1384 get_database_oid(const char *dbname)
1385 {
1386         Relation        pg_database;
1387         ScanKeyData entry[1];
1388         SysScanDesc scan;
1389         HeapTuple       dbtuple;
1390         Oid                     oid;
1391
1392         /*
1393          * There's no syscache for pg_database indexed by name, so we must look
1394          * the hard way.
1395          */
1396         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1397         ScanKeyInit(&entry[0],
1398                                 Anum_pg_database_datname,
1399                                 BTEqualStrategyNumber, F_NAMEEQ,
1400                                 CStringGetDatum(dbname));
1401         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1402                                                           SnapshotNow, 1, entry);
1403
1404         dbtuple = systable_getnext(scan);
1405
1406         /* We assume that there can be at most one matching tuple */
1407         if (HeapTupleIsValid(dbtuple))
1408                 oid = HeapTupleGetOid(dbtuple);
1409         else
1410                 oid = InvalidOid;
1411
1412         systable_endscan(scan);
1413         heap_close(pg_database, AccessShareLock);
1414
1415         return oid;
1416 }
1417
1418
1419 /*
1420  * get_database_name - given a database OID, look up the name
1421  *
1422  * Returns a palloc'd string, or NULL if no such database.
1423  */
1424 char *
1425 get_database_name(Oid dbid)
1426 {
1427         HeapTuple       dbtuple;
1428         char       *result;
1429
1430         dbtuple = SearchSysCache(DATABASEOID,
1431                                                          ObjectIdGetDatum(dbid),
1432                                                          0, 0, 0);
1433         if (HeapTupleIsValid(dbtuple))
1434         {
1435                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1436                 ReleaseSysCache(dbtuple);
1437         }
1438         else
1439                 result = NULL;
1440
1441         return result;
1442 }
1443
1444 /*
1445  * DATABASE resource manager's routines
1446  */
1447 void
1448 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1449 {
1450         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1451
1452         if (info == XLOG_DBASE_CREATE)
1453         {
1454                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1455                 char       *src_path;
1456                 char       *dst_path;
1457                 struct stat st;
1458
1459                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1460                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1461
1462                 /*
1463                  * Our theory for replaying a CREATE is to forcibly drop the target
1464                  * subdirectory if present, then re-copy the source data. This may be
1465                  * more work than needed, but it is simple to implement.
1466                  */
1467                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1468                 {
1469                         if (!rmtree(dst_path, true))
1470                                 ereport(WARNING,
1471                                                 (errmsg("could not remove database directory \"%s\"",
1472                                                                 dst_path)));
1473                 }
1474
1475                 /*
1476                  * Force dirty buffers out to disk, to ensure source database is
1477                  * up-to-date for the copy.
1478                  */
1479                 FlushDatabaseBuffers(xlrec->src_db_id);
1480
1481                 /*
1482                  * Copy this subdirectory to the new location
1483                  *
1484                  * We don't need to copy subdirectories
1485                  */
1486                 copydir(src_path, dst_path, false);
1487         }
1488         else if (info == XLOG_DBASE_DROP)
1489         {
1490                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1491                 char       *dst_path;
1492
1493                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1494
1495                 /* Drop pages for this database that are in the shared buffer cache */
1496                 DropDatabaseBuffers(xlrec->db_id);
1497
1498                 /* Also, clean out any entries in the shared free space map */
1499                 FreeSpaceMapForgetDatabase(xlrec->db_id);
1500
1501                 /* Also, clean out any fsync requests that might be pending in md.c */
1502                 ForgetDatabaseFsyncRequests(xlrec->db_id);
1503
1504                 /* Clean out the xlog relcache too */
1505                 XLogDropDatabase(xlrec->db_id);
1506
1507                 /* And remove the physical files */
1508                 if (!rmtree(dst_path, true))
1509                         ereport(WARNING,
1510                                         (errmsg("could not remove database directory \"%s\"",
1511                                                         dst_path)));
1512         }
1513         else
1514                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1515 }
1516
1517 void
1518 dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
1519 {
1520         uint8           info = xl_info & ~XLR_INFO_MASK;
1521
1522         if (info == XLOG_DBASE_CREATE)
1523         {
1524                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1525
1526                 appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
1527                                                  xlrec->src_db_id, xlrec->src_tablespace_id,
1528                                                  xlrec->db_id, xlrec->tablespace_id);
1529         }
1530         else if (info == XLOG_DBASE_DROP)
1531         {
1532                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1533
1534                 appendStringInfo(buf, "drop db: dir %u/%u",
1535                                                  xlrec->db_id, xlrec->tablespace_id);
1536         }
1537         else
1538                 appendStringInfo(buf, "UNKNOWN");
1539 }