granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Call pgstat_drop_database during DROP DATABASE, so that any stats file
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands use exclusive locks on
7  * the database objects (as expressed by LockSharedObject()) to avoid
8  * stepping on each others' toes.  Formerly we used table-level locks
9  * on pg_database, but that's too coarse-grained.
10  *
11  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.192 2007/02/09 16:12:18 tgl Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <sys/stat.h>
25
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/xact.h"
29 #include "catalog/catalog.h"
30 #include "catalog/dependency.h"
31 #include "catalog/indexing.h"
32 #include "catalog/pg_authid.h"
33 #include "catalog/pg_database.h"
34 #include "catalog/pg_tablespace.h"
35 #include "commands/comment.h"
36 #include "commands/dbcommands.h"
37 #include "commands/tablespace.h"
38 #include "mb/pg_wchar.h"
39 #include "miscadmin.h"
40 #include "pgstat.h"
41 #include "postmaster/bgwriter.h"
42 #include "storage/freespace.h"
43 #include "storage/procarray.h"
44 #include "storage/smgr.h"
45 #include "utils/acl.h"
46 #include "utils/builtins.h"
47 #include "utils/flatfiles.h"
48 #include "utils/fmgroids.h"
49 #include "utils/guc.h"
50 #include "utils/lsyscache.h"
51 #include "utils/syscache.h"
52
53
54 /* non-export function prototypes */
   /*
    * get_db_info: look up a database by name while taking the requested lock
    * on it.  Callers in this file treat a false result as "database does not
    * exist", and pass NULL for any output pointer they are not interested in.
    */
55 static bool get_db_info(const char *name, LOCKMODE lockmode,
56                         Oid *dbIdP, Oid *ownerIdP,
57                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
58                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
59                         Oid *dbTablespace);
   /* createdb() refuses to proceed unless this returns true */
60 static bool have_createdb_privilege(void);
   /*
    * Removes the per-tablespace subdirectories of the given database; used by
    * dropdb() and by the error-cleanup path of createdb().
    */
61 static void remove_dbtablespaces(Oid db_id);
   /* createdb() keeps choosing a new database OID while this returns true */
62 static bool check_db_file_conflict(Oid db_id);
63
64
65 /*
66  * CREATE DATABASE
    *
    * Create the database named stmt->dbname by cloning a template database.
    *
    * Options are taken from stmt->options: "tablespace", "owner", "template",
    * "encoding" and "connectionlimit" may each appear at most once; "location"
    * only produces a WARNING; anything else is an error.  Unspecified options
    * default to the current user as owner, "template1" as template, the
    * template's encoding and default tablespace, and a connection limit of -1.
    *
    * Before anything is modified the function checks that the caller has
    * createdb privilege, is able to become the target owner role, passes the
    * ownership check on the source database if that database is not marked as
    * a template, and has CREATE permission on an explicitly requested
    * tablespace; it also checks that the source database is not being
    * accessed by other users and that the new name is not already taken.
    *
    * It then inserts the pg_database row, records owner and shared
    * dependencies, copies each non-empty per-tablespace directory of the
    * template (writing one XLOG_DBASE_CREATE record per copy), forces a
    * checkpoint, and flags the flat database file for update at commit.  If
    * an error is raised during the copy phase, the PG_CATCH block releases the
    * lock on the source database and removes whatever was copied before
    * re-throwing.
    *
    * All failures are reported through ereport()/elog() at ERROR level; the
    * command is rejected inside a transaction block.
67  */
68 void
69 createdb(const CreatedbStmt *stmt)
70 {
71         HeapScanDesc scan;
72         Relation        rel;
73         Oid                     src_dboid;
74         Oid                     src_owner;
75         int                     src_encoding;
76         bool            src_istemplate;
77         bool            src_allowconn;
78         Oid                     src_lastsysoid;
79         TransactionId src_frozenxid;
80         Oid                     src_deftablespace;
           /* NOTE(review): presumably volatile because it is read inside the PG_TRY block below -- confirm */
81         volatile Oid dst_deftablespace;
82         Relation        pg_database_rel;
83         HeapTuple       tuple;
84         Datum           new_record[Natts_pg_database];
85         char            new_record_nulls[Natts_pg_database];
86         Oid                     dboid;
87         Oid                     datdba;
88         ListCell   *option;
89         DefElem    *dtablespacename = NULL;
90         DefElem    *downer = NULL;
91         DefElem    *dtemplate = NULL;
92         DefElem    *dencoding = NULL;
93         DefElem    *dconnlimit = NULL;
94         char       *dbname = stmt->dbname;
95         char       *dbowner = NULL;
96         const char *dbtemplate = NULL;
97         int                     encoding = -1;  /* < 0 means "use the template's encoding" */
98         int                     dbconnlimit = -1;       /* stored in datconnlimit unchanged if no limit is given */
99
100         /* don't call this in a transaction block */
101         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
102
103         /* Extract options from the statement node tree */
104         foreach(option, stmt->options)
105         {
106                 DefElem    *defel = (DefElem *) lfirst(option);
107
108                 if (strcmp(defel->defname, "tablespace") == 0)
109                 {
110                         if (dtablespacename)
111                                 ereport(ERROR,
112                                                 (errcode(ERRCODE_SYNTAX_ERROR),
113                                                  errmsg("conflicting or redundant options")));
114                         dtablespacename = defel;
115                 }
116                 else if (strcmp(defel->defname, "owner") == 0)
117                 {
118                         if (downer)
119                                 ereport(ERROR,
120                                                 (errcode(ERRCODE_SYNTAX_ERROR),
121                                                  errmsg("conflicting or redundant options")));
122                         downer = defel;
123                 }
124                 else if (strcmp(defel->defname, "template") == 0)
125                 {
126                         if (dtemplate)
127                                 ereport(ERROR,
128                                                 (errcode(ERRCODE_SYNTAX_ERROR),
129                                                  errmsg("conflicting or redundant options")));
130                         dtemplate = defel;
131                 }
132                 else if (strcmp(defel->defname, "encoding") == 0)
133                 {
134                         if (dencoding)
135                                 ereport(ERROR,
136                                                 (errcode(ERRCODE_SYNTAX_ERROR),
137                                                  errmsg("conflicting or redundant options")));
138                         dencoding = defel;
139                 }
140                 else if (strcmp(defel->defname, "connectionlimit") == 0)
141                 {
142                         if (dconnlimit)
143                                 ereport(ERROR,
144                                                 (errcode(ERRCODE_SYNTAX_ERROR),
145                                                  errmsg("conflicting or redundant options")));
146                         dconnlimit = defel;
147                 }
148                 else if (strcmp(defel->defname, "location") == 0)
149                 {
                            /* Option is accepted but has no effect beyond this warning */
150                         ereport(WARNING,
151                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
152                                          errmsg("LOCATION is not supported anymore"),
153                                          errhint("Consider using tablespaces instead.")));
154                 }
155                 else
156                         elog(ERROR, "option \"%s\" not recognized",
157                                  defel->defname);
158         }
159
            /* Convert the collected option nodes into plain C values */
