]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Minor correction: cause ALTER ROLE role ROLE rolenames to behave
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop/rename database).
5  *
6  * Note: database creation/destruction commands take ExclusiveLock on
7  * pg_database to ensure that no two proceed in parallel.  We must use
8  * at least this level of locking to ensure that no two backends try to
9  * write the flat-file copy of pg_database at once.  We avoid using
10  * AccessExclusiveLock since there's no need to lock out ordinary readers
11  * of pg_database.
12  *
13  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
14  * Portions Copyright (c) 1994, Regents of the University of California
15  *
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.167 2005/07/14 21:46:29 tgl Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27
28 #include "access/genam.h"
29 #include "access/heapam.h"
30 #include "catalog/catalog.h"
31 #include "catalog/dependency.h"
32 #include "catalog/indexing.h"
33 #include "catalog/pg_authid.h"
34 #include "catalog/pg_database.h"
35 #include "catalog/pg_tablespace.h"
36 #include "commands/comment.h"
37 #include "commands/dbcommands.h"
38 #include "commands/tablespace.h"
39 #include "mb/pg_wchar.h"
40 #include "miscadmin.h"
41 #include "postmaster/bgwriter.h"
42 #include "storage/fd.h"
43 #include "storage/freespace.h"
44 #include "storage/procarray.h"
45 #include "utils/acl.h"
46 #include "utils/array.h"
47 #include "utils/builtins.h"
48 #include "utils/flatfiles.h"
49 #include "utils/fmgroids.h"
50 #include "utils/guc.h"
51 #include "utils/lsyscache.h"
52 #include "utils/syscache.h"
53
54
55 /* non-export function prototypes */

/*
 * get_db_info: look up a database by name.  Callers in this file treat a
 * true result as "the database exists".  Each output pointer may be passed
 * as NULL when the caller does not need that field (createdb passes all
 * NULLs when it only wants the existence test).
 */
56 static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
57                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
58                         Oid *dbLastSysOidP,
59                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
60                         Oid *dbTablespace);

/* have_createdb_privilege: createdb reports "permission denied" when false */
61 static bool have_createdb_privilege(void);

/*
 * remove_dbtablespaces: used by dropdb to remove the database's tablespace
 * subdirectories, and by createdb to clean up after a failed or raced copy.
 */
62 static void remove_dbtablespaces(Oid db_id);
63
64
65 /*
66  * CREATE DATABASE
 *
 * Create a new database by copying the per-tablespace directories of a
 * template database and then inserting a row for it into pg_database.
 *
 * stmt->dbname is the name of the new database.  stmt->options is a list
 * of DefElem nodes; the recognized option names are "tablespace", "owner",
 * "template", "encoding" and "location" (the last only draws a WARNING).
 * Each of the first four may be given at most once.  When no template is
 * named, "template1" is used.
 *
 * Sequence of work, which is order-sensitive:
 *	1. parse options, resolve the owner, check privileges;
 *	2. check for a name conflict (without a lock) and look up the template;
 *	3. resolve encoding and default tablespace;
 *	4. preassign the new OID, flush buffers, copy each tablespace directory
 *	   of the template and write one XLOG record per copied directory;
 *	5. take ExclusiveLock on pg_database, recheck the name, insert the row;
 *	6. request a checkpoint and flag the flat file for update at commit.
 *
 * All failures are reported through ereport/elog(ERROR).  Once copying has
 * started, the error paths call remove_dbtablespaces() first to clean up
 * the directories already created for the new OID.
67  */
68 void
69 createdb(const CreatedbStmt *stmt)
70 {
71         HeapScanDesc scan;
72         Relation        rel;
73         Oid                     src_dboid;
74         Oid                     src_owner;
75         int                     src_encoding;
76         bool            src_istemplate;
77         bool            src_allowconn;
78         Oid                     src_lastsysoid;
79         TransactionId src_vacuumxid;
80         TransactionId src_frozenxid;
81         Oid                     src_deftablespace;
82         Oid                     dst_deftablespace;
83         Relation        pg_database_rel;
84         HeapTuple       tuple;
85         TupleDesc       pg_database_dsc;
86         Datum           new_record[Natts_pg_database];
87         char            new_record_nulls[Natts_pg_database];
88         Oid                     dboid;
89         Oid                     datdba;
90         ListCell   *option;
91         DefElem    *dtablespacename = NULL;
92         DefElem    *downer = NULL;
93         DefElem    *dtemplate = NULL;
94         DefElem    *dencoding = NULL;
95         char       *dbname = stmt->dbname;
96         char       *dbowner = NULL;
97         const char *dbtemplate = NULL;
98         int                     encoding = -1;
99
100 #ifndef WIN32
            /* holds the "cp -r" shell command built in the copy loop below */
101         char            buf[2 * MAXPGPATH + 100];
102 #endif
103
104         /* don't call this in a transaction block */
105         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
106
107         /* Extract options from the statement node tree */
108         foreach(option, stmt->options)
109         {
110                 DefElem    *defel = (DefElem *) lfirst(option);
111
112                 if (strcmp(defel->defname, "tablespace") == 0)
113                 {
114                         if (dtablespacename)
115                                 ereport(ERROR,
116                                                 (errcode(ERRCODE_SYNTAX_ERROR),
117                                                  errmsg("conflicting or redundant options")));
118                         dtablespacename = defel;
119                 }
120                 else if (strcmp(defel->defname, "owner") == 0)
121                 {
122                         if (downer)
123                                 ereport(ERROR,
124                                                 (errcode(ERRCODE_SYNTAX_ERROR),
125                                                  errmsg("conflicting or redundant options")));
126                         downer = defel;
127                 }
128                 else if (strcmp(defel->defname, "template") == 0)
129                 {
130                         if (dtemplate)
131                                 ereport(ERROR,
132                                                 (errcode(ERRCODE_SYNTAX_ERROR),
133                                                  errmsg("conflicting or redundant options")));
134                         dtemplate = defel;
135                 }
136                 else if (strcmp(defel->defname, "encoding") == 0)
137                 {
138                         if (dencoding)
139                                 ereport(ERROR,
140                                                 (errcode(ERRCODE_SYNTAX_ERROR),
141                                                  errmsg("conflicting or redundant options")));
142                         dencoding = defel;
143                 }
144                 else if (strcmp(defel->defname, "location") == 0)
145                 {
                            /* accepted but ignored: only a WARNING, not an ERROR */
146                         ereport(WARNING,
147                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
148                                          errmsg("LOCATION is not supported anymore"),
149                                          errhint("Consider using tablespaces instead.")));
150                 }
151                 else
152                         elog(ERROR, "option \"%s\" not recognized",
153                                  defel->defname);
154         }
155
            /*
             * Pull the values out of the option nodes.  An option that is
             * present but has no argument is treated the same as an absent one.
             */
