]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Use a safer order of operations in dropdb(): rollbackable operations,
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands take ExclusiveLock on
7  * pg_database to ensure that no two proceed in parallel.  We must use
8  * at least this level of locking to ensure that no two backends try to
9  * write the flat-file copy of pg_database at once.  We avoid using
10  * AccessExclusiveLock since there's no need to lock out ordinary readers
11  * of pg_database.
12  *
13  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
14  * Portions Copyright (c) 1994, Regents of the University of California
15  *
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.172 2005/10/10 20:02:20 tgl Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27
28 #include "access/genam.h"
29 #include "access/heapam.h"
30 #include "catalog/catalog.h"
31 #include "catalog/dependency.h"
32 #include "catalog/indexing.h"
33 #include "catalog/pg_authid.h"
34 #include "catalog/pg_database.h"
35 #include "catalog/pg_tablespace.h"
36 #include "commands/comment.h"
37 #include "commands/dbcommands.h"
38 #include "commands/tablespace.h"
39 #include "mb/pg_wchar.h"
40 #include "miscadmin.h"
41 #include "postmaster/bgwriter.h"
42 #include "storage/fd.h"
43 #include "storage/freespace.h"
44 #include "storage/procarray.h"
45 #include "utils/acl.h"
46 #include "utils/array.h"
47 #include "utils/builtins.h"
48 #include "utils/flatfiles.h"
49 #include "utils/fmgroids.h"
50 #include "utils/guc.h"
51 #include "utils/lsyscache.h"
52 #include "utils/syscache.h"
53
54
55 /* non-export function prototypes */
56 static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
57                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
58                         Oid *dbLastSysOidP,
59                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
60                         Oid *dbTablespace);
61 static bool have_createdb_privilege(void);
62 static void remove_dbtablespaces(Oid db_id);
63
64
65 /*
66  * CREATE DATABASE
    *
    * Creates a new database by copying the per-tablespace directories of a
    * template database (default "template1") and then inserting a row for
    * the new database into pg_database.
    *
    * stmt: parse node carrying the new database name (stmt->dbname) and an
    *       option list.  Recognized options are "tablespace", "owner",
    *       "template", "encoding", "connectionlimit" and "location" (the
    *       last one is accepted but only produces a WARNING).
    *
    * Returns nothing; every failure is reported through ereport/elog at
    * ERROR level.  If an error is raised after directory copying has begun,
    * the PG_CATCH block removes the copied directories before re-throwing.
67  */
68 void
69 createdb(const CreatedbStmt *stmt)
70 {
71         HeapScanDesc scan;
72         Relation        rel;
73         Oid                     src_dboid;
74         Oid                     src_owner;
75         int                     src_encoding;
76         bool            src_istemplate;
77         bool            src_allowconn;
78         Oid                     src_lastsysoid;
79         TransactionId src_vacuumxid;
80         TransactionId src_frozenxid;
81         Oid                     src_deftablespace;
82         volatile Oid dst_deftablespace;
83         volatile Relation pg_database_rel;
84         HeapTuple       tuple;
85         TupleDesc       pg_database_dsc;
86         Datum           new_record[Natts_pg_database];
87         char            new_record_nulls[Natts_pg_database];
88         Oid                     dboid;
89         volatile Oid datdba;
90         ListCell   *option;
91         DefElem    *dtablespacename = NULL;
92         DefElem    *downer = NULL;
93         DefElem    *dtemplate = NULL;
94         DefElem    *dencoding = NULL;
95         DefElem    *dconnlimit = NULL;
96         char       *dbname = stmt->dbname;
97         char       *dbowner = NULL;
98         const char *dbtemplate = NULL;
99         volatile int encoding = -1;
100         volatile int dbconnlimit = -1;
101
            /*
             * NOTE(review): several locals above are volatile-qualified,
             * presumably because of the PG_TRY block further down.  Of all the
             * locals, only pg_database_rel and dboid are actually read inside
             * PG_CATCH.
             */
102         /* don't call this in a transaction block */
103         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
104
105         /* Extract options from the statement node tree */
            /* Each recognized option may appear only once; a repeat is reported as a syntax error. */
106         foreach(option, stmt->options)
107         {
108                 DefElem    *defel = (DefElem *) lfirst(option);
109
110                 if (strcmp(defel->defname, "tablespace") == 0)
111                 {
112                         if (dtablespacename)
113                                 ereport(ERROR,
114                                                 (errcode(ERRCODE_SYNTAX_ERROR),
115                                                  errmsg("conflicting or redundant options")));
116                         dtablespacename = defel;
117                 }
118                 else if (strcmp(defel->defname, "owner") == 0)
119                 {
120                         if (downer)
121                                 ereport(ERROR,
122                                                 (errcode(ERRCODE_SYNTAX_ERROR),
123                                                  errmsg("conflicting or redundant options")));
124                         downer = defel;
125                 }
126                 else if (strcmp(defel->defname, "template") == 0)
127                 {
128                         if (dtemplate)
129                                 ereport(ERROR,
130                                                 (errcode(ERRCODE_SYNTAX_ERROR),
131                                                  errmsg("conflicting or redundant options")));
132                         dtemplate = defel;
133                 }
134                 else if (strcmp(defel->defname, "encoding") == 0)
135                 {
136                         if (dencoding)
137                                 ereport(ERROR,
138                                                 (errcode(ERRCODE_SYNTAX_ERROR),
139                                                  errmsg("conflicting or redundant options")));
140                         dencoding = defel;
141                 }
142                 else if (strcmp(defel->defname, "connectionlimit") == 0)
143                 {
144                         if (dconnlimit)
145                                 ereport(ERROR,
146                                                 (errcode(ERRCODE_SYNTAX_ERROR),
147                                                  errmsg("conflicting or redundant options")));
148                         dconnlimit = defel;
149                 }
150                 else if (strcmp(defel->defname, "location") == 0)
151                 {
152                         ereport(WARNING,
153                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
154                                          errmsg("LOCATION is not supported anymore"),
155                                          errhint("Consider using tablespaces instead.")));
156                 }
157                 else
158                         elog(ERROR, "option \"%s\" not recognized",
159                                  defel->defname);
160         }
161
            /*
             * Pull the actual values out of the option nodes collected above.
             * The encoding may be given either as an integer code or as a name;
             * both forms are validated here.
             */