160         if (downer && downer->arg)
161                 dbowner = strVal(downer->arg);
162         if (dtemplate && dtemplate->arg)
163                 dbtemplate = strVal(dtemplate->arg);
164         if (dencoding && dencoding->arg)
165         {
166                 const char *encoding_name;
167
                    /* The encoding may be given either as a numeric code or as a name */
168                 if (IsA(dencoding->arg, Integer))
169                 {
170                         encoding = intVal(dencoding->arg);
171                         encoding_name = pg_encoding_to_char(encoding);
172                         if (strcmp(encoding_name, "") == 0 ||
173                                 pg_valid_server_encoding(encoding_name) < 0)
174                                 ereport(ERROR,
175                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
176                                                  errmsg("%d is not a valid encoding code",
177                                                                 encoding)));
178                 }
179                 else if (IsA(dencoding->arg, String))
180                 {
181                         encoding_name = strVal(dencoding->arg);
182                         if (pg_valid_server_encoding(encoding_name) < 0)
183                                 ereport(ERROR,
184                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
185                                                  errmsg("%s is not a valid encoding name",
186                                                                 encoding_name)));
187                         encoding = pg_char_to_encoding(encoding_name);
188                 }
189                 else
190                         elog(ERROR, "unrecognized node type: %d",
191                                  nodeTag(dencoding->arg));
192         }
193         if (dconnlimit && dconnlimit->arg)
194                 dbconnlimit = intVal(dconnlimit->arg);
195
196         /* obtain OID of proposed owner */
197         if (dbowner)
198                 datdba = get_roleid_checked(dbowner);
199         else
200                 datdba = GetUserId();
201
202         /*
203          * To create a database, must have createdb privilege and must be able to
204          * become the target role (this does not imply that the target role itself
205          * must have createdb privilege).  The latter provision guards against
206          * "giveaway" attacks.  Note that a superuser will always have both of
207          * these privileges a fortiori.
208          */
209         if (!have_createdb_privilege())
210                 ereport(ERROR,
211                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
212                                  errmsg("permission denied to create database")));
213
214         check_is_member_of_role(GetUserId(), datdba);
215
216         /*
217          * Lookup database (template) to be cloned, and obtain share lock on it.
218          * ShareLock allows two CREATE DATABASEs to work from the same template
219          * concurrently, while ensuring no one is busy dropping it in parallel
220          * (which would be Very Bad since we'd likely get an incomplete copy
221          * without knowing it).  This also prevents any new connections from being
222          * made to the source until we finish copying it, so we can be sure it
223          * won't change underneath us.
224          */
225         if (!dbtemplate)
226                 dbtemplate = "template1";               /* Default template database name */
227
228         if (!get_db_info(dbtemplate, ShareLock,
229                                          &src_dboid, &src_owner, &src_encoding,
230                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
231                                          &src_frozenxid, &src_deftablespace))
232                 ereport(ERROR,
233                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
234                                  errmsg("template database \"%s\" does not exist",
235                                                 dbtemplate)));
236
237         /*
238          * Permission check: to copy a DB that's not marked datistemplate, you
239          * must be superuser or the owner thereof.
240          */
241         if (!src_istemplate)
242         {
243                 if (!pg_database_ownercheck(src_dboid, GetUserId()))
244                         ereport(ERROR,
245                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
246                                          errmsg("permission denied to copy database \"%s\"",
247                                                         dbtemplate)));
248         }
249
250         /*
251          * The source DB can't have any active backends, except this one
252          * (exception is to allow CREATE DB while connected to template1).
253          * Otherwise we might copy inconsistent data.
254          */
255         if (DatabaseCancelAutovacuumActivity(src_dboid, true))
256                 ereport(ERROR,
257                                 (errcode(ERRCODE_OBJECT_IN_USE),
258                                  errmsg("source database \"%s\" is being accessed by other users",
259                                                 dbtemplate)));
260
261         /* If encoding is defaulted, use source's encoding */
262         if (encoding < 0)
263                 encoding = src_encoding;
264
265         /* Some encodings are client only */
266         if (!PG_VALID_BE_ENCODING(encoding))
267                 ereport(ERROR,
268                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
269                                  errmsg("invalid server encoding %d", encoding)));
270
271         /* Resolve default tablespace for new database */
272         if (dtablespacename && dtablespacename->arg)
273         {
274                 char       *tablespacename;
275                 AclResult       aclresult;
276
277                 tablespacename = strVal(dtablespacename->arg);
278                 dst_deftablespace = get_tablespace_oid(tablespacename);
279                 if (!OidIsValid(dst_deftablespace))
280                         ereport(ERROR,
281                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
282                                          errmsg("tablespace \"%s\" does not exist",
283                                                         tablespacename)));
284                 /* check permissions */
285                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
286                                                                                    ACL_CREATE);
287                 if (aclresult != ACLCHECK_OK)
288                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
289                                                    tablespacename);
290
291                 /*
292                  * If we are trying to change the default tablespace of the template,
293                  * we require that the template not have any files in the new default
294                  * tablespace.  This is necessary because otherwise the copied
295                  * database would contain pg_class rows that refer to its default
296                  * tablespace both explicitly (by OID) and implicitly (as zero), which
297                  * would cause problems.  For example another CREATE DATABASE using
298                  * the copied database as template, and trying to change its default
299                  * tablespace again, would yield outright incorrect results (it would
300                  * improperly move tables to the new default tablespace that should
301                  * stay in the same tablespace).
302                  */
303                 if (dst_deftablespace != src_deftablespace)
304                 {
305                         char       *srcpath;
306                         struct stat st;
307
                            /* Path of the template's directory within the *new* default tablespace */
308                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
309
310                         if (stat(srcpath, &st) == 0 &&
311                                 S_ISDIR(st.st_mode) &&
312                                 !directory_is_empty(srcpath))
313                                 ereport(ERROR,
314                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
315                                                  errmsg("cannot assign new default tablespace \"%s\"",
316                                                                 tablespacename),
317                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
318                                                                    dbtemplate)));
319                         pfree(srcpath);
320                 }
321         }
322         else
323         {
324                 /* Use template database's default tablespace */
325                 dst_deftablespace = src_deftablespace;
326                 /* Note there is no additional permission check in this path */
327         }
328
329         /*
330          * Check for db name conflict.  This is just to give a more friendly error
331          * message than "unique index violation".  There's a race condition but
332          * we're willing to accept the less friendly message in that case.
333          */
334         if (OidIsValid(get_database_oid(dbname)))
335                 ereport(ERROR,
336                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
337                                  errmsg("database \"%s\" already exists", dbname)));
338
339         /*
340          * Select an OID for the new database, checking that it doesn't have
341          * a filename conflict with anything already existing in the tablespace
342          * directories.
343          */
344         pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
345
346         do
347         {
348                 dboid = GetNewOid(pg_database_rel);
349         } while (check_db_file_conflict(dboid));
350
351         /*
352          * Insert a new tuple into pg_database.  This establishes our ownership of
353          * the new database name (anyone else trying to insert the same name will
354          * block on the unique index, and fail after we commit).
355          */
356
357         /* Form tuple: start with every column zeroed and flagged ' ' (not null) */
358         MemSet(new_record, 0, sizeof(new_record));
359         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
360
361         new_record[Anum_pg_database_datname - 1] =
362                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
363         new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
364         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
365         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
366         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
367         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
368         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
369         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
370         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
371
372         /*
373          * We deliberately set datconfig and datacl to defaults (NULL), rather
374          * than copying them from the template database.  Copying datacl would be
375          * a bad idea when the owner is not the same as the template's owner. It's
376          * more debatable whether datconfig should be copied.
377          */
378         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
379         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
380
381         tuple = heap_formtuple(RelationGetDescr(pg_database_rel),
382                                                    new_record, new_record_nulls);
383
            /* Give the row the OID chosen above, so it matches the directory names used below */