156         if (downer && downer->arg)
157                 dbowner = strVal(downer->arg);
158         if (dtemplate && dtemplate->arg)
159                 dbtemplate = strVal(dtemplate->arg);
160         if (dencoding && dencoding->arg)
161         {
162                 const char *encoding_name;
163
                    /* the encoding may be given either as a numeric code or by name */
164                 if (IsA(dencoding->arg, Integer))
165                 {
166                         encoding = intVal(dencoding->arg);
167                         encoding_name = pg_encoding_to_char(encoding);
168                         if (strcmp(encoding_name, "") == 0 ||
169                                 pg_valid_server_encoding(encoding_name) < 0)
170                                 ereport(ERROR,
171                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
172                                                  errmsg("%d is not a valid encoding code",
173                                                                 encoding)));
174                 }
175                 else if (IsA(dencoding->arg, String))
176                 {
177                         encoding_name = strVal(dencoding->arg);
178                         if (pg_valid_server_encoding(encoding_name) < 0)
179                                 ereport(ERROR,
180                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
181                                                  errmsg("%s is not a valid encoding name",
182                                                                 encoding_name)));
183                         encoding = pg_char_to_encoding(encoding_name);
184                 }
185                 else
186                         elog(ERROR, "unrecognized node type: %d",
187                                  nodeTag(dencoding->arg));
188         }
189
190         /* obtain OID of proposed owner */
191         if (dbowner)
192                 datdba = get_roleid_checked(dbowner);
193         else
194                 datdba = GetUserId();
195
196         /*
197          * To create a database, must have createdb privilege and must be able
198          * to become the target role (this does not imply that the target role
199          * itself must have createdb privilege).  The latter provision guards
200          * against "giveaway" attacks.  Note that a superuser will always have
201          * both of these privileges a fortiori.
202          */
203         if (!have_createdb_privilege())
204                 ereport(ERROR,
205                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
206                                  errmsg("permission denied to create database")));
207
208         check_is_member_of_role(GetUserId(), datdba);
209
210         /*
211          * Check for db name conflict.  There is a race condition here, since
212          * another backend could create the same DB name before we commit.
213          * However, holding an exclusive lock on pg_database for the whole
214          * time we are copying the source database doesn't seem like a good
215          * idea, so accept possibility of race to create.  We will check again
216          * after we grab the exclusive lock.
217          */
218         if (get_db_info(dbname, NULL, NULL, NULL,
219                                         NULL, NULL, NULL, NULL, NULL, NULL))
220                 ereport(ERROR,
221                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
222                                  errmsg("database \"%s\" already exists", dbname)));
223
224         /*
225          * Lookup database (template) to be cloned.
226          */
227         if (!dbtemplate)
228                 dbtemplate = "template1";               /* Default template database name */
229
230         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
231                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
232                                          &src_vacuumxid, &src_frozenxid, &src_deftablespace))
233                 ereport(ERROR,
234                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
235                  errmsg("template database \"%s\" does not exist", dbtemplate)));
236
237         /*
238          * Permission check: to copy a DB that's not marked datistemplate, you
239          * must be superuser or the owner thereof.
240          */
241         if (!src_istemplate)
242         {
243                 if (!pg_database_ownercheck(src_dboid, GetUserId()))
244                         ereport(ERROR,
245                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
246                                          errmsg("permission denied to copy database \"%s\"",
247                                                         dbtemplate)));
248         }
249
250         /*
251          * The source DB can't have any active backends, except this one
252          * (exception is to allow CREATE DB while connected to template1).
253          * Otherwise we might copy inconsistent data.  This check is not
254          * bulletproof, since someone might connect while we are copying...
255          */
256         if (DatabaseHasActiveBackends(src_dboid, true))
257                 ereport(ERROR,
258                                 (errcode(ERRCODE_OBJECT_IN_USE),
259                 errmsg("source database \"%s\" is being accessed by other users",
260                            dbtemplate)));
261
262         /* If encoding is defaulted, use source's encoding */
263         if (encoding < 0)
264                 encoding = src_encoding;
265
266         /* Some encodings are client only */
267         if (!PG_VALID_BE_ENCODING(encoding))
268                 ereport(ERROR,
269                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
270                                  errmsg("invalid server encoding %d", encoding)));
271
272         /* Resolve default tablespace for new database */
273         if (dtablespacename && dtablespacename->arg)
274         {
275                 char       *tablespacename;
276                 AclResult       aclresult;
277
278                 tablespacename = strVal(dtablespacename->arg);
279                 dst_deftablespace = get_tablespace_oid(tablespacename);
280                 if (!OidIsValid(dst_deftablespace))
281                         ereport(ERROR,
282                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
283                                          errmsg("tablespace \"%s\" does not exist",
284                                                         tablespacename)));
285                 /* check permissions */
286                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
287                                                                                    ACL_CREATE);
288                 if (aclresult != ACLCHECK_OK)
289                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
290                                                    tablespacename);
291
292                 /*
293                  * If we are trying to change the default tablespace of the template,
294                  * we require that the template not have any files in the new default
295                  * tablespace.  This is necessary because otherwise the copied
296                  * database would contain pg_class rows that refer to its default
297                  * tablespace both explicitly (by OID) and implicitly (as zero), which
298                  * would cause problems.  For example another CREATE DATABASE using
299                  * the copied database as template, and trying to change its default
300                  * tablespace again, would yield outright incorrect results (it would
301                  * improperly move tables to the new default tablespace that should
302                  * stay in the same tablespace).
303                  */
304                 if (dst_deftablespace != src_deftablespace)
305                 {
306                         char       *srcpath;
307                         struct stat st;
308
                            /* the template's directory inside the *new* default tablespace */
309                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
310
311                         if (stat(srcpath, &st) == 0 &&
312                                 S_ISDIR(st.st_mode) &&
313                                 !directory_is_empty(srcpath))
314                                 ereport(ERROR,
315                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
316                                                  errmsg("cannot assign new default tablespace \"%s\"",
317                                                                 tablespacename),
318                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
319                                                                    dbtemplate)));
320                         pfree(srcpath);
321                 }
322         }
323         else
324         {
325                 /* Use template database's default tablespace */
326                 dst_deftablespace = src_deftablespace;
327                 /* Note there is no additional permission check in this path */
328         }
329
330         /*
331          * Normally we mark the new database with the same datvacuumxid and
332          * datfrozenxid as the source.  However, if the source is not allowing
333          * connections then we assume it is fully frozen, and we can set the
334          * current transaction ID as the xid limits.  This avoids immediately
335          * starting to generate warnings after cloning template0.
336          */
337         if (!src_allowconn)
338                 src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
339
340         /*
341          * Preassign OID for pg_database tuple, so that we can compute db
342          * path.
343          */
344         dboid = newoid();
345
346         /*
347          * Force dirty buffers out to disk, to ensure source database is
348          * up-to-date for the copy.  (We really only need to flush buffers for
349          * the source database, but bufmgr.c provides no API for that.)
350          */
351         BufferSync();
352
353         /*
354          * Close virtual file descriptors so the kernel has more available for
355          * the system() calls below.
356          */
357         closeAllVfds();
358
359         /*
360          * Iterate through all tablespaces of the template database, and copy
361          * each one to the new database.
362          */
363         rel = heap_open(TableSpaceRelationId, AccessShareLock);
364         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
365         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
366         {
367                 Oid                     srctablespace = HeapTupleGetOid(tuple);
368                 Oid                     dsttablespace;
369                 char       *srcpath;
370                 char       *dstpath;
371                 struct stat st;
372
373                 /* No need to copy global tablespace */
374                 if (srctablespace == GLOBALTABLESPACE_OID)
375                         continue;
376
377                 srcpath = GetDatabasePath(src_dboid, srctablespace);
378
                    /* skip tablespaces where the template has no (non-empty) directory */
379                 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
380                         directory_is_empty(srcpath))
381                 {
382                         /* Assume we can ignore it */
383                         pfree(srcpath);
384                         continue;
385                 }
386
                    /*
                     * Files in the template's default tablespace go to the new
                     * database's default tablespace; everything else stays in the
                     * tablespace it was found in.
                     */