162         if (downer && downer->arg)
163                 dbowner = strVal(downer->arg);
164         if (dtemplate && dtemplate->arg)
165                 dbtemplate = strVal(dtemplate->arg);
166         if (dencoding && dencoding->arg)
167         {
168                 const char *encoding_name;
169
170                 if (IsA(dencoding->arg, Integer))
171                 {
172                         encoding = intVal(dencoding->arg);
173                         encoding_name = pg_encoding_to_char(encoding);
174                         if (strcmp(encoding_name, "") == 0 ||
175                                 pg_valid_server_encoding(encoding_name) < 0)
176                                 ereport(ERROR,
177                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
178                                                  errmsg("%d is not a valid encoding code",
179                                                                 encoding)));
180                 }
181                 else if (IsA(dencoding->arg, String))
182                 {
183                         encoding_name = strVal(dencoding->arg);
184                         if (pg_valid_server_encoding(encoding_name) < 0)
185                                 ereport(ERROR,
186                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
187                                                  errmsg("%s is not a valid encoding name",
188                                                                 encoding_name)));
189                         encoding = pg_char_to_encoding(encoding_name);
190                 }
191                 else
192                         elog(ERROR, "unrecognized node type: %d",
193                                  nodeTag(dencoding->arg));
194         }
195         if (dconnlimit && dconnlimit->arg)
196                 dbconnlimit = intVal(dconnlimit->arg);
197
198         /* obtain OID of proposed owner */
199         if (dbowner)
200                 datdba = get_roleid_checked(dbowner);
201         else
202                 datdba = GetUserId();
203
204         /*
205          * To create a database, must have createdb privilege and must be able
206          * to become the target role (this does not imply that the target role
207          * itself must have createdb privilege).  The latter provision guards
208          * against "giveaway" attacks.  Note that a superuser will always have
209          * both of these privileges a fortiori.
210          */
211         if (!have_createdb_privilege())
212                 ereport(ERROR,
213                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
214                                  errmsg("permission denied to create database")));
215
216         check_is_member_of_role(GetUserId(), datdba);
217
218         /*
219          * Check for db name conflict.  There is a race condition here, since
220          * another backend could create the same DB name before we commit.
221          * However, holding an exclusive lock on pg_database for the whole
222          * time we are copying the source database doesn't seem like a good
223          * idea, so accept possibility of race to create.  We will check again
224          * after we grab the exclusive lock.
225          */
226         if (get_db_info(dbname, NULL, NULL, NULL,
227                                         NULL, NULL, NULL, NULL, NULL, NULL))
228                 ereport(ERROR,
229                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
230                                  errmsg("database \"%s\" already exists", dbname)));
231
232         /*
233          * Lookup database (template) to be cloned.
234          */
235         if (!dbtemplate)
236                 dbtemplate = "template1";               /* Default template database name */
237
238         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
239                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
240                                          &src_vacuumxid, &src_frozenxid, &src_deftablespace))
241                 ereport(ERROR,
242                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
243                  errmsg("template database \"%s\" does not exist", dbtemplate)));
244
245         /*
246          * Permission check: to copy a DB that's not marked datistemplate, you
247          * must be superuser or the owner thereof.
248          */
249         if (!src_istemplate)
250         {
251                 if (!pg_database_ownercheck(src_dboid, GetUserId()))
252                         ereport(ERROR,
253                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
254                                          errmsg("permission denied to copy database \"%s\"",
255                                                         dbtemplate)));
256         }
257
258         /*
259          * The source DB can't have any active backends, except this one
260          * (exception is to allow CREATE DB while connected to template1).
261          * Otherwise we might copy inconsistent data.  This check is not
262          * bulletproof, since someone might connect while we are copying...
263          */
264         if (DatabaseHasActiveBackends(src_dboid, true))
265                 ereport(ERROR,
266                                 (errcode(ERRCODE_OBJECT_IN_USE),
267                 errmsg("source database \"%s\" is being accessed by other users",
268                            dbtemplate)));
269
270         /* If encoding is defaulted, use source's encoding */
271         if (encoding < 0)
272                 encoding = src_encoding;
273
274         /* Some encodings are client only */
275         if (!PG_VALID_BE_ENCODING(encoding))
276                 ereport(ERROR,
277                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
278                                  errmsg("invalid server encoding %d", encoding)));
279
280         /* Resolve default tablespace for new database */
281         if (dtablespacename && dtablespacename->arg)
282         {
283                 char       *tablespacename;
284                 AclResult       aclresult;
285
286                 tablespacename = strVal(dtablespacename->arg);
287                 dst_deftablespace = get_tablespace_oid(tablespacename);
288                 if (!OidIsValid(dst_deftablespace))
289                         ereport(ERROR,
290                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
291                                          errmsg("tablespace \"%s\" does not exist",
292                                                         tablespacename)));
293                 /* check permissions */
294                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
295                                                                                    ACL_CREATE);
296                 if (aclresult != ACLCHECK_OK)
297                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
298                                                    tablespacename);
299
300                 /*
301                  * If we are trying to change the default tablespace of the template,
302                  * we require that the template not have any files in the new default
303                  * tablespace.  This is necessary because otherwise the copied
304                  * database would contain pg_class rows that refer to its default
305                  * tablespace both explicitly (by OID) and implicitly (as zero), which
306                  * would cause problems.  For example another CREATE DATABASE using
307                  * the copied database as template, and trying to change its default
308                  * tablespace again, would yield outright incorrect results (it would
309                  * improperly move tables to the new default tablespace that should
310                  * stay in the same tablespace).
311                  */
312                 if (dst_deftablespace != src_deftablespace)
313                 {
314                         char       *srcpath;
315                         struct stat st;
316
317                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
318
319                         if (stat(srcpath, &st) == 0 &&
320                                 S_ISDIR(st.st_mode) &&
321                                 !directory_is_empty(srcpath))
322                                 ereport(ERROR,
323                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
324                                                  errmsg("cannot assign new default tablespace \"%s\"",
325                                                                 tablespacename),
326                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
327                                                                    dbtemplate)));
328                         pfree(srcpath);
329                 }
330         }
331         else
332         {
333                 /* Use template database's default tablespace */
334                 dst_deftablespace = src_deftablespace;
335                 /* Note there is no additional permission check in this path */
336         }
337
338         /*
339          * Normally we mark the new database with the same datvacuumxid and
340          * datfrozenxid as the source.  However, if the source is not allowing
341          * connections then we assume it is fully frozen, and we can set the
342          * current transaction ID as the xid limits.  This avoids immediately
343          * starting to generate warnings after cloning template0.
344          */
345         if (!src_allowconn)
346                 src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
347
348         /*
349          * Preassign OID for pg_database tuple, so that we can compute db
350          * path.  We have to open pg_database to do this, but we don't want
351          * to take ExclusiveLock yet, so just do it and close again.
352          */
353         pg_database_rel = heap_open(DatabaseRelationId, AccessShareLock);
354         dboid = GetNewOid(pg_database_rel);
355         heap_close(pg_database_rel, AccessShareLock);
            /* Reset to NULL so PG_CATCH can tell whether pg_database has been reopened. */
