]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Update CVS HEAD for 2007 copyright. Back branches are typically not
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands use exclusive locks on
7  * the database objects (as expressed by LockSharedObject()) to avoid
8  * stepping on each others' toes.  Formerly we used table-level locks
9  * on pg_database, but that's too coarse-grained.
10  *
11  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.188 2007/01/05 22:19:25 momjian Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <sys/stat.h>
25
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/xact.h"
29 #include "catalog/catalog.h"
30 #include "catalog/dependency.h"
31 #include "catalog/indexing.h"
32 #include "catalog/pg_authid.h"
33 #include "catalog/pg_database.h"
34 #include "catalog/pg_tablespace.h"
35 #include "commands/comment.h"
36 #include "commands/dbcommands.h"
37 #include "commands/tablespace.h"
38 #include "mb/pg_wchar.h"
39 #include "miscadmin.h"
40 #include "postmaster/bgwriter.h"
41 #include "storage/freespace.h"
42 #include "storage/procarray.h"
43 #include "utils/acl.h"
44 #include "utils/builtins.h"
45 #include "utils/flatfiles.h"
46 #include "utils/fmgroids.h"
47 #include "utils/guc.h"
48 #include "utils/lsyscache.h"
49 #include "utils/syscache.h"
50
51
52 /* non-export function prototypes */
53 static bool get_db_info(const char *name, LOCKMODE lockmode,
54                         Oid *dbIdP, Oid *ownerIdP,
55                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
56                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
57                         Oid *dbTablespace);
58 static bool have_createdb_privilege(void);
59 static void remove_dbtablespaces(Oid db_id);
60 static bool check_db_file_conflict(Oid db_id);
61
62
63 /*
64  * CREATE DATABASE
65  */
66 void
67 createdb(const CreatedbStmt *stmt)
68 {
69         HeapScanDesc scan;
70         Relation        rel;
71         Oid                     src_dboid;
72         Oid                     src_owner;
73         int                     src_encoding;
74         bool            src_istemplate;
75         bool            src_allowconn;
76         Oid                     src_lastsysoid;
77         TransactionId src_frozenxid;
78         Oid                     src_deftablespace;
79         volatile Oid dst_deftablespace;
80         Relation        pg_database_rel;
81         HeapTuple       tuple;
82         Datum           new_record[Natts_pg_database];
83         char            new_record_nulls[Natts_pg_database];
84         Oid                     dboid;
85         Oid                     datdba;
86         ListCell   *option;
87         DefElem    *dtablespacename = NULL;
88         DefElem    *downer = NULL;
89         DefElem    *dtemplate = NULL;
90         DefElem    *dencoding = NULL;
91         DefElem    *dconnlimit = NULL;
92         char       *dbname = stmt->dbname;
93         char       *dbowner = NULL;
94         const char *dbtemplate = NULL;
95         int                     encoding = -1;
96         int                     dbconnlimit = -1;
97
98         /* don't call this in a transaction block */
99         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
100
101         /* Extract options from the statement node tree */
102         foreach(option, stmt->options)
103         {
104                 DefElem    *defel = (DefElem *) lfirst(option);
105
106                 if (strcmp(defel->defname, "tablespace") == 0)
107                 {
108                         if (dtablespacename)
109                                 ereport(ERROR,
110                                                 (errcode(ERRCODE_SYNTAX_ERROR),
111                                                  errmsg("conflicting or redundant options")));
112                         dtablespacename = defel;
113                 }
114                 else if (strcmp(defel->defname, "owner") == 0)
115                 {
116                         if (downer)
117                                 ereport(ERROR,
118                                                 (errcode(ERRCODE_SYNTAX_ERROR),
119                                                  errmsg("conflicting or redundant options")));
120                         downer = defel;
121                 }
122                 else if (strcmp(defel->defname, "template") == 0)
123                 {
124                         if (dtemplate)
125                                 ereport(ERROR,
126                                                 (errcode(ERRCODE_SYNTAX_ERROR),
127                                                  errmsg("conflicting or redundant options")));
128                         dtemplate = defel;
129                 }
130                 else if (strcmp(defel->defname, "encoding") == 0)
131                 {
132                         if (dencoding)
133                                 ereport(ERROR,
134                                                 (errcode(ERRCODE_SYNTAX_ERROR),
135                                                  errmsg("conflicting or redundant options")));
136                         dencoding = defel;
137                 }
138                 else if (strcmp(defel->defname, "connectionlimit") == 0)
139                 {
140                         if (dconnlimit)
141                                 ereport(ERROR,
142                                                 (errcode(ERRCODE_SYNTAX_ERROR),
143                                                  errmsg("conflicting or redundant options")));
144                         dconnlimit = defel;
145                 }
146                 else if (strcmp(defel->defname, "location") == 0)
147                 {
148                         ereport(WARNING,
149                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
150                                          errmsg("LOCATION is not supported anymore"),
151                                          errhint("Consider using tablespaces instead.")));
152                 }
153                 else
154                         elog(ERROR, "option \"%s\" not recognized",
155                                  defel->defname);
156         }
157
158         if (downer && downer->arg)
159                 dbowner = strVal(downer->arg);
160         if (dtemplate && dtemplate->arg)
161                 dbtemplate = strVal(dtemplate->arg);
162         if (dencoding && dencoding->arg)
163         {
164                 const char *encoding_name;
165
166                 if (IsA(dencoding->arg, Integer))
167                 {
168                         encoding = intVal(dencoding->arg);
169                         encoding_name = pg_encoding_to_char(encoding);
170                         if (strcmp(encoding_name, "") == 0 ||
171                                 pg_valid_server_encoding(encoding_name) < 0)
172                                 ereport(ERROR,
173                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
174                                                  errmsg("%d is not a valid encoding code",
175                                                                 encoding)));
176                 }
177                 else if (IsA(dencoding->arg, String))
178                 {
179                         encoding_name = strVal(dencoding->arg);
180                         if (pg_valid_server_encoding(encoding_name) < 0)
181                                 ereport(ERROR,
182                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
183                                                  errmsg("%s is not a valid encoding name",
184                                                                 encoding_name)));
185                         encoding = pg_char_to_encoding(encoding_name);
186                 }
187                 else
188                         elog(ERROR, "unrecognized node type: %d",
189                                  nodeTag(dencoding->arg));
190         }
191         if (dconnlimit && dconnlimit->arg)
192                 dbconnlimit = intVal(dconnlimit->arg);
193
194         /* obtain OID of proposed owner */
195         if (dbowner)
196                 datdba = get_roleid_checked(dbowner);
197         else
198                 datdba = GetUserId();
199
200         /*
201          * To create a database, must have createdb privilege and must be able to
202          * become the target role (this does not imply that the target role itself
203          * must have createdb privilege).  The latter provision guards against
204          * "giveaway" attacks.  Note that a superuser will always have both of
205          * these privileges a fortiori.
206          */
207         if (!have_createdb_privilege())
208                 ereport(ERROR,
209                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
210                                  errmsg("permission denied to create database")));
211
212         check_is_member_of_role(GetUserId(), datdba);
213
214         /*
215          * Lookup database (template) to be cloned, and obtain share lock on it.
216          * ShareLock allows two CREATE DATABASEs to work from the same template
217          * concurrently, while ensuring no one is busy dropping it in parallel
218          * (which would be Very Bad since we'd likely get an incomplete copy
219          * without knowing it).  This also prevents any new connections from being
220          * made to the source until we finish copying it, so we can be sure it
221          * won't change underneath us.
222          */
223         if (!dbtemplate)
224                 dbtemplate = "template1";               /* Default template database name */
225
226         if (!get_db_info(dbtemplate, ShareLock,
227                                          &src_dboid, &src_owner, &src_encoding,
228                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
229                                          &src_frozenxid, &src_deftablespace))
230                 ereport(ERROR,
231                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
232                                  errmsg("template database \"%s\" does not exist",
233                                                 dbtemplate)));
234
235         /*
236          * Permission check: to copy a DB that's not marked datistemplate, you
237          * must be superuser or the owner thereof.
238          */
239         if (!src_istemplate)
240         {
241                 if (!pg_database_ownercheck(src_dboid, GetUserId()))
242                         ereport(ERROR,
243                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
244                                          errmsg("permission denied to copy database \"%s\"",
245                                                         dbtemplate)));
246         }
247
248         /*
249          * The source DB can't have any active backends, except this one
250          * (exception is to allow CREATE DB while connected to template1).
251          * Otherwise we might copy inconsistent data.
252          */
253         if (DatabaseHasActiveBackends(src_dboid, true))
254                 ereport(ERROR,
255                                 (errcode(ERRCODE_OBJECT_IN_USE),
256                         errmsg("source database \"%s\" is being accessed by other users",
257                                    dbtemplate)));
258
259         /* If encoding is defaulted, use source's encoding */
260         if (encoding < 0)
261                 encoding = src_encoding;
262
263         /* Some encodings are client only */
264         if (!PG_VALID_BE_ENCODING(encoding))
265                 ereport(ERROR,
266                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
267                                  errmsg("invalid server encoding %d", encoding)));
268
269         /* Resolve default tablespace for new database */
270         if (dtablespacename && dtablespacename->arg)
271         {
272                 char       *tablespacename;
273                 AclResult       aclresult;
274
275                 tablespacename = strVal(dtablespacename->arg);
276                 dst_deftablespace = get_tablespace_oid(tablespacename);
277                 if (!OidIsValid(dst_deftablespace))
278                         ereport(ERROR,
279                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
280                                          errmsg("tablespace \"%s\" does not exist",
281                                                         tablespacename)));
282                 /* check permissions */
283                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
284                                                                                    ACL_CREATE);
285                 if (aclresult != ACLCHECK_OK)
286                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
287                                                    tablespacename);
288
289                 /*
290                  * If we are trying to change the default tablespace of the template,
291                  * we require that the template not have any files in the new default
292                  * tablespace.  This is necessary because otherwise the copied
293                  * database would contain pg_class rows that refer to its default
294                  * tablespace both explicitly (by OID) and implicitly (as zero), which
295                  * would cause problems.  For example another CREATE DATABASE using
296                  * the copied database as template, and trying to change its default
297                  * tablespace again, would yield outright incorrect results (it would
298                  * improperly move tables to the new default tablespace that should
299                  * stay in the same tablespace).
300                  */
301                 if (dst_deftablespace != src_deftablespace)
302                 {
303                         char       *srcpath;
304                         struct stat st;
305
306                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
307
308                         if (stat(srcpath, &st) == 0 &&
309                                 S_ISDIR(st.st_mode) &&
310                                 !directory_is_empty(srcpath))
311                                 ereport(ERROR,
312                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
313                                                  errmsg("cannot assign new default tablespace \"%s\"",
314                                                                 tablespacename),
315                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
316                                                                    dbtemplate)));
317                         pfree(srcpath);
318                 }
319         }
320         else
321         {
322                 /* Use template database's default tablespace */
323                 dst_deftablespace = src_deftablespace;
324                 /* Note there is no additional permission check in this path */
325         }
326
327         /*
328          * Check for db name conflict.  This is just to give a more friendly error
329          * message than "unique index violation".  There's a race condition but
330          * we're willing to accept the less friendly message in that case.
331          */
332         if (OidIsValid(get_database_oid(dbname)))
333                 ereport(ERROR,
334                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
335                                  errmsg("database \"%s\" already exists", dbname)));
336
337         /*
338          * Select an OID for the new database, checking that it doesn't have
339          * a filename conflict with anything already existing in the tablespace
340          * directories.
341          */
342         pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
343
344         do
345         {
346                 dboid = GetNewOid(pg_database_rel);
347         } while (check_db_file_conflict(dboid));
348
349         /*
350          * Insert a new tuple into pg_database.  This establishes our ownership of
351          * the new database name (anyone else trying to insert the same name will
352          * block on the unique index, and fail after we commit).
353          */
354
355         /* Form tuple */
356         MemSet(new_record, 0, sizeof(new_record));
357         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
358
359         new_record[Anum_pg_database_datname - 1] =
360                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
361         new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
362         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
363         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
364         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
365         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
366         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
367         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
368         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
369
370         /*
371          * We deliberately set datconfig and datacl to defaults (NULL), rather
372          * than copying them from the template database.  Copying datacl would be
373          * a bad idea when the owner is not the same as the template's owner. It's
374          * more debatable whether datconfig should be copied.
375          */
376         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
377         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
378
379         tuple = heap_formtuple(RelationGetDescr(pg_database_rel),
380                                                    new_record, new_record_nulls);
381
382         HeapTupleSetOid(tuple, dboid);
383
384         simple_heap_insert(pg_database_rel, tuple);
385
386         /* Update indexes */
387         CatalogUpdateIndexes(pg_database_rel, tuple);
388
389         /*
390          * Now generate additional catalog entries associated with the new DB
391          */
392
393         /* Register owner dependency */
394         recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
395
396         /* Create pg_shdepend entries for objects within database */
397         copyTemplateDependencies(src_dboid, dboid);
398
399         /*
400          * Force dirty buffers out to disk, to ensure source database is
401          * up-to-date for the copy.  (We really only need to flush buffers for the
402          * source database, but bufmgr.c provides no API for that.)
403          */
404         BufferSync();
405
406         /*
407          * Once we start copying subdirectories, we need to be able to clean 'em
408          * up if we fail.  Establish a TRY block to make sure this happens. (This
409          * is not a 100% solution, because of the possibility of failure during
410          * transaction commit after we leave this routine, but it should handle
411          * most scenarios.)
412          */
413         PG_TRY();
414         {
415                 /*
416                  * Iterate through all tablespaces of the template database, and copy
417                  * each one to the new database.
418                  */
419                 rel = heap_open(TableSpaceRelationId, AccessShareLock);
420                 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
421                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
422                 {
423                         Oid                     srctablespace = HeapTupleGetOid(tuple);
424                         Oid                     dsttablespace;
425                         char       *srcpath;
426                         char       *dstpath;
427                         struct stat st;
428
429                         /* No need to copy global tablespace */
430                         if (srctablespace == GLOBALTABLESPACE_OID)
431                                 continue;
432
433                         srcpath = GetDatabasePath(src_dboid, srctablespace);
434
435                         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
436                                 directory_is_empty(srcpath))
437                         {
438                                 /* Assume we can ignore it */
439                                 pfree(srcpath);
440                                 continue;
441                         }
442
443                         if (srctablespace == src_deftablespace)
444                                 dsttablespace = dst_deftablespace;
445                         else
446                                 dsttablespace = srctablespace;
447
448                         dstpath = GetDatabasePath(dboid, dsttablespace);
449
450                         /*
451                          * Copy this subdirectory to the new location
452                          *
453                          * We don't need to copy subdirectories
454                          */
455                         copydir(srcpath, dstpath, false);
456
457                         /* Record the filesystem change in XLOG */
458                         {
459                                 xl_dbase_create_rec xlrec;
460                                 XLogRecData rdata[1];
461
462                                 xlrec.db_id = dboid;
463                                 xlrec.tablespace_id = dsttablespace;
464                                 xlrec.src_db_id = src_dboid;
465                                 xlrec.src_tablespace_id = srctablespace;
466
467                                 rdata[0].data = (char *) &xlrec;
468                                 rdata[0].len = sizeof(xl_dbase_create_rec);
469                                 rdata[0].buffer = InvalidBuffer;
470                                 rdata[0].next = NULL;
471
472                                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
473                         }
474                 }
475                 heap_endscan(scan);
476                 heap_close(rel, AccessShareLock);
477
478                 /*
479                  * We force a checkpoint before committing.  This effectively means
480                  * that committed XLOG_DBASE_CREATE operations will never need to be
481                  * replayed (at least not in ordinary crash recovery; we still have to
482                  * make the XLOG entry for the benefit of PITR operations). This
483                  * avoids two nasty scenarios:
484                  *
485                  * #1: When PITR is off, we don't XLOG the contents of newly created
486                  * indexes; therefore the drop-and-recreate-whole-directory behavior
487                  * of DBASE_CREATE replay would lose such indexes.
488                  *
489                  * #2: Since we have to recopy the source database during DBASE_CREATE
490                  * replay, we run the risk of copying changes in it that were
491                  * committed after the original CREATE DATABASE command but before the
492                  * system crash that led to the replay.  This is at least unexpected
493                  * and at worst could lead to inconsistencies, eg duplicate table
494                  * names.
495                  *
496                  * (Both of these were real bugs in releases 8.0 through 8.0.3.)
497                  *
498                  * In PITR replay, the first of these isn't an issue, and the second
499                  * is only a risk if the CREATE DATABASE and subsequent template
500                  * database change both occur while a base backup is being taken.
501                  * There doesn't seem to be much we can do about that except document
502                  * it as a limitation.
503                  *
504                  * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
505                  * we can avoid this.
506                  */
507                 RequestCheckpoint(true, false);
508
509                 /*
510                  * Close pg_database, but keep lock till commit (this is important to
511                  * prevent any risk of deadlock failure while updating flat file)
512                  */
513                 heap_close(pg_database_rel, NoLock);
514
515                 /*
516                  * Set flag to update flat database file at commit.
517                  */
518                 database_file_update_needed();
519         }
520         PG_CATCH();
521         {
522                 /* Release lock on source database before doing recursive remove */
523                 UnlockSharedObject(DatabaseRelationId, src_dboid, 0,
524                                                    ShareLock);
525
526                 /* Throw away any successfully copied subdirectories */
527                 remove_dbtablespaces(dboid);
528
529                 PG_RE_THROW();
530         }
531         PG_END_TRY();
532 }
533
534
535 /*
536  * DROP DATABASE
537  */
538 void
539 dropdb(const char *dbname, bool missing_ok)
540 {
541         Oid                     db_id;
542         bool            db_istemplate;
543         Relation        pgdbrel;
544         HeapTuple       tup;
545
546         PreventTransactionChain((void *) dbname, "DROP DATABASE");
547
548         AssertArg(dbname);
549
550         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
551                 ereport(ERROR,
552                                 (errcode(ERRCODE_OBJECT_IN_USE),
553                                  errmsg("cannot drop the currently open database")));
554
555         /*
556          * Look up the target database's OID, and get exclusive lock on it. We
557          * need this to ensure that no new backend starts up in the target
558          * database while we are deleting it (see postinit.c), and that no one is
559          * using it as a CREATE DATABASE template or trying to delete it for
560          * themselves.
561          */
562         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
563
564         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
565                                          &db_istemplate, NULL, NULL, NULL, NULL))
566         {
567                 if (!missing_ok)
568                 {
569                         ereport(ERROR,
570                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
571                                          errmsg("database \"%s\" does not exist", dbname)));
572                 }
573                 else
574                 {
575                         /* Close pg_database, release the lock, since we changed nothing */
576                         heap_close(pgdbrel, RowExclusiveLock);
577                         ereport(NOTICE,
578                                         (errmsg("database \"%s\" does not exist, skipping",
579                                                         dbname)));
580                         return;
581                 }
582         }
583
584         /*
585          * Permission checks
586          */
587         if (!pg_database_ownercheck(db_id, GetUserId()))
588                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
589                                            dbname);
590
591         /*
592          * Disallow dropping a DB that is marked istemplate.  This is just to
593          * prevent people from accidentally dropping template0 or template1; they
594          * can do so if they're really determined ...
595          */
596         if (db_istemplate)
597                 ereport(ERROR,
598                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
599                                  errmsg("cannot drop a template database")));
600
601         /*
602          * Check for active backends in the target database.  (Because we hold the
603          * database lock, no new ones can start after this.)
604          */
605         if (DatabaseHasActiveBackends(db_id, false))
606                 ereport(ERROR,
607                                 (errcode(ERRCODE_OBJECT_IN_USE),
608                                  errmsg("database \"%s\" is being accessed by other users",
609                                                 dbname)));
610
611         /*
612          * Remove the database's tuple from pg_database.
613          */
614         tup = SearchSysCache(DATABASEOID,
615                                                  ObjectIdGetDatum(db_id),
616                                                  0, 0, 0);
617         if (!HeapTupleIsValid(tup))
618                 elog(ERROR, "cache lookup failed for database %u", db_id);
619
620         simple_heap_delete(pgdbrel, &tup->t_self);
621
622         ReleaseSysCache(tup);
623
624         /*
625          * Delete any comments associated with the database.
626          */
627         DeleteSharedComments(db_id, DatabaseRelationId);
628
629         /*
630          * Remove shared dependency references for the database.
631          */
632         dropDatabaseDependencies(db_id);
633
634         /*
635          * Drop pages for this database that are in the shared buffer cache. This
636          * is important to ensure that no remaining backend tries to write out a
637          * dirty buffer to the dead database later...
638          */
639         DropDatabaseBuffers(db_id);
640
641         /*
642          * Also, clean out any entries in the shared free space map.
643          */
644         FreeSpaceMapForgetDatabase(db_id);
645
646         /*
647          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
648          * open files, which would cause rmdir() to fail.
649          */
650 #ifdef WIN32
651         RequestCheckpoint(true, false);
652 #endif
653
654         /*
655          * Remove all tablespace subdirs belonging to the database.
656          */
657         remove_dbtablespaces(db_id);
658
659         /*
660          * Close pg_database, but keep lock till commit (this is important to
661          * prevent any risk of deadlock failure while updating flat file)
662          */
663         heap_close(pgdbrel, NoLock);
664
665         /*
666          * Set flag to update flat database file at commit.
667          */
668         database_file_update_needed();
669 }
670
671
672 /*
673  * Rename database
674  */
675 void
676 RenameDatabase(const char *oldname, const char *newname)
677 {
678         Oid                     db_id;
679         HeapTuple       newtup;
680         Relation        rel;
681
682         /*
683          * Look up the target database's OID, and get exclusive lock on it. We
684          * need this for the same reasons as DROP DATABASE.
685          */
686         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
687
688         if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
689                                          NULL, NULL, NULL, NULL, NULL))
690                 ereport(ERROR,
691                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
692                                  errmsg("database \"%s\" does not exist", oldname)));
693
694         /*
695          * XXX Client applications probably store the current database somewhere,
696          * so renaming it could cause confusion.  On the other hand, there may not
697          * be an actual problem besides a little confusion, so think about this
698          * and decide.
699          */
700         if (db_id == MyDatabaseId)
701                 ereport(ERROR,
702                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
703                                  errmsg("current database may not be renamed")));
704
705         /*
706          * Make sure the database does not have active sessions.  This is the same
707          * concern as above, but applied to other sessions.
708          */
709         if (DatabaseHasActiveBackends(db_id, false))
710                 ereport(ERROR,
711                                 (errcode(ERRCODE_OBJECT_IN_USE),
712                                  errmsg("database \"%s\" is being accessed by other users",
713                                                 oldname)));
714
715         /* make sure the new name doesn't exist */
716         if (OidIsValid(get_database_oid(newname)))
717                 ereport(ERROR,
718                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
719                                  errmsg("database \"%s\" already exists", newname)));
720
721         /* must be owner */
722         if (!pg_database_ownercheck(db_id, GetUserId()))
723                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
724                                            oldname);
725
726         /* must have createdb rights */
727         if (!have_createdb_privilege())
728                 ereport(ERROR,
729                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
730                                  errmsg("permission denied to rename database")));
731
732         /* rename */
733         newtup = SearchSysCacheCopy(DATABASEOID,
734                                                                 ObjectIdGetDatum(db_id),
735                                                                 0, 0, 0);
736         if (!HeapTupleIsValid(newtup))
737                 elog(ERROR, "cache lookup failed for database %u", db_id);
738         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
739         simple_heap_update(rel, &newtup->t_self, newtup);
740         CatalogUpdateIndexes(rel, newtup);
741
742         /*
743          * Close pg_database, but keep lock till commit (this is important to
744          * prevent any risk of deadlock failure while updating flat file)
745          */
746         heap_close(rel, NoLock);
747
748         /*
749          * Set flag to update flat database file at commit.
750          */
751         database_file_update_needed();
752 }
753
754
755 /*
756  * ALTER DATABASE name ...
757  */
758 void
759 AlterDatabase(AlterDatabaseStmt *stmt)
760 {
761         Relation        rel;
762         HeapTuple       tuple,
763                                 newtuple;
764         ScanKeyData scankey;
765         SysScanDesc scan;
766         ListCell   *option;
767         int                     connlimit = -1;
768         DefElem    *dconnlimit = NULL;
769         Datum           new_record[Natts_pg_database];
770         char            new_record_nulls[Natts_pg_database];
771         char            new_record_repl[Natts_pg_database];
772
773         /* Extract options from the statement node tree */
774         foreach(option, stmt->options)
775         {
776                 DefElem    *defel = (DefElem *) lfirst(option);
777
778                 if (strcmp(defel->defname, "connectionlimit") == 0)
779                 {
780                         if (dconnlimit)
781                                 ereport(ERROR,
782                                                 (errcode(ERRCODE_SYNTAX_ERROR),
783                                                  errmsg("conflicting or redundant options")));
784                         dconnlimit = defel;
785                 }
786                 else
787                         elog(ERROR, "option \"%s\" not recognized",
788                                  defel->defname);
789         }
790
791         if (dconnlimit)
792                 connlimit = intVal(dconnlimit->arg);
793
794         /*
795          * Get the old tuple.  We don't need a lock on the database per se,
796          * because we're not going to do anything that would mess up incoming
797          * connections.
798          */
799         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
800         ScanKeyInit(&scankey,
801                                 Anum_pg_database_datname,
802                                 BTEqualStrategyNumber, F_NAMEEQ,
803                                 NameGetDatum(stmt->dbname));
804         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
805                                                           SnapshotNow, 1, &scankey);
806         tuple = systable_getnext(scan);
807         if (!HeapTupleIsValid(tuple))
808                 ereport(ERROR,
809                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
810                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
811
812         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
813                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
814                                            stmt->dbname);
815
816         /*
817          * Build an updated tuple, perusing the information just obtained
818          */
819         MemSet(new_record, 0, sizeof(new_record));
820         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
821         MemSet(new_record_repl, ' ', sizeof(new_record_repl));
822
823         if (dconnlimit)
824         {
825                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
826                 new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r';
827         }
828
829         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record,
830                                                                 new_record_nulls, new_record_repl);
831         simple_heap_update(rel, &tuple->t_self, newtuple);
832
833         /* Update indexes */
834         CatalogUpdateIndexes(rel, newtuple);
835
836         systable_endscan(scan);
837
838         /* Close pg_database, but keep lock till commit */
839         heap_close(rel, NoLock);
840
841         /*
842          * We don't bother updating the flat file since the existing options for
843          * ALTER DATABASE don't affect it.
844          */
845 }
846
847
848 /*
849  * ALTER DATABASE name SET ...
850  */
851 void
852 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
853 {
854         char       *valuestr;
855         HeapTuple       tuple,
856                                 newtuple;
857         Relation        rel;
858         ScanKeyData scankey;
859         SysScanDesc scan;
860         Datum           repl_val[Natts_pg_database];
861         char            repl_null[Natts_pg_database];
862         char            repl_repl[Natts_pg_database];
863
864         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
865
866         /*
867          * Get the old tuple.  We don't need a lock on the database per se,
868          * because we're not going to do anything that would mess up incoming
869          * connections.
870          */
871         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
872         ScanKeyInit(&scankey,
873                                 Anum_pg_database_datname,
874                                 BTEqualStrategyNumber, F_NAMEEQ,
875                                 NameGetDatum(stmt->dbname));
876         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
877                                                           SnapshotNow, 1, &scankey);
878         tuple = systable_getnext(scan);
879         if (!HeapTupleIsValid(tuple))
880                 ereport(ERROR,
881                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
882                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
883
884         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
885                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
886                                            stmt->dbname);
887
888         MemSet(repl_repl, ' ', sizeof(repl_repl));
889         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
890
891         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
892         {
893                 /* RESET ALL */
894                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
895                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
896         }
897         else
898         {
899                 Datum           datum;
900                 bool            isnull;
901                 ArrayType  *a;
902
903                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
904
905                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
906                                                          RelationGetDescr(rel), &isnull);
907
908                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
909
910                 if (valuestr)
911                         a = GUCArrayAdd(a, stmt->variable, valuestr);
912                 else
913                         a = GUCArrayDelete(a, stmt->variable);
914
915                 if (a)
916                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
917                 else
918                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
919         }
920
921         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
922         simple_heap_update(rel, &tuple->t_self, newtuple);
923
924         /* Update indexes */
925         CatalogUpdateIndexes(rel, newtuple);
926
927         systable_endscan(scan);
928
929         /* Close pg_database, but keep lock till commit */
930         heap_close(rel, NoLock);
931
932         /*
933          * We don't bother updating the flat file since ALTER DATABASE SET doesn't
934          * affect it.
935          */
936 }
937
938
939 /*
940  * ALTER DATABASE name OWNER TO newowner
941  */
942 void
943 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
944 {
945         HeapTuple       tuple;
946         Relation        rel;
947         ScanKeyData scankey;
948         SysScanDesc scan;
949         Form_pg_database datForm;
950
951         /*
952          * Get the old tuple.  We don't need a lock on the database per se,
953          * because we're not going to do anything that would mess up incoming
954          * connections.
955          */
956         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
957         ScanKeyInit(&scankey,
958                                 Anum_pg_database_datname,
959                                 BTEqualStrategyNumber, F_NAMEEQ,
960                                 NameGetDatum(dbname));
961         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
962                                                           SnapshotNow, 1, &scankey);
963         tuple = systable_getnext(scan);
964         if (!HeapTupleIsValid(tuple))
965                 ereport(ERROR,
966                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
967                                  errmsg("database \"%s\" does not exist", dbname)));
968
969         datForm = (Form_pg_database) GETSTRUCT(tuple);
970
971         /*
972          * If the new owner is the same as the existing owner, consider the
973          * command to have succeeded.  This is to be consistent with other
974          * objects.
975          */
976         if (datForm->datdba != newOwnerId)
977         {
978                 Datum           repl_val[Natts_pg_database];
979                 char            repl_null[Natts_pg_database];
980                 char            repl_repl[Natts_pg_database];
981                 Acl                *newAcl;
982                 Datum           aclDatum;
983                 bool            isNull;
984                 HeapTuple       newtuple;
985
986                 /* Otherwise, must be owner of the existing object */
987                 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
988                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
989                                                    dbname);
990
991                 /* Must be able to become new owner */
992                 check_is_member_of_role(GetUserId(), newOwnerId);
993
994                 /*
995                  * must have createdb rights
996                  *
997                  * NOTE: This is different from other alter-owner checks in that the
998                  * current user is checked for createdb privileges instead of the
999                  * destination owner.  This is consistent with the CREATE case for
1000                  * databases.  Because superusers will always have this right, we need
1001                  * no special case for them.
1002                  */
1003                 if (!have_createdb_privilege())
1004                         ereport(ERROR,
1005                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1006                                    errmsg("permission denied to change owner of database")));
1007
1008                 memset(repl_null, ' ', sizeof(repl_null));
1009                 memset(repl_repl, ' ', sizeof(repl_repl));
1010
1011                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
1012                 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1013
1014                 /*
1015                  * Determine the modified ACL for the new owner.  This is only
1016                  * necessary when the ACL is non-null.
1017                  */
1018                 aclDatum = heap_getattr(tuple,
1019                                                                 Anum_pg_database_datacl,
1020                                                                 RelationGetDescr(rel),
1021                                                                 &isNull);
1022                 if (!isNull)
1023                 {
1024                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
1025                                                                  datForm->datdba, newOwnerId);
1026                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
1027                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1028                 }
1029
1030                 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1031                 simple_heap_update(rel, &newtuple->t_self, newtuple);
1032                 CatalogUpdateIndexes(rel, newtuple);
1033
1034                 heap_freetuple(newtuple);
1035
1036                 /* Update owner dependency reference */
1037                 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1038                                                                 newOwnerId);
1039         }
1040
1041         systable_endscan(scan);
1042
1043         /* Close pg_database, but keep lock till commit */
1044         heap_close(rel, NoLock);
1045
1046         /*
1047          * We don't bother updating the flat file since ALTER DATABASE OWNER
1048          * doesn't affect it.
1049          */
1050 }
1051
1052
1053 /*
1054  * Helper functions
1055  */
1056
1057 /*
1058  * Look up info about the database named "name".  If the database exists,
1059  * obtain the specified lock type on it, fill in any of the remaining
1060  * parameters that aren't NULL, and return TRUE.  If no such database,
1061  * return FALSE.
1062  */
1063 static bool
1064 get_db_info(const char *name, LOCKMODE lockmode,
1065                         Oid *dbIdP, Oid *ownerIdP,
1066                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1067                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1068                         Oid *dbTablespace)
1069 {
1070         bool            result = false;
1071         Relation        relation;
1072
1073         AssertArg(name);
1074
1075         /* Caller may wish to grab a better lock on pg_database beforehand... */
1076         relation = heap_open(DatabaseRelationId, AccessShareLock);
1077
1078         /*
1079          * Loop covers the rare case where the database is renamed before we can
1080          * lock it.  We try again just in case we can find a new one of the same
1081          * name.
1082          */
1083         for (;;)
1084         {
1085                 ScanKeyData scanKey;
1086                 SysScanDesc scan;
1087                 HeapTuple       tuple;
1088                 Oid                     dbOid;
1089
1090                 /*
1091                  * there's no syscache for database-indexed-by-name, so must do it the
1092                  * hard way
1093                  */
1094                 ScanKeyInit(&scanKey,
1095                                         Anum_pg_database_datname,
1096                                         BTEqualStrategyNumber, F_NAMEEQ,
1097                                         NameGetDatum(name));
1098
1099                 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1100                                                                   SnapshotNow, 1, &scanKey);
1101
1102                 tuple = systable_getnext(scan);
1103
1104                 if (!HeapTupleIsValid(tuple))
1105                 {
1106                         /* definitely no database of that name */
1107                         systable_endscan(scan);
1108                         break;
1109                 }
1110
1111                 dbOid = HeapTupleGetOid(tuple);
1112
1113                 systable_endscan(scan);
1114
1115                 /*
1116                  * Now that we have a database OID, we can try to lock the DB.
1117                  */
1118                 if (lockmode != NoLock)
1119                         LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1120
1121                 /*
1122                  * And now, re-fetch the tuple by OID.  If it's still there and still
1123                  * the same name, we win; else, drop the lock and loop back to try
1124                  * again.
1125                  */
1126                 tuple = SearchSysCache(DATABASEOID,
1127                                                            ObjectIdGetDatum(dbOid),
1128                                                            0, 0, 0);
1129                 if (HeapTupleIsValid(tuple))
1130                 {
1131                         Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1132
1133                         if (strcmp(name, NameStr(dbform->datname)) == 0)
1134                         {
1135                                 /* oid of the database */
1136                                 if (dbIdP)
1137                                         *dbIdP = dbOid;
1138                                 /* oid of the owner */
1139                                 if (ownerIdP)
1140                                         *ownerIdP = dbform->datdba;
1141                                 /* character encoding */
1142                                 if (encodingP)
1143                                         *encodingP = dbform->encoding;
1144                                 /* allowed as template? */
1145                                 if (dbIsTemplateP)
1146                                         *dbIsTemplateP = dbform->datistemplate;
1147                                 /* allowing connections? */
1148                                 if (dbAllowConnP)
1149                                         *dbAllowConnP = dbform->datallowconn;
1150                                 /* last system OID used in database */
1151                                 if (dbLastSysOidP)
1152                                         *dbLastSysOidP = dbform->datlastsysoid;
1153                                 /* limit of frozen XIDs */
1154                                 if (dbFrozenXidP)
1155                                         *dbFrozenXidP = dbform->datfrozenxid;
1156                                 /* default tablespace for this database */
1157                                 if (dbTablespace)
1158                                         *dbTablespace = dbform->dattablespace;
1159                                 ReleaseSysCache(tuple);
1160                                 result = true;
1161                                 break;
1162                         }
1163                         /* can only get here if it was just renamed */
1164                         ReleaseSysCache(tuple);
1165                 }
1166
1167                 if (lockmode != NoLock)
1168                         UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1169         }
1170
1171         heap_close(relation, AccessShareLock);
1172
1173         return result;
1174 }
1175
1176 /* Check if current user has createdb privileges */
1177 static bool
1178 have_createdb_privilege(void)
1179 {
1180         bool            result = false;
1181         HeapTuple       utup;
1182
1183         /* Superusers can always do everything */
1184         if (superuser())
1185                 return true;
1186
1187         utup = SearchSysCache(AUTHOID,
1188                                                   ObjectIdGetDatum(GetUserId()),
1189                                                   0, 0, 0);
1190         if (HeapTupleIsValid(utup))
1191         {
1192                 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1193                 ReleaseSysCache(utup);
1194         }
1195         return result;
1196 }
1197
1198 /*
1199  * Remove tablespace directories
1200  *
1201  * We don't know what tablespaces db_id is using, so iterate through all
1202  * tablespaces removing <tablespace>/db_id
1203  */
1204 static void
1205 remove_dbtablespaces(Oid db_id)
1206 {
1207         Relation        rel;
1208         HeapScanDesc scan;
1209         HeapTuple       tuple;
1210
1211         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1212         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1213         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1214         {
1215                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1216                 char       *dstpath;
1217                 struct stat st;
1218
1219                 /* Don't mess with the global tablespace */
1220                 if (dsttablespace == GLOBALTABLESPACE_OID)
1221                         continue;
1222
1223                 dstpath = GetDatabasePath(db_id, dsttablespace);
1224
1225                 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1226                 {
1227                         /* Assume we can ignore it */
1228                         pfree(dstpath);
1229                         continue;
1230                 }
1231
1232                 if (!rmtree(dstpath, true))
1233                         ereport(WARNING,
1234                                         (errmsg("could not remove database directory \"%s\"",
1235                                                         dstpath)));
1236
1237                 /* Record the filesystem change in XLOG */
1238                 {
1239                         xl_dbase_drop_rec xlrec;
1240                         XLogRecData rdata[1];
1241
1242                         xlrec.db_id = db_id;
1243                         xlrec.tablespace_id = dsttablespace;
1244
1245                         rdata[0].data = (char *) &xlrec;
1246                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1247                         rdata[0].buffer = InvalidBuffer;
1248                         rdata[0].next = NULL;
1249
1250                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1251                 }
1252
1253                 pfree(dstpath);
1254         }
1255
1256         heap_endscan(scan);
1257         heap_close(rel, AccessShareLock);
1258 }
1259
1260 /*
1261  * Check for existing files that conflict with a proposed new DB OID;
1262  * return TRUE if there are any
1263  *
1264  * If there were a subdirectory in any tablespace matching the proposed new
1265  * OID, we'd get a create failure due to the duplicate name ... and then we'd
1266  * try to remove that already-existing subdirectory during the cleanup in
1267  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
1268  * instead we make this extra check before settling on the OID of the new
1269  * database.  This exactly parallels what GetNewRelFileNode() does for table
1270  * relfilenode values.
1271  */
1272 static bool
1273 check_db_file_conflict(Oid db_id)
1274 {
1275         bool            result = false;
1276         Relation        rel;
1277         HeapScanDesc scan;
1278         HeapTuple       tuple;
1279
1280         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1281         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1282         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1283         {
1284                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1285                 char       *dstpath;
1286                 struct stat st;
1287
1288                 /* Don't mess with the global tablespace */
1289                 if (dsttablespace == GLOBALTABLESPACE_OID)
1290                         continue;
1291
1292                 dstpath = GetDatabasePath(db_id, dsttablespace);
1293
1294                 if (lstat(dstpath, &st) == 0)
1295                 {
1296                         /* Found a conflicting file (or directory, whatever) */
1297                         pfree(dstpath);
1298                         result = true;
1299                         break;
1300                 }
1301
1302                 pfree(dstpath);
1303         }
1304
1305         heap_endscan(scan);
1306         heap_close(rel, AccessShareLock);
1307         return result;
1308 }
1309
1310 /*
1311  * get_database_oid - given a database name, look up the OID
1312  *
1313  * Returns InvalidOid if database name not found.
1314  */
1315 Oid
1316 get_database_oid(const char *dbname)
1317 {
1318         Relation        pg_database;
1319         ScanKeyData entry[1];
1320         SysScanDesc scan;
1321         HeapTuple       dbtuple;
1322         Oid                     oid;
1323
1324         /*
1325          * There's no syscache for pg_database indexed by name, so we must look
1326          * the hard way.
1327          */
1328         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1329         ScanKeyInit(&entry[0],
1330                                 Anum_pg_database_datname,
1331                                 BTEqualStrategyNumber, F_NAMEEQ,
1332                                 CStringGetDatum(dbname));
1333         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1334                                                           SnapshotNow, 1, entry);
1335
1336         dbtuple = systable_getnext(scan);
1337
1338         /* We assume that there can be at most one matching tuple */
1339         if (HeapTupleIsValid(dbtuple))
1340                 oid = HeapTupleGetOid(dbtuple);
1341         else
1342                 oid = InvalidOid;
1343
1344         systable_endscan(scan);
1345         heap_close(pg_database, AccessShareLock);
1346
1347         return oid;
1348 }
1349
1350
1351 /*
1352  * get_database_name - given a database OID, look up the name
1353  *
1354  * Returns a palloc'd string, or NULL if no such database.
1355  */
1356 char *
1357 get_database_name(Oid dbid)
1358 {
1359         HeapTuple       dbtuple;
1360         char       *result;
1361
1362         dbtuple = SearchSysCache(DATABASEOID,
1363                                                          ObjectIdGetDatum(dbid),
1364                                                          0, 0, 0);
1365         if (HeapTupleIsValid(dbtuple))
1366         {
1367                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1368                 ReleaseSysCache(dbtuple);
1369         }
1370         else
1371                 result = NULL;
1372
1373         return result;
1374 }
1375
1376 /*
1377  * DATABASE resource manager's routines
1378  */
1379 void
1380 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1381 {
1382         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1383
1384         if (info == XLOG_DBASE_CREATE)
1385         {
1386                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1387                 char       *src_path;
1388                 char       *dst_path;
1389                 struct stat st;
1390
1391                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1392                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1393
1394                 /*
1395                  * Our theory for replaying a CREATE is to forcibly drop the target
1396                  * subdirectory if present, then re-copy the source data. This may be
1397                  * more work than needed, but it is simple to implement.
1398                  */
1399                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1400                 {
1401                         if (!rmtree(dst_path, true))
1402                                 ereport(WARNING,
1403                                                 (errmsg("could not remove database directory \"%s\"",
1404                                                                 dst_path)));
1405                 }
1406
1407                 /*
1408                  * Force dirty buffers out to disk, to ensure source database is
1409                  * up-to-date for the copy.  (We really only need to flush buffers for
1410                  * the source database, but bufmgr.c provides no API for that.)
1411                  */
1412                 BufferSync();
1413
1414                 /*
1415                  * Copy this subdirectory to the new location
1416                  *
1417                  * We don't need to copy subdirectories
1418                  */
1419                 copydir(src_path, dst_path, false);
1420         }
1421         else if (info == XLOG_DBASE_DROP)
1422         {
1423                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1424                 char       *dst_path;
1425
1426                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1427
1428                 /* Drop pages for this database that are in the shared buffer cache */
1429                 DropDatabaseBuffers(xlrec->db_id);
1430
1431                 /* Also, clean out any entries in the shared free space map */
1432                 FreeSpaceMapForgetDatabase(xlrec->db_id);
1433
1434                 /* Clean out the xlog relcache too */
1435                 XLogDropDatabase(xlrec->db_id);
1436
1437                 /* And remove the physical files */
1438                 if (!rmtree(dst_path, true))
1439                         ereport(WARNING,
1440                                         (errmsg("could not remove database directory \"%s\"",
1441                                                         dst_path)));
1442         }
1443         else
1444                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1445 }
1446
1447 void
1448 dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
1449 {
1450         uint8           info = xl_info & ~XLR_INFO_MASK;
1451
1452         if (info == XLOG_DBASE_CREATE)
1453         {
1454                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1455
1456                 appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
1457                                                  xlrec->src_db_id, xlrec->src_tablespace_id,
1458                                                  xlrec->db_id, xlrec->tablespace_id);
1459         }
1460         else if (info == XLOG_DBASE_DROP)
1461         {
1462                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1463
1464                 appendStringInfo(buf, "drop db: dir %u/%u",
1465                                                  xlrec->db_id, xlrec->tablespace_id);
1466         }
1467         else
1468                 appendStringInfo(buf, "UNKNOWN");
1469 }