387                 if (srctablespace == src_deftablespace)
388                         dsttablespace = dst_deftablespace;
389                 else
390                         dsttablespace = srctablespace;
391
392                 dstpath = GetDatabasePath(dboid, dsttablespace);
393
                    /*
                     * Refuse to reuse an existing destination.  NOTE(review): this
                     * test also fires when stat() fails for a reason other than
                     * ENOENT, in which case the "already exists" detail reported
                     * below does not describe the actual problem.
                     */
394                 if (stat(dstpath, &st) == 0 || errno != ENOENT)
395                 {
396                         remove_dbtablespaces(dboid);
397                         ereport(ERROR,
398                                         (errmsg("could not initialize database directory"),
399                                          errdetail("Directory \"%s\" already exists.",
400                                                            dstpath)));
401                 }
402
403 #ifndef WIN32
404
405                 /*
406                  * Copy this subdirectory to the new location
407                  *
408                  * XXX use of cp really makes this code pretty grotty, particularly
409                  * with respect to lack of ability to report errors well.  Someday
410                  * rewrite to do it for ourselves.
                 *
                 * NOTE(review): both paths are interpolated between single quotes
                 * with no escaping, so a path that itself contains a single quote
                 * would produce a malformed shell command.  The snprintf result is
                 * not checked for truncation either.
411                  */
412
413                 /* We might need to use cp -R one day for portability */
414                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
415                                  srcpath, dstpath);
416                 if (system(buf) != 0)
417                 {
418                         remove_dbtablespaces(dboid);
419                         ereport(ERROR,
420                                         (errmsg("could not initialize database directory"),
421                                          errdetail("Failing system command was: %s", buf),
422                                          errhint("Look in the postmaster's stderr log for more information.")));
423                 }
424 #else                                                   /* WIN32 */
425                 if (copydir(srcpath, dstpath) != 0)
426                 {
427                         /* copydir should already have given details of its troubles */
428                         remove_dbtablespaces(dboid);
429                         ereport(ERROR,
430                                         (errmsg("could not initialize database directory")));
431                 }
432 #endif   /* WIN32 */
433
434                 /* Record the filesystem change in XLOG */
435                 {
436                         xl_dbase_create_rec xlrec;
437                         XLogRecData rdata[1];
438
                            /* one record per copied directory: (new db, dst ts) <- (src db, src ts) */
439                         xlrec.db_id = dboid;
440                         xlrec.tablespace_id = dsttablespace;
441                         xlrec.src_db_id = src_dboid;
442                         xlrec.src_tablespace_id = srctablespace;
443
444                         rdata[0].data = (char *) &xlrec;
445                         rdata[0].len = sizeof(xl_dbase_create_rec);
446                         rdata[0].buffer = InvalidBuffer;
447                         rdata[0].next = NULL;
448
449                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
450                 }
451         }
452         heap_endscan(scan);
453         heap_close(rel, AccessShareLock);
454
455         /*
456          * Now OK to grab exclusive lock on pg_database.
457          */
458         pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock);
459
460         /* Check to see if someone else created same DB name meanwhile. */
461         if (get_db_info(dbname, NULL, NULL, NULL,
462                                         NULL, NULL, NULL, NULL, NULL, NULL))
463         {
464                 /* Don't hold lock while doing recursive remove */
465                 heap_close(pg_database_rel, ExclusiveLock);
466                 remove_dbtablespaces(dboid);
467                 ereport(ERROR,
468                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
469                                  errmsg("database \"%s\" already exists", dbname)));
470         }
471
472         /*
473          * Insert a new tuple into pg_database
474          */
475         pg_database_dsc = RelationGetDescr(pg_database_rel);
476
477         /* Form tuple */
            /*
             * Every column's null flag starts out as ' '; only the two columns
             * deliberately left NULL are switched to 'n' further down.
             */