356         pg_database_rel = NULL;
357
358         /*
359          * Force dirty buffers out to disk, to ensure source database is
360          * up-to-date for the copy.  (We really only need to flush buffers for
361          * the source database, but bufmgr.c provides no API for that.)
362          */
363         BufferSync();
364
365         /*
366          * Once we start copying subdirectories, we need to be able to clean
367          * 'em up if we fail.  Establish a TRY block to make sure this happens.
368          * (This is not a 100% solution, because of the possibility of failure
369          * during transaction commit after we leave this routine, but it should
370          * handle most scenarios.)
371          */
372         PG_TRY();
373         {
374                 /*
375                  * Iterate through all tablespaces of the template database,
376                  * and copy each one to the new database.
377                  */
378                 rel = heap_open(TableSpaceRelationId, AccessShareLock);
379                 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
380                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
381                 {
382                         Oid                     srctablespace = HeapTupleGetOid(tuple);
383                         Oid                     dsttablespace;
384                         char       *srcpath;
385                         char       *dstpath;
386                         struct stat st;
387
388                         /* No need to copy global tablespace */
389                         if (srctablespace == GLOBALTABLESPACE_OID)
390                                 continue;
391
392                         srcpath = GetDatabasePath(src_dboid, srctablespace);
393
394                         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
395                                 directory_is_empty(srcpath))
396                         {
397                                 /* Assume we can ignore it */
398                                 pfree(srcpath);
399                                 continue;
400                         }
401
                            /*
                             * The template's default-tablespace directory is copied into
                             * the new database's default tablespace; every other directory
                             * stays in the tablespace it came from.
                             */
402                         if (srctablespace == src_deftablespace)
403                                 dsttablespace = dst_deftablespace;
404                         else
405                                 dsttablespace = srctablespace;
406
407                         dstpath = GetDatabasePath(dboid, dsttablespace);
408
409                         /*
410                          * Copy this subdirectory to the new location
411                          *
412                          * We don't need to copy subdirectories
413                          */
414                         copydir(srcpath, dstpath, false);
415
416                         /* Record the filesystem change in XLOG */
417                         {
418                                 xl_dbase_create_rec xlrec;
419                                 XLogRecData rdata[1];
420
421                                 xlrec.db_id = dboid;
422                                 xlrec.tablespace_id = dsttablespace;
423                                 xlrec.src_db_id = src_dboid;
424                                 xlrec.src_tablespace_id = srctablespace;
425
426                                 rdata[0].data = (char *) &xlrec;
427                                 rdata[0].len = sizeof(xl_dbase_create_rec);
428                                 rdata[0].buffer = InvalidBuffer;
429                                 rdata[0].next = NULL;
430
431                                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
432                         }
433                 }
434                 heap_endscan(scan);
435                 heap_close(rel, AccessShareLock);
436
437                 /*
438                  * Now OK to grab exclusive lock on pg_database.
439                  */
440                 pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock);
441
442                 /* Check to see if someone else created same DB name meanwhile. */
443                 if (get_db_info(dbname, NULL, NULL, NULL,
444                                                 NULL, NULL, NULL, NULL, NULL, NULL))
445                         ereport(ERROR,
446                                         (errcode(ERRCODE_DUPLICATE_DATABASE),
447                                          errmsg("database \"%s\" already exists", dbname)));
448
449                 /*
450                  * Insert a new tuple into pg_database
451                  */
452                 pg_database_dsc = RelationGetDescr(pg_database_rel);
453
454                 /* Form tuple */
                    /*
                     * Every column starts out flagged ' ' in the nulls array; the two
                     * columns switched to 'n' further down are the ones stored as NULL.
                     */