384         HeapTupleSetOid(tuple, dboid);
385
386         simple_heap_insert(pg_database_rel, tuple);
387
388         /* Update indexes */
389         CatalogUpdateIndexes(pg_database_rel, tuple);
390
391         /*
392          * Now generate additional catalog entries associated with the new DB
393          */
394
395         /* Register owner dependency */
396         recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
397
398         /* Create pg_shdepend entries for objects within database */
399         copyTemplateDependencies(src_dboid, dboid);
400
401         /*
402          * Force dirty buffers out to disk, to ensure source database is
403          * up-to-date for the copy.  (We really only need to flush buffers for the
404          * source database, but bufmgr.c provides no API for that.)
405          */
406         BufferSync();
407
408         /*
409          * Once we start copying subdirectories, we need to be able to clean 'em
410          * up if we fail.  Establish a TRY block to make sure this happens. (This
411          * is not a 100% solution, because of the possibility of failure during
412          * transaction commit after we leave this routine, but it should handle
413          * most scenarios.)
414          */
415         PG_TRY();
416         {
417                 /*
418                  * Iterate through all tablespaces of the template database, and copy
419                  * each one to the new database.
420                  */
421                 rel = heap_open(TableSpaceRelationId, AccessShareLock);
422                 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
423                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
424                 {
425                         Oid                     srctablespace = HeapTupleGetOid(tuple);
426                         Oid                     dsttablespace;
427                         char       *srcpath;
428                         char       *dstpath;
429                         struct stat st;
430
431                         /* No need to copy global tablespace */
432                         if (srctablespace == GLOBALTABLESPACE_OID)
433                                 continue;
434
435                         srcpath = GetDatabasePath(src_dboid, srctablespace);
436
                            /* Skip tablespaces in which the template has no (non-empty) directory */
437                         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
438                                 directory_is_empty(srcpath))
439                         {
440                                 /* Assume we can ignore it */
441                                 pfree(srcpath);
442                                 continue;
443                         }
444
                            /*
                             * Files in the template's default tablespace go to the new
                             * database's default tablespace; everything else stays put.
                             */
445                         if (srctablespace == src_deftablespace)
446                                 dsttablespace = dst_deftablespace;
447                         else
448                                 dsttablespace = srctablespace;
449
450                         dstpath = GetDatabasePath(dboid, dsttablespace);
451
452                         /*
453                          * Copy this subdirectory to the new location
454                          *
455                          * We don't need to copy subdirectories
456                          */
457                         copydir(srcpath, dstpath, false);
458
459                         /* Record the filesystem change in XLOG */
460                         {
461                                 xl_dbase_create_rec xlrec;
462                                 XLogRecData rdata[1];
463
464                                 xlrec.db_id = dboid;
465                                 xlrec.tablespace_id = dsttablespace;
466                                 xlrec.src_db_id = src_dboid;
467                                 xlrec.src_tablespace_id = srctablespace;
468
469                                 rdata[0].data = (char *) &xlrec;
470                                 rdata[0].len = sizeof(xl_dbase_create_rec);
471                                 rdata[0].buffer = InvalidBuffer;
472                                 rdata[0].next = NULL;
473
474                                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
475                         }
476                 }
477                 heap_endscan(scan);
478                 heap_close(rel, AccessShareLock);
479
480                 /*
481                  * We force a checkpoint before committing.  This effectively means
482                  * that committed XLOG_DBASE_CREATE operations will never need to be
483                  * replayed (at least not in ordinary crash recovery; we still have to
484                  * make the XLOG entry for the benefit of PITR operations). This
485                  * avoids two nasty scenarios:
486                  *
487                  * #1: When PITR is off, we don't XLOG the contents of newly created
488                  * indexes; therefore the drop-and-recreate-whole-directory behavior
489                  * of DBASE_CREATE replay would lose such indexes.
490                  *
491                  * #2: Since we have to recopy the source database during DBASE_CREATE
492                  * replay, we run the risk of copying changes in it that were
493                  * committed after the original CREATE DATABASE command but before the
494                  * system crash that led to the replay.  This is at least unexpected
495                  * and at worst could lead to inconsistencies, eg duplicate table
496                  * names.
497                  *
498                  * (Both of these were real bugs in releases 8.0 through 8.0.3.)
499                  *
500                  * In PITR replay, the first of these isn't an issue, and the second
501                  * is only a risk if the CREATE DATABASE and subsequent template
502                  * database change both occur while a base backup is being taken.
503                  * There doesn't seem to be much we can do about that except document
504                  * it as a limitation.
505                  *
506                  * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
507                  * we can avoid this.
508                  */
509                 RequestCheckpoint(true, false);
510
511                 /*
512                  * Close pg_database, but keep lock till commit (this is important to
513                  * prevent any risk of deadlock failure while updating flat file)
514                  */
515                 heap_close(pg_database_rel, NoLock);
516
517                 /*
518                  * Set flag to update flat database file at commit.
519                  */
520                 database_file_update_needed();
521         }
522         PG_CATCH();
523         {
524                 /* Release lock on source database before doing recursive remove */
525                 UnlockSharedObject(DatabaseRelationId, src_dboid, 0,
526                                                    ShareLock);
527
528                 /* Throw away any successfully copied subdirectories */
529                 remove_dbtablespaces(dboid);
530
                    /* Hand the original error on to the caller's error handling */
531                 PG_RE_THROW();
532         }
533         PG_END_TRY();
534 }
535
536
537 /*
538  * DROP DATABASE
     *
     * Remove the database named dbname.  If no such database exists, raise an
     * ERROR, or, when missing_ok is true, just emit a NOTICE and return.
     *
     * The command is rejected inside a transaction block, for the currently
     * open database, for callers that fail the ownership check, for databases
     * marked as templates, and while other users are accessing the database.
     *
     * On success the pg_database row, shared comments and shared dependency
     * entries are deleted, the database's pages are dropped from shared
     * buffers, its free space map entries, statistics and pending fsync
     * requests are discarded, and its tablespace subdirectories are removed.
     * Finally the flat database file is flagged for update at commit.
539  */
540 void
541 dropdb(const char *dbname, bool missing_ok)
542 {
543         Oid                     db_id;
544         bool            db_istemplate;
545         Relation        pgdbrel;
546         HeapTuple       tup;
547
             /* don't call this in a transaction block */
548         PreventTransactionChain((void *) dbname, "DROP DATABASE");
549
550         AssertArg(dbname);
551
             /*
              * Refuse to drop the database this backend is connected to.
              * NOTE(review): the result of get_database_name() is passed to strcmp()
              * unchecked -- confirm it cannot be NULL for MyDatabaseId.
              */
552         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
553                 ereport(ERROR,
554                                 (errcode(ERRCODE_OBJECT_IN_USE),
555                                  errmsg("cannot drop the currently open database")));
556
557         /*
558          * Look up the target database's OID, and get exclusive lock on it. We
559          * need this to ensure that no new backend starts up in the target
560          * database while we are deleting it (see postinit.c), and that no one is
561          * using it as a CREATE DATABASE template or trying to delete it for
562          * themselves.
563          */
564         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
565
566         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
567                                          &db_istemplate, NULL, NULL, NULL, NULL))
568         {
569                 if (!missing_ok)
570                 {
571                         ereport(ERROR,
572                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
573                                          errmsg("database \"%s\" does not exist", dbname)));
574                 }
575                 else
576                 {
577                         /* Close pg_database, release the lock, since we changed nothing */
578                         heap_close(pgdbrel, RowExclusiveLock);
579                         ereport(NOTICE,
580                                         (errmsg("database \"%s\" does not exist, skipping",
581                                                         dbname)));
582                         return;
583                 }
584         }
585
586         /*
587          * Permission checks
588          */
589         if (!pg_database_ownercheck(db_id, GetUserId()))
590                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
591                                            dbname);
592
593         /*
594          * Disallow dropping a DB that is marked istemplate.  This is just to
595          * prevent people from accidentally dropping template0 or template1; they
596          * can do so if they're really determined ...
597          */
598         if (db_istemplate)
599                 ereport(ERROR,
600                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
601                                  errmsg("cannot drop a template database")));
602
603         /*
604          * Check for active backends in the target database.  (Because we hold the
605          * database lock, no new ones can start after this.)
606          */
607         if (DatabaseCancelAutovacuumActivity(db_id, false))
608                 ereport(ERROR,
609                                 (errcode(ERRCODE_OBJECT_IN_USE),
610                                  errmsg("database \"%s\" is being accessed by other users",
611                                                 dbname)));
612
613         /*
614          * Remove the database's tuple from pg_database.
615          */
616         tup = SearchSysCache(DATABASEOID,
617                                                  ObjectIdGetDatum(db_id),
618                                                  0, 0, 0);
619         if (!HeapTupleIsValid(tup))
620                 elog(ERROR, "cache lookup failed for database %u", db_id);
621
622         simple_heap_delete(pgdbrel, &tup->t_self);
623
624         ReleaseSysCache(tup);
625
626         /*
627          * Delete any comments associated with the database.
628          */
629         DeleteSharedComments(db_id, DatabaseRelationId);
630
631         /*
632          * Remove shared dependency references for the database.
633          */
634         dropDatabaseDependencies(db_id);
635
636         /*
637          * Drop pages for this database that are in the shared buffer cache. This
638          * is important to ensure that no remaining backend tries to write out a
639          * dirty buffer to the dead database later...
640          */
641         DropDatabaseBuffers(db_id);
642
643         /*
644          * Also, clean out any entries in the shared free space map.
645          */
646         FreeSpaceMapForgetDatabase(db_id);
647
648         /*
649          * Tell the stats collector to forget it immediately, too.
650          */
651         pgstat_drop_database(db_id);
652
653         /*
654          * Tell bgwriter to forget any pending fsync requests for files in the
655          * database; else it'll fail at next checkpoint.
656          */
657         ForgetDatabaseFsyncRequests(db_id);
658
659         /*
660          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
661          * open files, which would cause rmdir() to fail.
662          */
663 #ifdef WIN32
664         RequestCheckpoint(true, false);
665 #endif
666
667         /*
668          * Remove all tablespace subdirs belonging to the database.
669          */
670         remove_dbtablespaces(db_id);
671
672         /*
673          * Close pg_database, but keep lock till commit (this is important to
674          * prevent any risk of deadlock failure while updating flat file)
675          */
676         heap_close(pgdbrel, NoLock);
677
678         /*
679          * Set flag to update flat database file at commit.
680          */
681         database_file_update_needed();
682 }
683
684
685 /*
686  * Rename database
687  */
688 void
689 RenameDatabase(const char *oldname, const char *newname)
690 {
691         Oid                     db_id;
692         HeapTuple       newtup;
693         Relation        rel;
694
695         /*
696          * Look up the target database's OID, and get exclusive lock on it. We
697          * need this for the same reasons as DROP DATABASE.
698          */
699         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
700
701         if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
702                                          NULL, NULL, NULL, NULL, NULL))
703                 ereport(ERROR,
704                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
705                                  errmsg("database \"%s\" does not exist", oldname)));
706
707         /*
708          * XXX Client applications probably store the current database somewhere,
709          * so renaming it could cause confusion.  On the other hand, there may not
710          * be an actual problem besides a little confusion, so think about this
711          * and decide.
712          */
713         if (db_id == MyDatabaseId)
714                 ereport(ERROR,
715                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
716                                  errmsg("current database cannot be renamed")));
717
718         /*
719          * Make sure the database does not have active sessions.  This is the same
720          * concern as above, but applied to other sessions.
721          */
722         if (DatabaseCancelAutovacuumActivity(db_id, false))
723                 ereport(ERROR,
724                                 (errcode(ERRCODE_OBJECT_IN_USE),
725                                  errmsg("database \"%s\" is being accessed by other users",
726                                                 oldname)));
727
728         /* make sure the new name doesn't exist */
729         if (OidIsValid(get_database_oid(newname)))
730                 ereport(ERROR,
731                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
732                                  errmsg("database \"%s\" already exists", newname)));
733
734         /* must be owner */
735         if (!pg_database_ownercheck(db_id, GetUserId()))
736                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
737                                            oldname);
738
739         /* must have createdb rights */
740         if (!have_createdb_privilege())
741                 ereport(ERROR,
742                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
743                                  errmsg("permission denied to rename database")));
744
745         /* rename */
746         newtup = SearchSysCacheCopy(DATABASEOID,
747                                                                 ObjectIdGetDatum(db_id),
748                                                                 0, 0, 0);
749         if (!HeapTupleIsValid(newtup))
750                 elog(ERROR, "cache lookup failed for database %u", db_id);
751         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
752         simple_heap_update(rel, &newtup->t_self, newtup);
753         CatalogUpdateIndexes(rel, newtup);
754
755         /*
756          * Close pg_database, but keep lock till commit (this is important to
757          * prevent any risk of deadlock failure while updating flat file)
758          */
759         heap_close(rel, NoLock);
760
761         /*
762          * Set flag to update flat database file at commit.
763          */
764         database_file_update_needed();
765 }
766
767
768 /*
769  * ALTER DATABASE name ...
770  */
771 void
772 AlterDatabase(AlterDatabaseStmt *stmt)
773 {
774         Relation        rel;
775         HeapTuple       tuple,
776                                 newtuple;
777         ScanKeyData scankey;
778         SysScanDesc scan;
779         ListCell   *option;
780         int                     connlimit = -1;
781         DefElem    *dconnlimit = NULL;
782         Datum           new_record[Natts_pg_database];
783         char            new_record_nulls[Natts_pg_database];
784         char            new_record_repl[Natts_pg_database];
785
786         /* Extract options from the statement node tree */
787         foreach(option, stmt->options)
788         {
789                 DefElem    *defel = (DefElem *) lfirst(option);
790
791                 if (strcmp(defel->defname, "connectionlimit") == 0)
792                 {
793                         if (dconnlimit)
794                                 ereport(ERROR,
795                                                 (errcode(ERRCODE_SYNTAX_ERROR),
796                                                  errmsg("conflicting or redundant options")));
797                         dconnlimit = defel;
798                 }
799                 else
800                         elog(ERROR, "option \"%s\" not recognized",
801                                  defel->defname);
802         }
803
804         if (dconnlimit)
805                 connlimit = intVal(dconnlimit->arg);
806
807         /*
808          * Get the old tuple.  We don't need a lock on the database per se,
809          * because we're not going to do anything that would mess up incoming
810          * connections.
811          */
812         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
813         ScanKeyInit(&scankey,
814                                 Anum_pg_database_datname,
815                                 BTEqualStrategyNumber, F_NAMEEQ,
816                                 NameGetDatum(stmt->dbname));
817         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
818                                                           SnapshotNow, 1, &scankey);
819         tuple = systable_getnext(scan);
820         if (!HeapTupleIsValid(tuple))
821                 ereport(ERROR,
822                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
823                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
824
825         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
826                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
827                                            stmt->dbname);
828
829         /*
830          * Build an updated tuple, perusing the information just obtained
831          */
832         MemSet(new_record, 0, sizeof(new_record));
833         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
834         MemSet(new_record_repl, ' ', sizeof(new_record_repl));
835
836         if (dconnlimit)
837         {
838                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
839                 new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r';
840         }
841
842         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record,
843                                                                 new_record_nulls, new_record_repl);
844         simple_heap_update(rel, &tuple->t_self, newtuple);
845
846         /* Update indexes */
847         CatalogUpdateIndexes(rel, newtuple);
848
849         systable_endscan(scan);
850
851         /* Close pg_database, but keep lock till commit */
852         heap_close(rel, NoLock);
853
854         /*
855          * We don't bother updating the flat file since the existing options for
856          * ALTER DATABASE don't affect it.
857          */
858 }
859
860
861 /*
862  * ALTER DATABASE name SET ...
863  */
864 void
865 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
866 {
867         char       *valuestr;
868         HeapTuple       tuple,
869                                 newtuple;
870         Relation        rel;
871         ScanKeyData scankey;
872         SysScanDesc scan;
873         Datum           repl_val[Natts_pg_database];
874         char            repl_null[Natts_pg_database];
875         char            repl_repl[Natts_pg_database];
876
877         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
878
879         /*
880          * Get the old tuple.  We don't need a lock on the database per se,
881          * because we're not going to do anything that would mess up incoming
882          * connections.
883          */
884         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
885         ScanKeyInit(&scankey,
886                                 Anum_pg_database_datname,
887                                 BTEqualStrategyNumber, F_NAMEEQ,
888                                 NameGetDatum(stmt->dbname));
889         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
890                                                           SnapshotNow, 1, &scankey);
891         tuple = systable_getnext(scan);
892         if (!HeapTupleIsValid(tuple))
893                 ereport(ERROR,
894                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
895                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
896
897         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
898                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
899                                            stmt->dbname);
900
901         MemSet(repl_repl, ' ', sizeof(repl_repl));
902         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
903
904         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
905         {
906                 /* RESET ALL */
907                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
908                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
909         }
910         else
911         {
912                 Datum           datum;
913                 bool            isnull;
914                 ArrayType  *a;
915
916                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
917
918                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
919                                                          RelationGetDescr(rel), &isnull);
920
921                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
922
923                 if (valuestr)
924                         a = GUCArrayAdd(a, stmt->variable, valuestr);
925                 else
926                         a = GUCArrayDelete(a, stmt->variable);
927
928                 if (a)
929                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
930                 else
931                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
932         }
933
934         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
935         simple_heap_update(rel, &tuple->t_self, newtuple);
936
937         /* Update indexes */
938         CatalogUpdateIndexes(rel, newtuple);
939
940         systable_endscan(scan);
941
942         /* Close pg_database, but keep lock till commit */
943         heap_close(rel, NoLock);
944
945         /*
946          * We don't bother updating the flat file since ALTER DATABASE SET doesn't
947          * affect it.
948          */
949 }
950
951
952 /*
953  * ALTER DATABASE name OWNER TO newowner
954  */
955 void
956 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
957 {
958         HeapTuple       tuple;
959         Relation        rel;
960         ScanKeyData scankey;
961         SysScanDesc scan;
962         Form_pg_database datForm;
963
964         /*
965          * Get the old tuple.  We don't need a lock on the database per se,
966          * because we're not going to do anything that would mess up incoming
967          * connections.
968          */
969         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
970         ScanKeyInit(&scankey,
971                                 Anum_pg_database_datname,
972                                 BTEqualStrategyNumber, F_NAMEEQ,
973                                 NameGetDatum(dbname));
974         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
975                                                           SnapshotNow, 1, &scankey);
976         tuple = systable_getnext(scan);
977         if (!HeapTupleIsValid(tuple))
978                 ereport(ERROR,
979                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
980                                  errmsg("database \"%s\" does not exist", dbname)));
981
982         datForm = (Form_pg_database) GETSTRUCT(tuple);
983
984         /*
985          * If the new owner is the same as the existing owner, consider the
986          * command to have succeeded.  This is to be consistent with other
987          * objects.
988          */
989         if (datForm->datdba != newOwnerId)
990         {
991                 Datum           repl_val[Natts_pg_database];
992                 char            repl_null[Natts_pg_database];
993                 char            repl_repl[Natts_pg_database];
994                 Acl                *newAcl;
995                 Datum           aclDatum;
996                 bool            isNull;
997                 HeapTuple       newtuple;
998
999                 /* Otherwise, must be owner of the existing object */
1000                 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1001                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1002                                                    dbname);
1003
1004                 /* Must be able to become new owner */
1005                 check_is_member_of_role(GetUserId(), newOwnerId);
1006
1007                 /*
1008                  * must have createdb rights
1009                  *
1010                  * NOTE: This is different from other alter-owner checks in that the
1011                  * current user is checked for createdb privileges instead of the
1012                  * destination owner.  This is consistent with the CREATE case for
1013                  * databases.  Because superusers will always have this right, we need
1014                  * no special case for them.
1015                  */
1016                 if (!have_createdb_privilege())
1017                         ereport(ERROR,
1018                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1019                                    errmsg("permission denied to change owner of database")));
1020
1021                 memset(repl_null, ' ', sizeof(repl_null));
1022                 memset(repl_repl, ' ', sizeof(repl_repl));
1023
1024                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
1025                 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1026
1027                 /*
1028                  * Determine the modified ACL for the new owner.  This is only
1029                  * necessary when the ACL is non-null.
1030                  */
1031                 aclDatum = heap_getattr(tuple,
1032                                                                 Anum_pg_database_datacl,
1033                                                                 RelationGetDescr(rel),
1034                                                                 &isNull);
1035                 if (!isNull)
1036                 {
1037                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
1038                                                                  datForm->datdba, newOwnerId);
1039                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
1040                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1041                 }
1042
1043                 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1044                 simple_heap_update(rel, &newtuple->t_self, newtuple);
1045                 CatalogUpdateIndexes(rel, newtuple);
1046
1047                 heap_freetuple(newtuple);
1048
1049                 /* Update owner dependency reference */
1050                 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1051                                                                 newOwnerId);
1052         }
1053
1054         systable_endscan(scan);
1055
1056         /* Close pg_database, but keep lock till commit */
1057         heap_close(rel, NoLock);
1058
1059         /*
1060          * We don't bother updating the flat file since ALTER DATABASE OWNER
1061          * doesn't affect it.
1062          */
1063 }
1064
1065
1066 /*
1067  * Helper functions
1068  */
1069
1070 /*
1071  * Look up info about the database named "name".  If the database exists,
1072  * obtain the specified lock type on it, fill in any of the remaining
1073  * parameters that aren't NULL, and return TRUE.  If no such database,
1074  * return FALSE.
1075  */
1076 static bool
1077 get_db_info(const char *name, LOCKMODE lockmode,
1078                         Oid *dbIdP, Oid *ownerIdP,
1079                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1080                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1081                         Oid *dbTablespace)
1082 {
1083         bool            result = false;
1084         Relation        relation;
1085
1086         AssertArg(name);
1087
1088         /* Caller may wish to grab a better lock on pg_database beforehand... */
1089         relation = heap_open(DatabaseRelationId, AccessShareLock);
1090
1091         /*
1092          * Loop covers the rare case where the database is renamed before we can
1093          * lock it.  We try again just in case we can find a new one of the same
1094          * name.
1095          */
1096         for (;;)
1097         {
1098                 ScanKeyData scanKey;
1099                 SysScanDesc scan;
1100                 HeapTuple       tuple;
1101                 Oid                     dbOid;
1102
1103                 /*
1104                  * there's no syscache for database-indexed-by-name, so must do it the
1105                  * hard way
1106                  */
1107                 ScanKeyInit(&scanKey,
1108                                         Anum_pg_database_datname,
1109                                         BTEqualStrategyNumber, F_NAMEEQ,
1110                                         NameGetDatum(name));
1111
1112                 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1113                                                                   SnapshotNow, 1, &scanKey);
1114
1115                 tuple = systable_getnext(scan);
1116
1117                 if (!HeapTupleIsValid(tuple))
1118                 {
1119                         /* definitely no database of that name */
1120                         systable_endscan(scan);
1121                         break;
1122                 }
1123
1124                 dbOid = HeapTupleGetOid(tuple);
1125
1126                 systable_endscan(scan);
1127
1128                 /*
1129                  * Now that we have a database OID, we can try to lock the DB.
1130                  */
1131                 if (lockmode != NoLock)
1132                         LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1133
1134                 /*
1135                  * And now, re-fetch the tuple by OID.  If it's still there and still
1136                  * the same name, we win; else, drop the lock and loop back to try
1137                  * again.
1138                  */
1139                 tuple = SearchSysCache(DATABASEOID,
1140                                                            ObjectIdGetDatum(dbOid),
1141                                                            0, 0, 0);
1142                 if (HeapTupleIsValid(tuple))
1143                 {
1144                         Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1145
1146                         if (strcmp(name, NameStr(dbform->datname)) == 0)
1147                         {
1148                                 /* oid of the database */
1149                                 if (dbIdP)
1150                                         *dbIdP = dbOid;
1151                                 /* oid of the owner */
1152                                 if (ownerIdP)
1153                                         *ownerIdP = dbform->datdba;
1154                                 /* character encoding */
1155                                 if (encodingP)
1156                                         *encodingP = dbform->encoding;
1157                                 /* allowed as template? */
1158                                 if (dbIsTemplateP)
1159                                         *dbIsTemplateP = dbform->datistemplate;
1160                                 /* allowing connections? */
1161                                 if (dbAllowConnP)
1162                                         *dbAllowConnP = dbform->datallowconn;
1163                                 /* last system OID used in database */
1164                                 if (dbLastSysOidP)
1165                                         *dbLastSysOidP = dbform->datlastsysoid;
1166                                 /* limit of frozen XIDs */
1167                                 if (dbFrozenXidP)
1168                                         *dbFrozenXidP = dbform->datfrozenxid;
1169                                 /* default tablespace for this database */
1170                                 if (dbTablespace)
1171                                         *dbTablespace = dbform->dattablespace;
1172                                 ReleaseSysCache(tuple);
1173                                 result = true;
1174                                 break;
1175                         }
1176                         /* can only get here if it was just renamed */
1177                         ReleaseSysCache(tuple);
1178                 }
1179
1180                 if (lockmode != NoLock)
1181                         UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1182         }
1183
1184         heap_close(relation, AccessShareLock);
1185
1186         return result;
1187 }
1188
1189 /* Check if current user has createdb privileges */
1190 static bool
1191 have_createdb_privilege(void)
1192 {
1193         bool            result = false;
1194         HeapTuple       utup;
1195
1196         /* Superusers can always do everything */
1197         if (superuser())
1198                 return true;
1199
1200         utup = SearchSysCache(AUTHOID,
1201                                                   ObjectIdGetDatum(GetUserId()),
1202                                                   0, 0, 0);
1203         if (HeapTupleIsValid(utup))
1204         {
1205                 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1206                 ReleaseSysCache(utup);
1207         }
1208         return result;
1209 }
1210
1211 /*
1212  * Remove tablespace directories
1213  *
1214  * We don't know what tablespaces db_id is using, so iterate through all
1215  * tablespaces removing <tablespace>/db_id
1216  */
1217 static void
1218 remove_dbtablespaces(Oid db_id)
1219 {
1220         Relation        rel;
1221         HeapScanDesc scan;
1222         HeapTuple       tuple;
1223
1224         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1225         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1226         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1227         {
1228                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1229                 char       *dstpath;
1230                 struct stat st;
1231
1232                 /* Don't mess with the global tablespace */
1233                 if (dsttablespace == GLOBALTABLESPACE_OID)
1234                         continue;
1235
1236                 dstpath = GetDatabasePath(db_id, dsttablespace);
1237
1238                 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1239                 {
1240                         /* Assume we can ignore it */
1241                         pfree(dstpath);
1242                         continue;
1243                 }
1244
1245                 if (!rmtree(dstpath, true))
1246                         ereport(WARNING,
1247                                         (errmsg("could not remove database directory \"%s\"",
1248                                                         dstpath)));
1249
1250                 /* Record the filesystem change in XLOG */
1251                 {
1252                         xl_dbase_drop_rec xlrec;
1253                         XLogRecData rdata[1];
1254
1255                         xlrec.db_id = db_id;
1256                         xlrec.tablespace_id = dsttablespace;
1257
1258                         rdata[0].data = (char *) &xlrec;
1259                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1260                         rdata[0].buffer = InvalidBuffer;
1261                         rdata[0].next = NULL;
1262
1263                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1264                 }
1265
1266                 pfree(dstpath);
1267         }
1268
1269         heap_endscan(scan);
1270         heap_close(rel, AccessShareLock);
1271 }
1272
1273 /*
1274  * Check for existing files that conflict with a proposed new DB OID;
1275  * return TRUE if there are any
1276  *
1277  * If there were a subdirectory in any tablespace matching the proposed new
1278  * OID, we'd get a create failure due to the duplicate name ... and then we'd
1279  * try to remove that already-existing subdirectory during the cleanup in
1280  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
1281  * instead we make this extra check before settling on the OID of the new
1282  * database.  This exactly parallels what GetNewRelFileNode() does for table
1283  * relfilenode values.
1284  */
1285 static bool
1286 check_db_file_conflict(Oid db_id)
1287 {
1288         bool            result = false;
1289         Relation        rel;
1290         HeapScanDesc scan;
1291         HeapTuple       tuple;
1292
1293         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1294         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1295         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1296         {
1297                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1298                 char       *dstpath;
1299                 struct stat st;
1300
1301                 /* Don't mess with the global tablespace */
1302                 if (dsttablespace == GLOBALTABLESPACE_OID)
1303                         continue;
1304
1305                 dstpath = GetDatabasePath(db_id, dsttablespace);
1306
1307                 if (lstat(dstpath, &st) == 0)
1308                 {
1309                         /* Found a conflicting file (or directory, whatever) */
1310                         pfree(dstpath);
1311                         result = true;
1312                         break;
1313                 }
1314
1315                 pfree(dstpath);
1316         }
1317
1318         heap_endscan(scan);
1319         heap_close(rel, AccessShareLock);
1320         return result;
1321 }
1322
1323 /*
1324  * get_database_oid - given a database name, look up the OID
1325  *
1326  * Returns InvalidOid if database name not found.
1327  */
1328 Oid
1329 get_database_oid(const char *dbname)
1330 {
1331         Relation        pg_database;
1332         ScanKeyData entry[1];
1333         SysScanDesc scan;
1334         HeapTuple       dbtuple;
1335         Oid                     oid;
1336
1337         /*
1338          * There's no syscache for pg_database indexed by name, so we must look
1339          * the hard way.
1340          */
1341         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1342         ScanKeyInit(&entry[0],
1343                                 Anum_pg_database_datname,
1344                                 BTEqualStrategyNumber, F_NAMEEQ,
1345                                 CStringGetDatum(dbname));
1346         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1347                                                           SnapshotNow, 1, entry);
1348
1349         dbtuple = systable_getnext(scan);
1350
1351         /* We assume that there can be at most one matching tuple */
1352         if (HeapTupleIsValid(dbtuple))
1353                 oid = HeapTupleGetOid(dbtuple);
1354         else
1355                 oid = InvalidOid;
1356
1357         systable_endscan(scan);
1358         heap_close(pg_database, AccessShareLock);
1359
1360         return oid;
1361 }
1362
1363
1364 /*
1365  * get_database_name - given a database OID, look up the name
1366  *
1367  * Returns a palloc'd string, or NULL if no such database.
1368  */
1369 char *
1370 get_database_name(Oid dbid)
1371 {
1372         HeapTuple       dbtuple;
1373         char       *result;
1374
1375         dbtuple = SearchSysCache(DATABASEOID,
1376                                                          ObjectIdGetDatum(dbid),
1377                                                          0, 0, 0);
1378         if (HeapTupleIsValid(dbtuple))
1379         {
1380                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1381                 ReleaseSysCache(dbtuple);
1382         }
1383         else
1384                 result = NULL;
1385
1386         return result;
1387 }
1388
1389 /*
1390  * DATABASE resource manager's routines
1391  */
1392 void
1393 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1394 {
1395         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1396
1397         if (info == XLOG_DBASE_CREATE)
1398         {
1399                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1400                 char       *src_path;
1401                 char       *dst_path;
1402                 struct stat st;
1403
1404                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1405                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1406
1407                 /*
1408                  * Our theory for replaying a CREATE is to forcibly drop the target
1409                  * subdirectory if present, then re-copy the source data. This may be
1410                  * more work than needed, but it is simple to implement.
1411                  */
1412                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1413                 {
1414                         if (!rmtree(dst_path, true))
1415                                 ereport(WARNING,
1416                                                 (errmsg("could not remove database directory \"%s\"",
1417                                                                 dst_path)));
1418                 }
1419
1420                 /*
1421                  * Force dirty buffers out to disk, to ensure source database is
1422                  * up-to-date for the copy.  (We really only need to flush buffers for
1423                  * the source database, but bufmgr.c provides no API for that.)
1424                  */
1425                 BufferSync();
1426
1427                 /*
1428                  * Copy this subdirectory to the new location
1429                  *
1430                  * We don't need to copy subdirectories
1431                  */
1432                 copydir(src_path, dst_path, false);
1433         }
1434         else if (info == XLOG_DBASE_DROP)
1435         {
1436                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1437                 char       *dst_path;
1438
1439                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1440
1441                 /* Drop pages for this database that are in the shared buffer cache */
1442                 DropDatabaseBuffers(xlrec->db_id);
1443
1444                 /* Also, clean out any entries in the shared free space map */
1445                 FreeSpaceMapForgetDatabase(xlrec->db_id);
1446
1447                 /* Clean out the xlog relcache too */
1448                 XLogDropDatabase(xlrec->db_id);
1449
1450                 /* And remove the physical files */
1451                 if (!rmtree(dst_path, true))
1452                         ereport(WARNING,
1453                                         (errmsg("could not remove database directory \"%s\"",
1454                                                         dst_path)));
1455         }
1456         else
1457                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1458 }
1459
1460 void
1461 dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
1462 {
1463         uint8           info = xl_info & ~XLR_INFO_MASK;
1464
1465         if (info == XLOG_DBASE_CREATE)
1466         {
1467                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1468
1469                 appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
1470                                                  xlrec->src_db_id, xlrec->src_tablespace_id,
1471                                                  xlrec->db_id, xlrec->tablespace_id);
1472         }
1473         else if (info == XLOG_DBASE_DROP)
1474         {
1475                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1476
1477                 appendStringInfo(buf, "drop db: dir %u/%u",
1478                                                  xlrec->db_id, xlrec->tablespace_id);
1479         }
1480         else
1481                 appendStringInfo(buf, "UNKNOWN");
1482 }