478         MemSet(new_record, 0, sizeof(new_record));
479         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
480
481         new_record[Anum_pg_database_datname - 1] =
482                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
483         new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
484         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
            /* the copy is never itself a template and always allows connections */
485         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
486         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
487         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
488         new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
489         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
490         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
491
492         /*
493          * We deliberately set datconfig and datacl to defaults (NULL), rather
494          * than copying them from the template database.  Copying datacl would
495          * be a bad idea when the owner is not the same as the template's
496          * owner. It's more debatable whether datconfig should be copied.
497          */
498         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
499         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
500
501         tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
502
503         HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
504                                                                                  * selection */
505
506         simple_heap_insert(pg_database_rel, tuple);
507
508         /* Update indexes */
509         CatalogUpdateIndexes(pg_database_rel, tuple);
510
511         /* Register owner dependency */
512         recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
513
514         /* Create pg_shdepend entries for objects within database */
515         copyTemplateDependencies(src_dboid, dboid);
516
517         /* Close pg_database, but keep exclusive lock till commit */
518         heap_close(pg_database_rel, NoLock);
519
520         /*
521          * We force a checkpoint before committing.  This effectively means
522          * that committed XLOG_DBASE_CREATE operations will never need to be
523          * replayed (at least not in ordinary crash recovery; we still have
524          * to make the XLOG entry for the benefit of PITR operations).
525          * This avoids two nasty scenarios:
526          *
527          * #1: When PITR is off, we don't XLOG the contents of newly created
528          * indexes; therefore the drop-and-recreate-whole-directory behavior
529          * of DBASE_CREATE replay would lose such indexes.
530          *
531          * #2: Since we have to recopy the source database during DBASE_CREATE
532          * replay, we run the risk of copying changes in it that were committed
533          * after the original CREATE DATABASE command but before the system
534          * crash that led to the replay.  This is at least unexpected and at
535          * worst could lead to inconsistencies, eg duplicate table names.
536          *
537          * (Both of these were real bugs in releases 8.0 through 8.0.3.)
538          *
539          * In PITR replay, the first of these isn't an issue, and the second
540          * is only a risk if the CREATE DATABASE and subsequent template
541          * database change both occur while a base backup is being taken.
542          * There doesn't seem to be much we can do about that except document
543          * it as a limitation.
544          *
545          * Perhaps if we ever implement CREATE DATABASE in a less cheesy
546          * way, we can avoid this.
547          */
548         RequestCheckpoint(true, false);
549
550         /*
551          * Set flag to update flat database file at commit.
552          */
553         database_file_update_needed();
554 }
555
556
557 /*
558  * DROP DATABASE
 *
 * Remove the database named dbname: delete its pg_database row, discard
 * its shared buffers and free-space-map entries, remove its tablespace
 * subdirectories, and drop its shared dependency references.
 *
 * Reports an ERROR (via ereport) when the target is the currently open
 * database, does not exist, is not owned by the caller, is marked as a
 * template, or is being accessed by other backends.
 *
 * The ExclusiveLock taken on pg_database is kept until commit.
559  */
560 void
561 dropdb(const char *dbname)
562 {
563         Oid                     db_id;
564         bool            db_istemplate;
565         Relation        pgdbrel;
566         SysScanDesc pgdbscan;
567         ScanKeyData key;
568         HeapTuple       tup;
569
570         PreventTransactionChain((void *) dbname, "DROP DATABASE");
571
572         AssertArg(dbname);
573
            /*
             * Refuse to drop the database this backend is connected to; the
             * comparison is by name against get_database_name(MyDatabaseId).
             * NOTE(review): the lookup result is handed straight to strcmp();
             * if get_database_name() can ever return NULL this would crash --
             * confirm against its definition.
             */
574         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
575                 ereport(ERROR,
576                                 (errcode(ERRCODE_OBJECT_IN_USE),
577                                  errmsg("cannot drop the currently open database")));
578
579         /*
580          * Obtain exclusive lock on pg_database.  We need this to ensure that
581          * no new backend starts up in the target database while we are
582          * deleting it.  (Actually, a new backend might still manage to start
583          * up, because it isn't able to lock pg_database while starting.  But
584          * it will detect its error in ReverifyMyDatabase and shut down before
585          * any serious damage is done.  See postinit.c.)
586          *
587          * An ExclusiveLock, rather than AccessExclusiveLock, is sufficient
588          * since ReverifyMyDatabase takes RowShareLock.  This allows ordinary
589          * readers of pg_database to proceed in parallel.
590          */
591         pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
592
            /* only the OID and the is-template flag are needed from the lookup */
593         if (!get_db_info(dbname, &db_id, NULL, NULL,
594                                          &db_istemplate, NULL, NULL, NULL, NULL, NULL))
595                 ereport(ERROR,
596                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
597                                  errmsg("database \"%s\" does not exist", dbname)));
598
            /* must own the database to drop it */