455                 MemSet(new_record, 0, sizeof(new_record));
456                 MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
457
458                 new_record[Anum_pg_database_datname - 1] =
459                         DirectFunctionCall1(namein, CStringGetDatum(dbname));
460                 new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
461                 new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
462                 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
463                 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
464                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
465                 new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
466                 new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
467                 new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
468                 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
469
470                 /*
471                  * We deliberately set datconfig and datacl to defaults (NULL), rather
472                  * than copying them from the template database.  Copying datacl would
473                  * be a bad idea when the owner is not the same as the template's
474                  * owner. It's more debatable whether datconfig should be copied.
475                  */
476                 new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
477                 new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
478
479                 tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
480
481                 HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
482                                                                                          * selection */
483
484                 simple_heap_insert(pg_database_rel, tuple);
485
486                 /* Update indexes */
487                 CatalogUpdateIndexes(pg_database_rel, tuple);
488
489                 /* Register owner dependency */
490                 recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
491
492                 /* Create pg_shdepend entries for objects within database */
493                 copyTemplateDependencies(src_dboid, dboid);
494
495                 /*
496                  * We force a checkpoint before committing.  This effectively means
497                  * that committed XLOG_DBASE_CREATE operations will never need to be
498                  * replayed (at least not in ordinary crash recovery; we still have
499                  * to make the XLOG entry for the benefit of PITR operations).
500                  * This avoids two nasty scenarios:
501                  *
502                  * #1: When PITR is off, we don't XLOG the contents of newly created
503                  * indexes; therefore the drop-and-recreate-whole-directory behavior
504                  * of DBASE_CREATE replay would lose such indexes.
505                  *
506                  * #2: Since we have to recopy the source database during DBASE_CREATE
507                  * replay, we run the risk of copying changes in it that were committed
508                  * after the original CREATE DATABASE command but before the system
509                  * crash that led to the replay.  This is at least unexpected and at
510                  * worst could lead to inconsistencies, eg duplicate table names.
511                  *
512                  * (Both of these were real bugs in releases 8.0 through 8.0.3.)
513                  *
514                  * In PITR replay, the first of these isn't an issue, and the second
515                  * is only a risk if the CREATE DATABASE and subsequent template
516                  * database change both occur while a base backup is being taken.
517                  * There doesn't seem to be much we can do about that except document
518                  * it as a limitation.
519                  *
520                  * Perhaps if we ever implement CREATE DATABASE in a less cheesy
521                  * way, we can avoid this.
522                  */
523                 RequestCheckpoint(true, false);
524
525                 /*
526                  * Set flag to update flat database file at commit.
527                  */
528                 database_file_update_needed();
529         }
530         PG_CATCH();
531         {
532                 /* Don't hold pg_database lock while doing recursive remove */
533                 if (pg_database_rel != NULL)
534                         heap_close(pg_database_rel, ExclusiveLock);
535
536                 /* Throw away any successfully copied subdirectories */
537                 remove_dbtablespaces(dboid);
538
539                 PG_RE_THROW();
540         }
541         PG_END_TRY();
542
543         /* Close pg_database, but keep exclusive lock till commit */
544         /* This has to be outside the PG_TRY */
545         heap_close(pg_database_rel, NoLock);
546 }
547
548
549 /*
550  * DROP DATABASE
     *
     * Removes the named database: deletes its pg_database row, its comments
     * and its shared-dependency entries, discards its pages from the shared
     * buffer cache and its free space map entries, and finally removes its
     * per-tablespace directories.
     *
     * dbname: name of the database to drop.
     *
     * Returns nothing.  Raises an ERROR if the target is the currently open
     * database, does not exist, is not owned by the caller (as decided by
     * pg_database_ownercheck), is marked as a template, or has active
     * backends.
551  */
552 void
553 dropdb(const char *dbname)
554 {
555         Oid                     db_id;
556         bool            db_istemplate;
557         Relation        pgdbrel;
558         SysScanDesc pgdbscan;
559         ScanKeyData key;
560         HeapTuple       tup;
561
562         PreventTransactionChain((void *) dbname, "DROP DATABASE");
563
564         AssertArg(dbname);
565
            /*
             * NOTE(review): the result of get_database_name() is handed straight
             * to strcmp without a NULL check; presumably it always resolves for
             * the current database -- confirm.
             */
566         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
567                 ereport(ERROR,
568                                 (errcode(ERRCODE_OBJECT_IN_USE),
569                                  errmsg("cannot drop the currently open database")));
570
571         /*
572          * Obtain exclusive lock on pg_database.  We need this to ensure that
573          * no new backend starts up in the target database while we are
574          * deleting it.  (Actually, a new backend might still manage to start
575          * up, because it isn't able to lock pg_database while starting.  But
576          * it will detect its error in ReverifyMyDatabase and shut down before
577          * any serious damage is done.  See postinit.c.)
578          *
579          * An ExclusiveLock, rather than AccessExclusiveLock, is sufficient
580          * since ReverifyMyDatabase takes RowShareLock.  This allows ordinary
581          * readers of pg_database to proceed in parallel.
582          */
583         pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
584
585         if (!get_db_info(dbname, &db_id, NULL, NULL,
586                                          &db_istemplate, NULL, NULL, NULL, NULL, NULL))
587                 ereport(ERROR,
588                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
589                                  errmsg("database \"%s\" does not exist", dbname)));
590
591         if (!pg_database_ownercheck(db_id, GetUserId()))
592                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
593                                            dbname);
594
595         /*
596          * Disallow dropping a DB that is marked istemplate.  This is just to
597          * prevent people from accidentally dropping template0 or template1;
598          * they can do so if they're really determined ...
599          */
600         if (db_istemplate)
601                 ereport(ERROR,
602                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
603                                  errmsg("cannot drop a template database")));
604
605         /*
606          * Check for active backends in the target database.
607          */
608         if (DatabaseHasActiveBackends(db_id, false))
609                 ereport(ERROR,
610                                 (errcode(ERRCODE_OBJECT_IN_USE),
611                            errmsg("database \"%s\" is being accessed by other users",
612                                           dbname)));
613
614         /*
615          * Find the database's tuple by OID (should be unique).
616          */
617         ScanKeyInit(&key,
618                                 ObjectIdAttributeNumber,
619                                 BTEqualStrategyNumber, F_OIDEQ,
620                                 ObjectIdGetDatum(db_id));
621
622         pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndexId, true,
623                                                                   SnapshotNow, 1, &key);
624
625         tup = systable_getnext(pgdbscan);
626         if (!HeapTupleIsValid(tup))
627         {
628                 /*
629                  * This error should never come up since the existence of the
630                  * database is checked earlier
631                  */
632                 elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
633                          dbname);
634         }
635
636         /* Remove the database's tuple from pg_database */
637         simple_heap_delete(pgdbrel, &tup->t_self);
638
639         systable_endscan(pgdbscan);
640
641         /*
642          * Delete any comments associated with the database
643          *
644          * NOTE: this is probably dead code since any such comments should have
645          * been in that database, not mine.
646          */
647         DeleteComments(db_id, DatabaseRelationId, 0);
648
649         /*
650          * Remove shared dependency references for the database.
651          */
652         dropDatabaseDependencies(db_id);
653
            /*
             * The steps above remove catalog entries; the steps below discard
             * shared buffers, forget free space map entries and remove
             * directories.  NOTE(review): presumably the catalog steps come
             * first because the ones below cannot be undone by a transaction
             * abort -- keep this order unless that is confirmed otherwise.
             */