599         if (!pg_database_ownercheck(db_id, GetUserId()))
600                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
601                                            dbname);
602
603         /*
604          * Disallow dropping a DB that is marked istemplate.  This is just to
605          * prevent people from accidentally dropping template0 or template1;
606          * they can do so if they're really determined ...
607          */
608         if (db_istemplate)
609                 ereport(ERROR,
610                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
611                                  errmsg("cannot drop a template database")));
612
613         /*
614          * Check for active backends in the target database.
615          */
616         if (DatabaseHasActiveBackends(db_id, false))
617                 ereport(ERROR,
618                                 (errcode(ERRCODE_OBJECT_IN_USE),
619                            errmsg("database \"%s\" is being accessed by other users",
620                                           dbname)));
621
622         /*
623          * Find the database's tuple by OID (should be unique).
624          */
625         ScanKeyInit(&key,
626                                 ObjectIdAttributeNumber,
627                                 BTEqualStrategyNumber, F_OIDEQ,
628                                 ObjectIdGetDatum(db_id));
629
630         pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndexId, true,
631                                                                   SnapshotNow, 1, &key);
632
633         tup = systable_getnext(pgdbscan);
634         if (!HeapTupleIsValid(tup))
635         {
636                 /*
637                  * This error should never come up since the existence of the
638                  * database is checked earlier
639                  */
640                 elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
641                          dbname);
642         }
643
644         /* Remove the database's tuple from pg_database */
645         simple_heap_delete(pgdbrel, &tup->t_self);
646
647         systable_endscan(pgdbscan);
648
649         /*
650          * Delete any comments associated with the database
651          *
652          * NOTE: this is probably dead code since any such comments should have
653          * been in that database, not mine.
654          */
655         DeleteComments(db_id, DatabaseRelationId, 0);
656
657         /*
658          * Drop pages for this database that are in the shared buffer cache.
659          * This is important to ensure that no remaining backend tries to
660          * write out a dirty buffer to the dead database later...
661          */
662         DropBuffers(db_id);
663
664         /*
665          * Also, clean out any entries in the shared free space map.
666          */
667         FreeSpaceMapForgetDatabase(db_id);
668
669         /*
670          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
671          * open files, which would cause rmdir() to fail.
672          */
673 #ifdef WIN32
674         RequestCheckpoint(true, false);
675 #endif
676
677         /*
678          * Remove all tablespace subdirs belonging to the database.
679          */
680         remove_dbtablespaces(db_id);
681
682         /* Close pg_database, but keep exclusive lock till commit */
683         heap_close(pgdbrel, NoLock);
684
685         /*
686          * Remove shared dependency references for the database.
687          */
688         dropDatabaseDependencies(db_id);
689
690         /*
691          * Set flag to update flat database file at commit.
692          */
693         database_file_update_needed();
694 }
695
696
697 /*
698  * Rename database
699  */
700 void
701 RenameDatabase(const char *oldname, const char *newname)
702 {
703         HeapTuple       tup,
704                                 newtup;
705         Relation        rel;
706         SysScanDesc scan,
707                                 scan2;
708         ScanKeyData key,
709                                 key2;
710
711         /*
712          * Obtain ExclusiveLock so that no new session gets started
713          * while the rename is in progress.
714          */
715         rel = heap_open(DatabaseRelationId, ExclusiveLock);
716
717         ScanKeyInit(&key,
718                                 Anum_pg_database_datname,
719                                 BTEqualStrategyNumber, F_NAMEEQ,
720                                 NameGetDatum(oldname));
721         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
722                                                           SnapshotNow, 1, &key);
723
724         tup = systable_getnext(scan);
725         if (!HeapTupleIsValid(tup))
726                 ereport(ERROR,
727                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
728                                  errmsg("database \"%s\" does not exist", oldname)));
729
730         /*
731          * XXX Client applications probably store the current database
732          * somewhere, so renaming it could cause confusion.  On the other
733          * hand, there may not be an actual problem besides a little
734          * confusion, so think about this and decide.
735          */
736         if (HeapTupleGetOid(tup) == MyDatabaseId)
737                 ereport(ERROR,
738                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
739                                  errmsg("current database may not be renamed")));
740
741         /*
742          * Make sure the database does not have active sessions.  Might not be
743          * necessary, but it's consistent with other database operations.
744          */
745         if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
746                 ereport(ERROR,
747                                 (errcode(ERRCODE_OBJECT_IN_USE),
748                            errmsg("database \"%s\" is being accessed by other users",
749                                           oldname)));
750
751         /* make sure the new name doesn't exist */
752         ScanKeyInit(&key2,
753                                 Anum_pg_database_datname,
754                                 BTEqualStrategyNumber, F_NAMEEQ,
755                                 NameGetDatum(newname));
756         scan2 = systable_beginscan(rel, DatabaseNameIndexId, true,
757                                                            SnapshotNow, 1, &key2);
758         if (HeapTupleIsValid(systable_getnext(scan2)))
759                 ereport(ERROR,
760                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
761                                  errmsg("database \"%s\" already exists", newname)));
762         systable_endscan(scan2);
763
764         /* must be owner */
765         if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
766                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
767                                            oldname);
768
769         /* must have createdb rights */
770         if (!have_createdb_privilege())
771                 ereport(ERROR,
772                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
773                                  errmsg("permission denied to rename database")));
774
775         /* rename */
776         newtup = heap_copytuple(tup);
777         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
778         simple_heap_update(rel, &newtup->t_self, newtup);
779         CatalogUpdateIndexes(rel, newtup);
780
781         systable_endscan(scan);
782
783         /* Close pg_database, but keep exclusive lock till commit */
784         heap_close(rel, NoLock);
785
786         /*
787          * Set flag to update flat database file at commit.
788          */
789         database_file_update_needed();
790 }
791
792
793 /*
794  * ALTER DATABASE name SET ...
795  */
796 void
797 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
798 {
799         char       *valuestr;
800         HeapTuple       tuple,
801                                 newtuple;
802         Relation        rel;
803         ScanKeyData scankey;
804         SysScanDesc scan;
805         Datum           repl_val[Natts_pg_database];
806         char            repl_null[Natts_pg_database];
807         char            repl_repl[Natts_pg_database];
808
809         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
810
811         /*
812          * We don't need ExclusiveLock since we aren't updating the
813          * flat file.
814          */
815         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
816         ScanKeyInit(&scankey,
817                                 Anum_pg_database_datname,
818                                 BTEqualStrategyNumber, F_NAMEEQ,
819                                 NameGetDatum(stmt->dbname));
820         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
821                                                           SnapshotNow, 1, &scankey);
822         tuple = systable_getnext(scan);
823         if (!HeapTupleIsValid(tuple))
824                 ereport(ERROR,
825                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
826                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
827
828         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
829                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
830                                            stmt->dbname);
831
832         MemSet(repl_repl, ' ', sizeof(repl_repl));
833         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
834
835         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
836         {
837                 /* RESET ALL */
838                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
839                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
840         }
841         else
842         {
843                 Datum           datum;
844                 bool            isnull;
845                 ArrayType  *a;
846
847                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
848
849                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
850                                                          RelationGetDescr(rel), &isnull);
851
852                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
853
854                 if (valuestr)
855                         a = GUCArrayAdd(a, stmt->variable, valuestr);
856                 else
857                         a = GUCArrayDelete(a, stmt->variable);
858
859                 if (a)
860                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
861                 else
862                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
863         }
864
865         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
866         simple_heap_update(rel, &tuple->t_self, newtuple);
867
868         /* Update indexes */
869         CatalogUpdateIndexes(rel, newtuple);
870
871         systable_endscan(scan);
872
873         /* Close pg_database, but keep lock till commit */
874         heap_close(rel, NoLock);
875
876         /*
877          * We don't bother updating the flat file since ALTER DATABASE SET
878          * doesn't affect it.
879          */
880 }
881
882
883 /*
884  * ALTER DATABASE name OWNER TO newowner
885  */
886 void
887 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
888 {
889         HeapTuple       tuple;
890         Relation        rel;
891         ScanKeyData scankey;
892         SysScanDesc scan;
893         Form_pg_database datForm;
894
895         /*
896          * We don't need ExclusiveLock since we aren't updating the
897          * flat file.
898          */
899         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
900         ScanKeyInit(&scankey,
901                                 Anum_pg_database_datname,
902                                 BTEqualStrategyNumber, F_NAMEEQ,
903                                 NameGetDatum(dbname));
904         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
905                                                           SnapshotNow, 1, &scankey);
906         tuple = systable_getnext(scan);
907         if (!HeapTupleIsValid(tuple))
908                 ereport(ERROR,
909                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
910                                  errmsg("database \"%s\" does not exist", dbname)));
911
912         datForm = (Form_pg_database) GETSTRUCT(tuple);
913
914         /*
915          * If the new owner is the same as the existing owner, consider the
916          * command to have succeeded.  This is to be consistent with other
917          * objects.
918          */
919         if (datForm->datdba != newOwnerId)
920         {
921                 Datum           repl_val[Natts_pg_database];
922                 char            repl_null[Natts_pg_database];
923                 char            repl_repl[Natts_pg_database];
924                 Acl                *newAcl;
925                 Datum           aclDatum;
926                 bool            isNull;
927                 HeapTuple       newtuple;
928
929                 /* Otherwise, must be owner of the existing object */
930                 if (!pg_database_ownercheck(HeapTupleGetOid(tuple),GetUserId()))
931                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
932                                                    dbname);
933
934                 /* Must be able to become new owner */
935                 check_is_member_of_role(GetUserId(), newOwnerId);
936
937                 /*
938                  * must have createdb rights 
939                  *
940                  * NOTE: This is different from other alter-owner checks in 
941                  * that the current user is checked for createdb privileges 
942                  * instead of the destination owner.  This is consistent
943                  * with the CREATE case for databases.
944                  */
945                 if (!have_createdb_privilege())
946                         ereport(ERROR,
947                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
948                                          errmsg("permission denied to change owner of database")));
949
950                 memset(repl_null, ' ', sizeof(repl_null));
951                 memset(repl_repl, ' ', sizeof(repl_repl));
952
953                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
954                 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
955
956                 /*
957                  * Determine the modified ACL for the new owner.  This is only
958                  * necessary when the ACL is non-null.
959                  */
960                 aclDatum = heap_getattr(tuple,
961                                                                 Anum_pg_database_datacl,
962                                                                 RelationGetDescr(rel),
963                                                                 &isNull);
964                 if (!isNull)
965                 {
966                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
967                                                                  datForm->datdba, newOwnerId);
968                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
969                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
970                 }
971
972                 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
973                 simple_heap_update(rel, &newtuple->t_self, newtuple);
974                 CatalogUpdateIndexes(rel, newtuple);
975
976                 heap_freetuple(newtuple);
977
978                 /* Update owner dependency reference */
979                 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
980                                                                 newOwnerId);
981         }
982
983         systable_endscan(scan);
984
985         /* Close pg_database, but keep lock till commit */
986         heap_close(rel, NoLock);
987
988         /*
989          * We don't bother updating the flat file since ALTER DATABASE OWNER
990          * doesn't affect it.
991          */
992 }
993
994
/*
 * Helper functions
 */
998
999 static bool
1000 get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
1001                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1002                         Oid *dbLastSysOidP,
1003                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
1004                         Oid *dbTablespace)
1005 {
1006         Relation        relation;
1007         ScanKeyData scanKey;
1008         SysScanDesc scan;
1009         HeapTuple       tuple;
1010         bool            gottuple;
1011
1012         AssertArg(name);
1013
1014         /* Caller may wish to grab a better lock on pg_database beforehand... */
1015         relation = heap_open(DatabaseRelationId, AccessShareLock);
1016
1017         ScanKeyInit(&scanKey,
1018                                 Anum_pg_database_datname,
1019                                 BTEqualStrategyNumber, F_NAMEEQ,
1020                                 NameGetDatum(name));
1021
1022         scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1023                                                           SnapshotNow, 1, &scanKey);
1024
1025         tuple = systable_getnext(scan);
1026
1027         gottuple = HeapTupleIsValid(tuple);
1028         if (gottuple)
1029         {
1030                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1031
1032                 /* oid of the database */
1033                 if (dbIdP)
1034                         *dbIdP = HeapTupleGetOid(tuple);
1035                 /* oid of the owner */
1036                 if (ownerIdP)
1037                         *ownerIdP = dbform->datdba;
1038                 /* character encoding */
1039                 if (encodingP)
1040                         *encodingP = dbform->encoding;
1041                 /* allowed as template? */
1042                 if (dbIsTemplateP)
1043                         *dbIsTemplateP = dbform->datistemplate;
1044                 /* allowing connections? */
1045                 if (dbAllowConnP)
1046                         *dbAllowConnP = dbform->datallowconn;
1047                 /* last system OID used in database */
1048                 if (dbLastSysOidP)
1049                         *dbLastSysOidP = dbform->datlastsysoid;
1050                 /* limit of vacuumed XIDs */
1051                 if (dbVacuumXidP)
1052                         *dbVacuumXidP = dbform->datvacuumxid;
1053                 /* limit of frozen XIDs */
1054                 if (dbFrozenXidP)
1055                         *dbFrozenXidP = dbform->datfrozenxid;
1056                 /* default tablespace for this database */
1057                 if (dbTablespace)
1058                         *dbTablespace = dbform->dattablespace;
1059         }
1060
1061         systable_endscan(scan);
1062         heap_close(relation, AccessShareLock);
1063
1064         return gottuple;
1065 }
1066
1067 /* Check if current user has createdb privileges */
1068 static bool
1069 have_createdb_privilege(void)
1070 {
1071         bool            result = false;
1072         HeapTuple       utup;
1073
1074         /* Superusers can always do everything */
1075         if (superuser())
1076                 return true;
1077
1078         utup = SearchSysCache(AUTHOID,
1079                                                   ObjectIdGetDatum(GetUserId()),
1080                                                   0, 0, 0);
1081         if (HeapTupleIsValid(utup))
1082         {
1083                 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1084                 ReleaseSysCache(utup);
1085         }
1086         return result;
1087 }
1088
1089 /*
1090  * Remove tablespace directories
1091  *
1092  * We don't know what tablespaces db_id is using, so iterate through all
1093  * tablespaces removing <tablespace>/db_id
1094  */
1095 static void
1096 remove_dbtablespaces(Oid db_id)
1097 {
1098         Relation        rel;
1099         HeapScanDesc scan;
1100         HeapTuple       tuple;
1101
1102         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1103         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1104         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1105         {
1106                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1107                 char       *dstpath;
1108                 struct stat st;
1109
1110                 /* Don't mess with the global tablespace */
1111                 if (dsttablespace == GLOBALTABLESPACE_OID)
1112                         continue;
1113
1114                 dstpath = GetDatabasePath(db_id, dsttablespace);
1115
1116                 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1117                 {
1118                         /* Assume we can ignore it */
1119                         pfree(dstpath);
1120                         continue;
1121                 }
1122
1123                 if (!rmtree(dstpath, true))
1124                         ereport(WARNING,
1125                                         (errmsg("could not remove database directory \"%s\"",
1126                                                         dstpath)));
1127
1128                 /* Record the filesystem change in XLOG */
1129                 {
1130                         xl_dbase_drop_rec xlrec;
1131                         XLogRecData rdata[1];
1132
1133                         xlrec.db_id = db_id;
1134                         xlrec.tablespace_id = dsttablespace;
1135
1136                         rdata[0].data = (char *) &xlrec;
1137                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1138                         rdata[0].buffer = InvalidBuffer;
1139                         rdata[0].next = NULL;
1140
1141                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1142                 }
1143
1144                 pfree(dstpath);
1145         }
1146
1147         heap_endscan(scan);
1148         heap_close(rel, AccessShareLock);
1149 }
1150
1151
1152 /*
1153  * get_database_oid - given a database name, look up the OID
1154  *
1155  * Returns InvalidOid if database name not found.
1156  *
1157  * This is not actually used in this file, but is exported for use elsewhere.
1158  */
1159 Oid
1160 get_database_oid(const char *dbname)
1161 {
1162         Relation        pg_database;
1163         ScanKeyData entry[1];
1164         SysScanDesc scan;
1165         HeapTuple       dbtuple;
1166         Oid                     oid;
1167
1168         /* There's no syscache for pg_database, so must look the hard way */
1169         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1170         ScanKeyInit(&entry[0],
1171                                 Anum_pg_database_datname,
1172                                 BTEqualStrategyNumber, F_NAMEEQ,
1173                                 CStringGetDatum(dbname));
1174         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1175                                                           SnapshotNow, 1, entry);
1176
1177         dbtuple = systable_getnext(scan);
1178
1179         /* We assume that there can be at most one matching tuple */
1180         if (HeapTupleIsValid(dbtuple))
1181                 oid = HeapTupleGetOid(dbtuple);
1182         else
1183                 oid = InvalidOid;
1184
1185         systable_endscan(scan);
1186         heap_close(pg_database, AccessShareLock);
1187
1188         return oid;
1189 }
1190
1191
1192 /*
1193  * get_database_name - given a database OID, look up the name
1194  *
1195  * Returns a palloc'd string, or NULL if no such database.
1196  *
1197  * This is not actually used in this file, but is exported for use elsewhere.
1198  */
1199 char *
1200 get_database_name(Oid dbid)
1201 {
1202         Relation        pg_database;
1203         ScanKeyData entry[1];
1204         SysScanDesc scan;
1205         HeapTuple       dbtuple;
1206         char       *result;
1207
1208         /* There's no syscache for pg_database, so must look the hard way */
1209         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1210         ScanKeyInit(&entry[0],
1211                                 ObjectIdAttributeNumber,
1212                                 BTEqualStrategyNumber, F_OIDEQ,
1213                                 ObjectIdGetDatum(dbid));
1214         scan = systable_beginscan(pg_database, DatabaseOidIndexId, true,
1215                                                           SnapshotNow, 1, entry);
1216
1217         dbtuple = systable_getnext(scan);
1218
1219         /* We assume that there can be at most one matching tuple */
1220         if (HeapTupleIsValid(dbtuple))
1221                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1222         else
1223                 result = NULL;
1224
1225         systable_endscan(scan);
1226         heap_close(pg_database, AccessShareLock);
1227
1228         return result;
1229 }
1230
/*
 * DATABASE resource manager's routines
 */