654         /*
655          * Drop pages for this database that are in the shared buffer cache.
656          * This is important to ensure that no remaining backend tries to
657          * write out a dirty buffer to the dead database later...
658          */
659         DropBuffers(db_id);
660
661         /*
662          * Also, clean out any entries in the shared free space map.
663          */
664         FreeSpaceMapForgetDatabase(db_id);
665
666         /*
667          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
668          * open files, which would cause rmdir() to fail.
669          */
670 #ifdef WIN32
671         RequestCheckpoint(true, false);
672 #endif
673
674         /*
675          * Remove all tablespace subdirs belonging to the database.
676          */
677         remove_dbtablespaces(db_id);
678
679         /* Close pg_database, but keep exclusive lock till commit */
680         heap_close(pgdbrel, NoLock);
681
682         /*
683          * Set flag to update flat database file at commit.
684          */
685         database_file_update_needed();
686 }
687
688
689 /*
690  * Rename database
691  */
692 void
693 RenameDatabase(const char *oldname, const char *newname)
694 {
695         HeapTuple       tup,
696                                 newtup;
697         Relation        rel;
698         SysScanDesc scan,
699                                 scan2;
700         ScanKeyData key,
701                                 key2;
702
703         /*
704          * Obtain ExclusiveLock so that no new session gets started
705          * while the rename is in progress.
706          */
707         rel = heap_open(DatabaseRelationId, ExclusiveLock);
708
709         ScanKeyInit(&key,
710                                 Anum_pg_database_datname,
711                                 BTEqualStrategyNumber, F_NAMEEQ,
712                                 NameGetDatum(oldname));
713         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
714                                                           SnapshotNow, 1, &key);
715
716         tup = systable_getnext(scan);
717         if (!HeapTupleIsValid(tup))
718                 ereport(ERROR,
719                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
720                                  errmsg("database \"%s\" does not exist", oldname)));
721
722         /*
723          * XXX Client applications probably store the current database
724          * somewhere, so renaming it could cause confusion.  On the other
725          * hand, there may not be an actual problem besides a little
726          * confusion, so think about this and decide.
727          */
728         if (HeapTupleGetOid(tup) == MyDatabaseId)
729                 ereport(ERROR,
730                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
731                                  errmsg("current database may not be renamed")));
732
733         /*
734          * Make sure the database does not have active sessions.  Might not be
735          * necessary, but it's consistent with other database operations.
736          */
737         if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
738                 ereport(ERROR,
739                                 (errcode(ERRCODE_OBJECT_IN_USE),
740                            errmsg("database \"%s\" is being accessed by other users",
741                                           oldname)));
742
743         /* make sure the new name doesn't exist */
744         ScanKeyInit(&key2,
745                                 Anum_pg_database_datname,
746                                 BTEqualStrategyNumber, F_NAMEEQ,
747                                 NameGetDatum(newname));
748         scan2 = systable_beginscan(rel, DatabaseNameIndexId, true,
749                                                            SnapshotNow, 1, &key2);
750         if (HeapTupleIsValid(systable_getnext(scan2)))
751                 ereport(ERROR,
752                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
753                                  errmsg("database \"%s\" already exists", newname)));
754         systable_endscan(scan2);
755
756         /* must be owner */
757         if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
758                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
759                                            oldname);
760
761         /* must have createdb rights */
762         if (!have_createdb_privilege())
763                 ereport(ERROR,
764                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
765                                  errmsg("permission denied to rename database")));
766
767         /* rename */
768         newtup = heap_copytuple(tup);
769         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
770         simple_heap_update(rel, &newtup->t_self, newtup);
771         CatalogUpdateIndexes(rel, newtup);
772
773         systable_endscan(scan);
774
775         /* Close pg_database, but keep exclusive lock till commit */
776         heap_close(rel, NoLock);
777
778         /*
779          * Set flag to update flat database file at commit.
780          */
781         database_file_update_needed();
782 }
783
784
785 /*
786  * ALTER DATABASE name ...
787  */
788 void
789 AlterDatabase(AlterDatabaseStmt *stmt)
790 {
791         Relation        rel;
792         HeapTuple       tuple,
793                                 newtuple;
794         ScanKeyData scankey;
795         SysScanDesc scan;
796         ListCell   *option;
797         int                     connlimit = -1;
798         DefElem    *dconnlimit = NULL;
799         Datum           new_record[Natts_pg_database];
800         char            new_record_nulls[Natts_pg_database];
801         char            new_record_repl[Natts_pg_database];
802
803         /* Extract options from the statement node tree */
804         foreach(option, stmt->options)
805         {
806                 DefElem    *defel = (DefElem *) lfirst(option);
807
808                 if (strcmp(defel->defname, "connectionlimit") == 0)
809                 {
810                         if (dconnlimit)
811                                 ereport(ERROR,
812                                                 (errcode(ERRCODE_SYNTAX_ERROR),
813                                                  errmsg("conflicting or redundant options")));
814                         dconnlimit = defel;
815                 }
816                 else
817                         elog(ERROR, "option \"%s\" not recognized",
818                                  defel->defname);
819         }
820
821         if (dconnlimit)
822                 connlimit = intVal(dconnlimit->arg);
823
824         /*
825          * We don't need ExclusiveLock since we aren't updating the
826          * flat file.
827          */
828         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
829         ScanKeyInit(&scankey,
830                                 Anum_pg_database_datname,
831                                 BTEqualStrategyNumber, F_NAMEEQ,
832                                 NameGetDatum(stmt->dbname));
833         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
834                                                           SnapshotNow, 1, &scankey);
835         tuple = systable_getnext(scan);
836         if (!HeapTupleIsValid(tuple))
837                 ereport(ERROR,
838                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
839                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
840
841         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
842                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
843                                            stmt->dbname);
844
845         /*
846          * Build an updated tuple, perusing the information just obtained
847          */
848         MemSet(new_record, 0, sizeof(new_record));
849         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
850         MemSet(new_record_repl, ' ', sizeof(new_record_repl));
851
852         if (dconnlimit)
853         {
854                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
855                 new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r';
856         }
857
858         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record,
859                                                                 new_record_nulls, new_record_repl);
860         simple_heap_update(rel, &tuple->t_self, newtuple);
861
862         /* Update indexes */
863         CatalogUpdateIndexes(rel, newtuple);
864
865         systable_endscan(scan);
866
867         /* Close pg_database, but keep lock till commit */
868         heap_close(rel, NoLock);
869
870         /*
871          * We don't bother updating the flat file since the existing options
872          * for ALTER DATABASE don't affect it.
873          */
874 }
875
876
877 /*
878  * ALTER DATABASE name SET ...
879  */
880 void
881 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
882 {
883         char       *valuestr;
884         HeapTuple       tuple,
885                                 newtuple;
886         Relation        rel;
887         ScanKeyData scankey;
888         SysScanDesc scan;
889         Datum           repl_val[Natts_pg_database];
890         char            repl_null[Natts_pg_database];
891         char            repl_repl[Natts_pg_database];
892
893         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
894
895         /*
896          * We don't need ExclusiveLock since we aren't updating the
897          * flat file.
898          */
899         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
900         ScanKeyInit(&scankey,
901                                 Anum_pg_database_datname,
902                                 BTEqualStrategyNumber, F_NAMEEQ,
903                                 NameGetDatum(stmt->dbname));
904         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
905                                                           SnapshotNow, 1, &scankey);
906         tuple = systable_getnext(scan);
907         if (!HeapTupleIsValid(tuple))
908                 ereport(ERROR,
909                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
910                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
911
912         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
913                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
914                                            stmt->dbname);
915
916         MemSet(repl_repl, ' ', sizeof(repl_repl));
917         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
918
919         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
920         {
921                 /* RESET ALL */
922                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
923                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
924         }
925         else
926         {
927                 Datum           datum;
928                 bool            isnull;
929                 ArrayType  *a;
930
931                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
932
933                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
934                                                          RelationGetDescr(rel), &isnull);
935
936                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
937
938                 if (valuestr)
939                         a = GUCArrayAdd(a, stmt->variable, valuestr);
940                 else
941                         a = GUCArrayDelete(a, stmt->variable);
942
943                 if (a)
944                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
945                 else
946                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
947         }
948
949         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
950         simple_heap_update(rel, &tuple->t_self, newtuple);
951
952         /* Update indexes */
953         CatalogUpdateIndexes(rel, newtuple);
954
955         systable_endscan(scan);
956
957         /* Close pg_database, but keep lock till commit */
958         heap_close(rel, NoLock);
959
960         /*
961          * We don't bother updating the flat file since ALTER DATABASE SET
962          * doesn't affect it.
963          */
964 }
965
966
967 /*
968  * ALTER DATABASE name OWNER TO newowner
969  */
970 void
971 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
972 {
973         HeapTuple       tuple;
974         Relation        rel;
975         ScanKeyData scankey;
976         SysScanDesc scan;
977         Form_pg_database datForm;
978
979         /*
980          * We don't need ExclusiveLock since we aren't updating the
981          * flat file.
982          */
983         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
984         ScanKeyInit(&scankey,
985                                 Anum_pg_database_datname,
986                                 BTEqualStrategyNumber, F_NAMEEQ,
987                                 NameGetDatum(dbname));
988         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
989                                                           SnapshotNow, 1, &scankey);
990         tuple = systable_getnext(scan);
991         if (!HeapTupleIsValid(tuple))
992                 ereport(ERROR,
993                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
994                                  errmsg("database \"%s\" does not exist", dbname)));
995
996         datForm = (Form_pg_database) GETSTRUCT(tuple);
997
998         /*
999          * If the new owner is the same as the existing owner, consider the
1000          * command to have succeeded.  This is to be consistent with other
1001          * objects.
1002          */
1003         if (datForm->datdba != newOwnerId)
1004         {
1005                 Datum           repl_val[Natts_pg_database];
1006                 char            repl_null[Natts_pg_database];
1007                 char            repl_repl[Natts_pg_database];
1008                 Acl                *newAcl;
1009                 Datum           aclDatum;
1010                 bool            isNull;
1011                 HeapTuple       newtuple;
1012
1013                 /* Otherwise, must be owner of the existing object */
1014                 if (!pg_database_ownercheck(HeapTupleGetOid(tuple),GetUserId()))
1015                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1016                                                    dbname);
1017
1018                 /* Must be able to become new owner */
1019                 check_is_member_of_role(GetUserId(), newOwnerId);
1020
1021                 /*
1022                  * must have createdb rights 
1023                  *
1024                  * NOTE: This is different from other alter-owner checks in 
1025                  * that the current user is checked for createdb privileges 
1026                  * instead of the destination owner.  This is consistent
1027                  * with the CREATE case for databases.  Because superusers
1028                  * will always have this right, we need no special case for them.
1029                  */
1030                 if (!have_createdb_privilege())
1031                         ereport(ERROR,
1032                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1033                                          errmsg("permission denied to change owner of database")));
1034
1035                 memset(repl_null, ' ', sizeof(repl_null));
1036                 memset(repl_repl, ' ', sizeof(repl_repl));
1037
1038                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
1039                 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1040
1041                 /*
1042                  * Determine the modified ACL for the new owner.  This is only
1043                  * necessary when the ACL is non-null.
1044                  */
1045                 aclDatum = heap_getattr(tuple,
1046                                                                 Anum_pg_database_datacl,
1047                                                                 RelationGetDescr(rel),
1048                                                                 &isNull);
1049                 if (!isNull)
1050                 {
1051                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
1052                                                                  datForm->datdba, newOwnerId);
1053                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
1054                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1055                 }
1056
1057                 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1058                 simple_heap_update(rel, &newtuple->t_self, newtuple);
1059                 CatalogUpdateIndexes(rel, newtuple);
1060
1061                 heap_freetuple(newtuple);
1062
1063                 /* Update owner dependency reference */
1064                 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1065                                                                 newOwnerId);
1066         }
1067
1068         systable_endscan(scan);
1069
1070         /* Close pg_database, but keep lock till commit */
1071         heap_close(rel, NoLock);
1072
1073         /*
1074          * We don't bother updating the flat file since ALTER DATABASE OWNER
1075          * doesn't affect it.
1076          */
1077 }
1078
1079
1080 /*
1081  * Helper functions
1082  */
1083
1084 static bool
1085 get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
1086                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1087                         Oid *dbLastSysOidP,
1088                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
1089                         Oid *dbTablespace)
1090 {
1091         Relation        relation;
1092         ScanKeyData scanKey;
1093         SysScanDesc scan;
1094         HeapTuple       tuple;
1095         bool            gottuple;
1096
1097         AssertArg(name);
1098
1099         /* Caller may wish to grab a better lock on pg_database beforehand... */
1100         relation = heap_open(DatabaseRelationId, AccessShareLock);
1101
1102         ScanKeyInit(&scanKey,
1103                                 Anum_pg_database_datname,
1104                                 BTEqualStrategyNumber, F_NAMEEQ,
1105                                 NameGetDatum(name));
1106
1107         scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1108                                                           SnapshotNow, 1, &scanKey);
1109
1110         tuple = systable_getnext(scan);
1111
1112         gottuple = HeapTupleIsValid(tuple);
1113         if (gottuple)
1114         {
1115                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1116
1117                 /* oid of the database */
1118                 if (dbIdP)
1119                         *dbIdP = HeapTupleGetOid(tuple);
1120                 /* oid of the owner */
1121                 if (ownerIdP)
1122                         *ownerIdP = dbform->datdba;
1123                 /* character encoding */
1124                 if (encodingP)
1125                         *encodingP = dbform->encoding;
1126                 /* allowed as template? */
1127                 if (dbIsTemplateP)
1128                         *dbIsTemplateP = dbform->datistemplate;
1129                 /* allowing connections? */
1130                 if (dbAllowConnP)
1131                         *dbAllowConnP = dbform->datallowconn;
1132                 /* last system OID used in database */
1133                 if (dbLastSysOidP)
1134                         *dbLastSysOidP = dbform->datlastsysoid;
1135                 /* limit of vacuumed XIDs */
1136                 if (dbVacuumXidP)
1137                         *dbVacuumXidP = dbform->datvacuumxid;
1138                 /* limit of frozen XIDs */
1139                 if (dbFrozenXidP)
1140                         *dbFrozenXidP = dbform->datfrozenxid;
1141                 /* default tablespace for this database */
1142                 if (dbTablespace)
1143                         *dbTablespace = dbform->dattablespace;
1144         }
1145
1146         systable_endscan(scan);
1147         heap_close(relation, AccessShareLock);
1148
1149         return gottuple;
1150 }
1151
1152 /* Check if current user has createdb privileges */
1153 static bool
1154 have_createdb_privilege(void)
1155 {
1156         bool            result = false;
1157         HeapTuple       utup;
1158
1159         /* Superusers can always do everything */
1160         if (superuser())
1161                 return true;
1162
1163         utup = SearchSysCache(AUTHOID,
1164                                                   ObjectIdGetDatum(GetUserId()),
1165                                                   0, 0, 0);
1166         if (HeapTupleIsValid(utup))
1167         {
1168                 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1169                 ReleaseSysCache(utup);
1170         }
1171         return result;
1172 }
1173
1174 /*
1175  * Remove tablespace directories
1176  *
1177  * We don't know what tablespaces db_id is using, so iterate through all
1178  * tablespaces removing <tablespace>/db_id
1179  */
1180 static void
1181 remove_dbtablespaces(Oid db_id)
1182 {
1183         Relation        rel;
1184         HeapScanDesc scan;
1185         HeapTuple       tuple;
1186
1187         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1188         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1189         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1190         {
1191                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1192                 char       *dstpath;
1193                 struct stat st;
1194
1195                 /* Don't mess with the global tablespace */
1196                 if (dsttablespace == GLOBALTABLESPACE_OID)
1197                         continue;
1198
1199                 dstpath = GetDatabasePath(db_id, dsttablespace);
1200
1201                 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1202                 {
1203                         /* Assume we can ignore it */
1204                         pfree(dstpath);
1205                         continue;
1206                 }
1207
1208                 if (!rmtree(dstpath, true))
1209                         ereport(WARNING,
1210                                         (errmsg("could not remove database directory \"%s\"",
1211                                                         dstpath)));
1212
1213                 /* Record the filesystem change in XLOG */
1214                 {
1215                         xl_dbase_drop_rec xlrec;
1216                         XLogRecData rdata[1];
1217
1218                         xlrec.db_id = db_id;
1219                         xlrec.tablespace_id = dsttablespace;
1220
1221                         rdata[0].data = (char *) &xlrec;
1222                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1223                         rdata[0].buffer = InvalidBuffer;
1224                         rdata[0].next = NULL;
1225
1226                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1227                 }
1228
1229                 pfree(dstpath);
1230         }
1231
1232         heap_endscan(scan);
1233         heap_close(rel, AccessShareLock);
1234 }
1235
1236
1237 /*
1238  * get_database_oid - given a database name, look up the OID
1239  *
1240  * Returns InvalidOid if database name not found.
1241  *
1242  * This is not actually used in this file, but is exported for use elsewhere.
1243  */
1244 Oid
1245 get_database_oid(const char *dbname)
1246 {
1247         Relation        pg_database;
1248         ScanKeyData entry[1];
1249         SysScanDesc scan;
1250         HeapTuple       dbtuple;
1251         Oid                     oid;
1252
1253         /* There's no syscache for pg_database, so must look the hard way */
1254         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1255         ScanKeyInit(&entry[0],
1256                                 Anum_pg_database_datname,
1257                                 BTEqualStrategyNumber, F_NAMEEQ,
1258                                 CStringGetDatum(dbname));
1259         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1260                                                           SnapshotNow, 1, entry);
1261
1262         dbtuple = systable_getnext(scan);
1263
1264         /* We assume that there can be at most one matching tuple */
1265         if (HeapTupleIsValid(dbtuple))
1266                 oid = HeapTupleGetOid(dbtuple);
1267         else
1268                 oid = InvalidOid;
1269
1270         systable_endscan(scan);
1271         heap_close(pg_database, AccessShareLock);
1272
1273         return oid;
1274 }
1275
1276
1277 /*
1278  * get_database_name - given a database OID, look up the name
1279  *
1280  * Returns a palloc'd string, or NULL if no such database.
1281  *
1282  * This is not actually used in this file, but is exported for use elsewhere.
1283  */
1284 char *
1285 get_database_name(Oid dbid)
1286 {
1287         Relation        pg_database;
1288         ScanKeyData entry[1];
1289         SysScanDesc scan;
1290         HeapTuple       dbtuple;
1291         char       *result;
1292
1293         /* There's no syscache for pg_database, so must look the hard way */
1294         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1295         ScanKeyInit(&entry[0],
1296                                 ObjectIdAttributeNumber,
1297                                 BTEqualStrategyNumber, F_OIDEQ,
1298                                 ObjectIdGetDatum(dbid));
1299         scan = systable_beginscan(pg_database, DatabaseOidIndexId, true,
1300                                                           SnapshotNow, 1, entry);
1301
1302         dbtuple = systable_getnext(scan);
1303
1304         /* We assume that there can be at most one matching tuple */
1305         if (HeapTupleIsValid(dbtuple))
1306                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1307         else
1308                 result = NULL;
1309
1310         systable_endscan(scan);
1311         heap_close(pg_database, AccessShareLock);
1312
1313         return result;
1314 }
1315
1316 /*
1317  * DATABASE resource manager's routines
1318  */
1319 void
1320 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1321 {
1322         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1323
1324         if (info == XLOG_DBASE_CREATE)
1325         {
1326                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1327                 char       *src_path;
1328                 char       *dst_path;
1329                 struct stat st;
1330
1331                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1332                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1333
1334                 /*
1335                  * Our theory for replaying a CREATE is to forcibly drop the
1336                  * target subdirectory if present, then re-copy the source data.
1337                  * This may be more work than needed, but it is simple to
1338                  * implement.
1339                  */
1340                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1341                 {
1342                         if (!rmtree(dst_path, true))
1343                                 ereport(WARNING,
1344                                                 (errmsg("could not remove database directory \"%s\"",
1345                                                                 dst_path)));
1346                 }
1347
1348                 /*
1349                  * Force dirty buffers out to disk, to ensure source database is
1350                  * up-to-date for the copy.  (We really only need to flush buffers for
1351                  * the source database, but bufmgr.c provides no API for that.)
1352                  */
1353                 BufferSync();
1354
1355                 /*
1356                  * Copy this subdirectory to the new location
1357                  *
1358                  * We don't need to copy subdirectories
1359                  */
1360                 copydir(src_path, dst_path, false);
1361         }
1362         else if (info == XLOG_DBASE_DROP)
1363         {
1364                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1365                 char       *dst_path;
1366
1367                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1368
1369                 /*
1370                  * Drop pages for this database that are in the shared buffer
1371                  * cache
1372                  */
1373                 DropBuffers(xlrec->db_id);
1374
1375                 if (!rmtree(dst_path, true))
1376                         ereport(WARNING,
1377                                         (errmsg("could not remove database directory \"%s\"",
1378                                                         dst_path)));
1379         }
1380         else
1381                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1382 }
1383
1384 void
1385 dbase_desc(char *buf, uint8 xl_info, char *rec)
1386 {
1387         uint8           info = xl_info & ~XLR_INFO_MASK;
1388
1389         if (info == XLOG_DBASE_CREATE)
1390         {
1391                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1392
1393                 sprintf(buf + strlen(buf), "create db: copy dir %u/%u to %u/%u",
1394                                 xlrec->src_db_id, xlrec->src_tablespace_id,
1395                                 xlrec->db_id, xlrec->tablespace_id);
1396         }
1397         else if (info == XLOG_DBASE_DROP)
1398         {
1399                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1400
1401                 sprintf(buf + strlen(buf), "drop db: dir %u/%u",
1402                                 xlrec->db_id, xlrec->tablespace_id);
1403         }
1404         else
1405                 strcat(buf, "UNKNOWN");
1406 }