1234 void
1235 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1236 {
1237         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1238
1239         if (info == XLOG_DBASE_CREATE)
1240         {
1241                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1242                 char       *src_path;
1243                 char       *dst_path;
1244                 struct stat st;
1245
1246 #ifndef WIN32
1247                 char            buf[2 * MAXPGPATH + 100];
1248 #endif
1249
1250                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1251                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1252
1253                 /*
1254                  * Our theory for replaying a CREATE is to forcibly drop the
1255                  * target subdirectory if present, then re-copy the source data.
1256                  * This may be more work than needed, but it is simple to
1257                  * implement.
1258                  */
1259                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1260                 {
1261                         if (!rmtree(dst_path, true))
1262                                 ereport(WARNING,
1263                                         (errmsg("could not remove database directory \"%s\"",
1264                                                         dst_path)));
1265                 }
1266
1267                 /*
1268                  * Force dirty buffers out to disk, to ensure source database is
1269                  * up-to-date for the copy.  (We really only need to flush buffers for
1270                  * the source database, but bufmgr.c provides no API for that.)
1271                  */
1272                 BufferSync();
1273
1274 #ifndef WIN32
1275
1276                 /*
1277                  * Copy this subdirectory to the new location
1278                  *
1279                  * XXX use of cp really makes this code pretty grotty, particularly
1280                  * with respect to lack of ability to report errors well.  Someday
1281                  * rewrite to do it for ourselves.
1282                  */
1283
1284                 /* We might need to use cp -R one day for portability */
1285                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
1286                                  src_path, dst_path);
1287                 if (system(buf) != 0)
1288                         ereport(ERROR,
1289                                         (errmsg("could not initialize database directory"),
1290                                          errdetail("Failing system command was: %s", buf),
1291                                          errhint("Look in the postmaster's stderr log for more information.")));
1292 #else                                                   /* WIN32 */
1293                 if (copydir(src_path, dst_path) != 0)
1294                 {
1295                         /* copydir should already have given details of its troubles */
1296                         ereport(ERROR,
1297                                         (errmsg("could not initialize database directory")));
1298                 }
1299 #endif   /* WIN32 */
1300         }
1301         else if (info == XLOG_DBASE_DROP)
1302         {
1303                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1304                 char       *dst_path;
1305
1306                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1307
1308                 /*
1309                  * Drop pages for this database that are in the shared buffer
1310                  * cache
1311                  */
1312                 DropBuffers(xlrec->db_id);
1313
1314                 if (!rmtree(dst_path, true))
1315                         ereport(WARNING,
1316                                         (errmsg("could not remove database directory \"%s\"",
1317                                                         dst_path)));
1318         }
1319         else
1320                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1321 }
1322
1323 void
1324 dbase_desc(char *buf, uint8 xl_info, char *rec)
1325 {
1326         uint8           info = xl_info & ~XLR_INFO_MASK;
1327
1328         if (info == XLOG_DBASE_CREATE)
1329         {
1330                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1331
1332                 sprintf(buf + strlen(buf), "create db: copy dir %u/%u to %u/%u",
1333                                 xlrec->src_db_id, xlrec->src_tablespace_id,
1334                                 xlrec->db_id, xlrec->tablespace_id);
1335         }
1336         else if (info == XLOG_DBASE_DROP)
1337         {
1338                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1339
1340                 sprintf(buf + strlen(buf), "drop db: dir %u/%u",
1341                                 xlrec->db_id, xlrec->tablespace_id);
1342         }
1343         else
1344                 strcat(buf, "UNKNOWN");
1345 }