]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Completion of project to use fixed OIDs for all system catalogs and
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands take ExclusiveLock on
7  * pg_database to ensure that no two proceed in parallel.  We must use
8  * at least this level of locking to ensure that no two backends try to
9  * write the flat-file copy of pg_database at once.  We avoid using
10  * AccessExclusiveLock since there's no need to lock out ordinary readers
11  * of pg_database.
12  *
13  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
14  * Portions Copyright (c) 1994, Regents of the University of California
15  *
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.156 2005/04/14 20:03:23 tgl Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27
28 #include "access/genam.h"
29 #include "access/heapam.h"
30 #include "catalog/catalog.h"
31 #include "catalog/pg_database.h"
32 #include "catalog/pg_shadow.h"
33 #include "catalog/pg_tablespace.h"
34 #include "catalog/indexing.h"
35 #include "commands/comment.h"
36 #include "commands/dbcommands.h"
37 #include "commands/tablespace.h"
38 #include "mb/pg_wchar.h"
39 #include "miscadmin.h"
40 #include "postmaster/bgwriter.h"
41 #include "storage/fd.h"
42 #include "storage/freespace.h"
43 #include "storage/sinval.h"
44 #include "utils/acl.h"
45 #include "utils/array.h"
46 #include "utils/builtins.h"
47 #include "utils/flatfiles.h"
48 #include "utils/fmgroids.h"
49 #include "utils/guc.h"
50 #include "utils/lsyscache.h"
51 #include "utils/syscache.h"
52
53
54 /* non-export function prototypes */
55 static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
56                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
57                         Oid *dbLastSysOidP,
58                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
59                         Oid *dbTablespace);
60 static bool have_createdb_privilege(void);
61 static void remove_dbtablespaces(Oid db_id);
62
63
64 /*
65  * CREATE DATABASE
66  */
67 void
68 createdb(const CreatedbStmt *stmt)
69 {
70         HeapScanDesc scan;
71         Relation        rel;
72         Oid                     src_dboid;
73         AclId           src_owner;
74         int                     src_encoding;
75         bool            src_istemplate;
76         bool            src_allowconn;
77         Oid                     src_lastsysoid;
78         TransactionId src_vacuumxid;
79         TransactionId src_frozenxid;
80         Oid                     src_deftablespace;
81         Oid                     dst_deftablespace;
82         Relation        pg_database_rel;
83         HeapTuple       tuple;
84         TupleDesc       pg_database_dsc;
85         Datum           new_record[Natts_pg_database];
86         char            new_record_nulls[Natts_pg_database];
87         Oid                     dboid;
88         AclId           datdba;
89         ListCell   *option;
90         DefElem    *dtablespacename = NULL;
91         DefElem    *downer = NULL;
92         DefElem    *dtemplate = NULL;
93         DefElem    *dencoding = NULL;
94         char       *dbname = stmt->dbname;
95         char       *dbowner = NULL;
96         char       *dbtemplate = NULL;
97         int                     encoding = -1;
98
99 #ifndef WIN32
100         char            buf[2 * MAXPGPATH + 100];
101 #endif
102
103         /* don't call this in a transaction block */
104         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
105
106         /* Extract options from the statement node tree */
107         foreach(option, stmt->options)
108         {
109                 DefElem    *defel = (DefElem *) lfirst(option);
110
111                 if (strcmp(defel->defname, "tablespace") == 0)
112                 {
113                         if (dtablespacename)
114                                 ereport(ERROR,
115                                                 (errcode(ERRCODE_SYNTAX_ERROR),
116                                                  errmsg("conflicting or redundant options")));
117                         dtablespacename = defel;
118                 }
119                 else if (strcmp(defel->defname, "owner") == 0)
120                 {
121                         if (downer)
122                                 ereport(ERROR,
123                                                 (errcode(ERRCODE_SYNTAX_ERROR),
124                                                  errmsg("conflicting or redundant options")));
125                         downer = defel;
126                 }
127                 else if (strcmp(defel->defname, "template") == 0)
128                 {
129                         if (dtemplate)
130                                 ereport(ERROR,
131                                                 (errcode(ERRCODE_SYNTAX_ERROR),
132                                                  errmsg("conflicting or redundant options")));
133                         dtemplate = defel;
134                 }
135                 else if (strcmp(defel->defname, "encoding") == 0)
136                 {
137                         if (dencoding)
138                                 ereport(ERROR,
139                                                 (errcode(ERRCODE_SYNTAX_ERROR),
140                                                  errmsg("conflicting or redundant options")));
141                         dencoding = defel;
142                 }
143                 else if (strcmp(defel->defname, "location") == 0)
144                 {
145                         ereport(WARNING,
146                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
147                                          errmsg("LOCATION is not supported anymore"),
148                                          errhint("Consider using tablespaces instead.")));
149                 }
150                 else
151                         elog(ERROR, "option \"%s\" not recognized",
152                                  defel->defname);
153         }
154
155         if (downer && downer->arg)
156                 dbowner = strVal(downer->arg);
157         if (dtemplate && dtemplate->arg)
158                 dbtemplate = strVal(dtemplate->arg);
159         if (dencoding && dencoding->arg)
160         {
161                 const char *encoding_name;
162
163                 if (IsA(dencoding->arg, Integer))
164                 {
165                         encoding = intVal(dencoding->arg);
166                         encoding_name = pg_encoding_to_char(encoding);
167                         if (strcmp(encoding_name, "") == 0 ||
168                                 pg_valid_server_encoding(encoding_name) < 0)
169                                 ereport(ERROR,
170                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
171                                                  errmsg("%d is not a valid encoding code",
172                                                                 encoding)));
173                 }
174                 else if (IsA(dencoding->arg, String))
175                 {
176                         encoding_name = strVal(dencoding->arg);
177                         if (pg_valid_server_encoding(encoding_name) < 0)
178                                 ereport(ERROR,
179                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
180                                                  errmsg("%s is not a valid encoding name",
181                                                                 encoding_name)));
182                         encoding = pg_char_to_encoding(encoding_name);
183                 }
184                 else
185                         elog(ERROR, "unrecognized node type: %d",
186                                  nodeTag(dencoding->arg));
187         }
188
189         /* obtain sysid of proposed owner */
190         if (dbowner)
191                 datdba = get_usesysid(dbowner); /* will ereport if no such user */
192         else
193                 datdba = GetUserId();
194
195         if (datdba == GetUserId())
196         {
197                 /* creating database for self: can be superuser or createdb */
198                 if (!superuser() && !have_createdb_privilege())
199                         ereport(ERROR,
200                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
201                                          errmsg("permission denied to create database")));
202         }
203         else
204         {
205                 /* creating database for someone else: must be superuser */
206                 /* note that the someone else need not have any permissions */
207                 if (!superuser())
208                         ereport(ERROR,
209                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
210                                          errmsg("must be superuser to create database for another user")));
211         }
212
213         /*
214          * Check for db name conflict.  There is a race condition here, since
215          * another backend could create the same DB name before we commit.
216          * However, holding an exclusive lock on pg_database for the whole
217          * time we are copying the source database doesn't seem like a good
218          * idea, so accept possibility of race to create.  We will check again
219          * after we grab the exclusive lock.
220          */
221         if (get_db_info(dbname, NULL, NULL, NULL,
222                                         NULL, NULL, NULL, NULL, NULL, NULL))
223                 ereport(ERROR,
224                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
225                                  errmsg("database \"%s\" already exists", dbname)));
226
227         /*
228          * Lookup database (template) to be cloned.
229          */
230         if (!dbtemplate)
231                 dbtemplate = "template1";               /* Default template database name */
232
233         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
234                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
235                                          &src_vacuumxid, &src_frozenxid, &src_deftablespace))
236                 ereport(ERROR,
237                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
238                  errmsg("template database \"%s\" does not exist", dbtemplate)));
239
240         /*
241          * Permission check: to copy a DB that's not marked datistemplate, you
242          * must be superuser or the owner thereof.
243          */
244         if (!src_istemplate)
245         {
246                 if (!superuser() && GetUserId() != src_owner)
247                         ereport(ERROR,
248                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
249                                          errmsg("permission denied to copy database \"%s\"",
250                                                         dbtemplate)));
251         }
252
253         /*
254          * The source DB can't have any active backends, except this one
255          * (exception is to allow CREATE DB while connected to template1).
256          * Otherwise we might copy inconsistent data.  This check is not
257          * bulletproof, since someone might connect while we are copying...
258          */
259         if (DatabaseHasActiveBackends(src_dboid, true))
260                 ereport(ERROR,
261                                 (errcode(ERRCODE_OBJECT_IN_USE),
262                 errmsg("source database \"%s\" is being accessed by other users",
263                            dbtemplate)));
264
265         /* If encoding is defaulted, use source's encoding */
266         if (encoding < 0)
267                 encoding = src_encoding;
268
269         /* Some encodings are client only */
270         if (!PG_VALID_BE_ENCODING(encoding))
271                 ereport(ERROR,
272                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
273                                  errmsg("invalid server encoding %d", encoding)));
274
275         /* Resolve default tablespace for new database */
276         if (dtablespacename && dtablespacename->arg)
277         {
278                 char       *tablespacename;
279                 AclResult       aclresult;
280
281                 tablespacename = strVal(dtablespacename->arg);
282                 dst_deftablespace = get_tablespace_oid(tablespacename);
283                 if (!OidIsValid(dst_deftablespace))
284                         ereport(ERROR,
285                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
286                                          errmsg("tablespace \"%s\" does not exist",
287                                                         tablespacename)));
288                 /* check permissions */
289                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
290                                                                                    ACL_CREATE);
291                 if (aclresult != ACLCHECK_OK)
292                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
293                                                    tablespacename);
294
295                 /*
296                  * If we are trying to change the default tablespace of the template,
297                  * we require that the template not have any files in the new default
298                  * tablespace.  This is necessary because otherwise the copied
299                  * database would contain pg_class rows that refer to its default
300                  * tablespace both explicitly (by OID) and implicitly (as zero), which
301                  * would cause problems.  For example another CREATE DATABASE using
302                  * the copied database as template, and trying to change its default
303                  * tablespace again, would yield outright incorrect results (it would
304                  * improperly move tables to the new default tablespace that should
305                  * stay in the same tablespace).
306                  */
307                 if (dst_deftablespace != src_deftablespace)
308                 {
309                         char       *srcpath;
310                         struct stat st;
311
312                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
313
314                         if (stat(srcpath, &st) == 0 &&
315                                 S_ISDIR(st.st_mode) &&
316                                 !directory_is_empty(srcpath))
317                                 ereport(ERROR,
318                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
319                                                  errmsg("cannot assign new default tablespace \"%s\"",
320                                                                 tablespacename),
321                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
322                                                                    dbtemplate)));
323                         pfree(srcpath);
324                 }
325         }
326         else
327         {
328                 /* Use template database's default tablespace */
329                 dst_deftablespace = src_deftablespace;
330                 /* Note there is no additional permission check in this path */
331         }
332
333         /*
334          * Normally we mark the new database with the same datvacuumxid and
335          * datfrozenxid as the source.  However, if the source is not allowing
336          * connections then we assume it is fully frozen, and we can set the
337          * current transaction ID as the xid limits.  This avoids immediately
338          * starting to generate warnings after cloning template0.
339          */
340         if (!src_allowconn)
341                 src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
342
343         /*
344          * Preassign OID for pg_database tuple, so that we can compute db
345          * path.
346          */
347         dboid = newoid();
348
349         /*
350          * Force dirty buffers out to disk, to ensure source database is
351          * up-to-date for the copy.  (We really only need to flush buffers for
352          * the source database, but bufmgr.c provides no API for that.)
353          */
354         BufferSync();
355
356         /*
357          * Close virtual file descriptors so the kernel has more available for
358          * the system() calls below.
359          */
360         closeAllVfds();
361
362         /*
363          * Iterate through all tablespaces of the template database, and copy
364          * each one to the new database.
365          */
366         rel = heap_open(TableSpaceRelationId, AccessShareLock);
367         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
368         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
369         {
370                 Oid                     srctablespace = HeapTupleGetOid(tuple);
371                 Oid                     dsttablespace;
372                 char       *srcpath;
373                 char       *dstpath;
374                 struct stat st;
375
376                 /* No need to copy global tablespace */
377                 if (srctablespace == GLOBALTABLESPACE_OID)
378                         continue;
379
380                 srcpath = GetDatabasePath(src_dboid, srctablespace);
381
382                 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
383                         directory_is_empty(srcpath))
384                 {
385                         /* Assume we can ignore it */
386                         pfree(srcpath);
387                         continue;
388                 }
389
390                 if (srctablespace == src_deftablespace)
391                         dsttablespace = dst_deftablespace;
392                 else
393                         dsttablespace = srctablespace;
394
395                 dstpath = GetDatabasePath(dboid, dsttablespace);
396
397                 if (stat(dstpath, &st) == 0 || errno != ENOENT)
398                 {
399                         remove_dbtablespaces(dboid);
400                         ereport(ERROR,
401                                         (errmsg("could not initialize database directory"),
402                                          errdetail("Directory \"%s\" already exists.",
403                                                            dstpath)));
404                 }
405
406 #ifndef WIN32
407
408                 /*
409                  * Copy this subdirectory to the new location
410                  *
411                  * XXX use of cp really makes this code pretty grotty, particularly
412                  * with respect to lack of ability to report errors well.  Someday
413                  * rewrite to do it for ourselves.
414                  */
415
416                 /* We might need to use cp -R one day for portability */
417                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
418                                  srcpath, dstpath);
419                 if (system(buf) != 0)
420                 {
421                         remove_dbtablespaces(dboid);
422                         ereport(ERROR,
423                                         (errmsg("could not initialize database directory"),
424                                          errdetail("Failing system command was: %s", buf),
425                                          errhint("Look in the postmaster's stderr log for more information.")));
426                 }
427 #else                                                   /* WIN32 */
428                 if (copydir(srcpath, dstpath) != 0)
429                 {
430                         /* copydir should already have given details of its troubles */
431                         remove_dbtablespaces(dboid);
432                         ereport(ERROR,
433                                         (errmsg("could not initialize database directory")));
434                 }
435 #endif   /* WIN32 */
436
437                 /* Record the filesystem change in XLOG */
438                 {
439                         xl_dbase_create_rec xlrec;
440                         XLogRecData rdata[1];
441
442                         xlrec.db_id = dboid;
443                         xlrec.tablespace_id = dsttablespace;
444                         xlrec.src_db_id = src_dboid;
445                         xlrec.src_tablespace_id = srctablespace;
446
447                         rdata[0].buffer = InvalidBuffer;
448                         rdata[0].data = (char *) &xlrec;
449                         rdata[0].len = sizeof(xl_dbase_create_rec);
450                         rdata[0].next = NULL;
451
452                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
453                 }
454         }
455         heap_endscan(scan);
456         heap_close(rel, AccessShareLock);
457
458         /*
459          * Now OK to grab exclusive lock on pg_database.
460          */
461         pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock);
462
463         /* Check to see if someone else created same DB name meanwhile. */
464         if (get_db_info(dbname, NULL, NULL, NULL,
465                                         NULL, NULL, NULL, NULL, NULL, NULL))
466         {
467                 /* Don't hold lock while doing recursive remove */
468                 heap_close(pg_database_rel, ExclusiveLock);
469                 remove_dbtablespaces(dboid);
470                 ereport(ERROR,
471                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
472                                  errmsg("database \"%s\" already exists", dbname)));
473         }
474
475         /*
476          * Insert a new tuple into pg_database
477          */
478         pg_database_dsc = RelationGetDescr(pg_database_rel);
479
480         /* Form tuple */
481         MemSet(new_record, 0, sizeof(new_record));
482         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
483
484         new_record[Anum_pg_database_datname - 1] =
485                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
486         new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
487         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
488         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
489         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
490         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
491         new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
492         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
493         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
494
495         /*
496          * We deliberately set datconfig and datacl to defaults (NULL), rather
497          * than copying them from the template database.  Copying datacl would
498          * be a bad idea when the owner is not the same as the template's
499          * owner. It's more debatable whether datconfig should be copied.
500          */
501         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
502         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
503
504         tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
505
506         HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
507                                                                                  * selection */
508
509         simple_heap_insert(pg_database_rel, tuple);
510
511         /* Update indexes */
512         CatalogUpdateIndexes(pg_database_rel, tuple);
513
514         /* Close pg_database, but keep exclusive lock till commit */
515         heap_close(pg_database_rel, NoLock);
516
517         /*
518          * Set flag to update flat database file at commit.
519          */
520         database_file_update_needed();
521 }
522
523
524 /*
525  * DROP DATABASE
526  */
527 void
528 dropdb(const char *dbname)
529 {
530         int4            db_owner;
531         bool            db_istemplate;
532         Oid                     db_id;
533         Relation        pgdbrel;
534         SysScanDesc pgdbscan;
535         ScanKeyData key;
536         HeapTuple       tup;
537
538         PreventTransactionChain((void *) dbname, "DROP DATABASE");
539
540         AssertArg(dbname);
541
542         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
543                 ereport(ERROR,
544                                 (errcode(ERRCODE_OBJECT_IN_USE),
545                                  errmsg("cannot drop the currently open database")));
546
547         /*
548          * Obtain exclusive lock on pg_database.  We need this to ensure that
549          * no new backend starts up in the target database while we are
550          * deleting it.  (Actually, a new backend might still manage to start
551          * up, because it isn't able to lock pg_database while starting.  But
552          * it will detect its error in ReverifyMyDatabase and shut down before
553          * any serious damage is done.  See postinit.c.)
554          *
555          * An ExclusiveLock, rather than AccessExclusiveLock, is sufficient
556          * since ReverifyMyDatabase takes RowShareLock.  This allows ordinary
557          * readers of pg_database to proceed in parallel.
558          */
559         pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
560
561         if (!get_db_info(dbname, &db_id, &db_owner, NULL,
562                                          &db_istemplate, NULL, NULL, NULL, NULL, NULL))
563                 ereport(ERROR,
564                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
565                                  errmsg("database \"%s\" does not exist", dbname)));
566
567         if (GetUserId() != db_owner && !superuser())
568                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
569                                            dbname);
570
571         /*
572          * Disallow dropping a DB that is marked istemplate.  This is just to
573          * prevent people from accidentally dropping template0 or template1;
574          * they can do so if they're really determined ...
575          */
576         if (db_istemplate)
577                 ereport(ERROR,
578                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
579                                  errmsg("cannot drop a template database")));
580
581         /*
582          * Check for active backends in the target database.
583          */
584         if (DatabaseHasActiveBackends(db_id, false))
585                 ereport(ERROR,
586                                 (errcode(ERRCODE_OBJECT_IN_USE),
587                            errmsg("database \"%s\" is being accessed by other users",
588                                           dbname)));
589
590         /*
591          * Find the database's tuple by OID (should be unique).
592          */
593         ScanKeyInit(&key,
594                                 ObjectIdAttributeNumber,
595                                 BTEqualStrategyNumber, F_OIDEQ,
596                                 ObjectIdGetDatum(db_id));
597
598         pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndexId, true,
599                                                                   SnapshotNow, 1, &key);
600
601         tup = systable_getnext(pgdbscan);
602         if (!HeapTupleIsValid(tup))
603         {
604                 /*
605                  * This error should never come up since the existence of the
606                  * database is checked earlier
607                  */
608                 elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
609                          dbname);
610         }
611
612         /* Remove the database's tuple from pg_database */
613         simple_heap_delete(pgdbrel, &tup->t_self);
614
615         systable_endscan(pgdbscan);
616
617         /*
618          * Delete any comments associated with the database
619          *
620          * NOTE: this is probably dead code since any such comments should have
621          * been in that database, not mine.
622          */
623         DeleteComments(db_id, DatabaseRelationId, 0);
624
625         /*
626          * Drop pages for this database that are in the shared buffer cache.
627          * This is important to ensure that no remaining backend tries to
628          * write out a dirty buffer to the dead database later...
629          */
630         DropBuffers(db_id);
631
632         /*
633          * Also, clean out any entries in the shared free space map.
634          */
635         FreeSpaceMapForgetDatabase(db_id);
636
637         /*
638          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
639          * open files, which would cause rmdir() to fail.
640          */
641 #ifdef WIN32
642         RequestCheckpoint(true);
643 #endif
644
645         /*
646          * Remove all tablespace subdirs belonging to the database.
647          */
648         remove_dbtablespaces(db_id);
649
650         /* Close pg_database, but keep exclusive lock till commit */
651         heap_close(pgdbrel, NoLock);
652
653         /*
654          * Set flag to update flat database file at commit.
655          */
656         database_file_update_needed();
657 }
658
659
660 /*
661  * Rename database
662  */
663 void
664 RenameDatabase(const char *oldname, const char *newname)
665 {
666         HeapTuple       tup,
667                                 newtup;
668         Relation        rel;
669         SysScanDesc scan,
670                                 scan2;
671         ScanKeyData key,
672                                 key2;
673
674         /*
675          * Obtain ExclusiveLock so that no new session gets started
676          * while the rename is in progress.
677          */
678         rel = heap_open(DatabaseRelationId, ExclusiveLock);
679
680         ScanKeyInit(&key,
681                                 Anum_pg_database_datname,
682                                 BTEqualStrategyNumber, F_NAMEEQ,
683                                 NameGetDatum(oldname));
684         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
685                                                           SnapshotNow, 1, &key);
686
687         tup = systable_getnext(scan);
688         if (!HeapTupleIsValid(tup))
689                 ereport(ERROR,
690                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
691                                  errmsg("database \"%s\" does not exist", oldname)));
692
693         /*
694          * XXX Client applications probably store the current database
695          * somewhere, so renaming it could cause confusion.  On the other
696          * hand, there may not be an actual problem besides a little
697          * confusion, so think about this and decide.
698          */
699         if (HeapTupleGetOid(tup) == MyDatabaseId)
700                 ereport(ERROR,
701                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
702                                  errmsg("current database may not be renamed")));
703
704         /*
705          * Make sure the database does not have active sessions.  Might not be
706          * necessary, but it's consistent with other database operations.
707          */
708         if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
709                 ereport(ERROR,
710                                 (errcode(ERRCODE_OBJECT_IN_USE),
711                            errmsg("database \"%s\" is being accessed by other users",
712                                           oldname)));
713
714         /* make sure the new name doesn't exist */
715         ScanKeyInit(&key2,
716                                 Anum_pg_database_datname,
717                                 BTEqualStrategyNumber, F_NAMEEQ,
718                                 NameGetDatum(newname));
719         scan2 = systable_beginscan(rel, DatabaseNameIndexId, true,
720                                                            SnapshotNow, 1, &key2);
721         if (HeapTupleIsValid(systable_getnext(scan2)))
722                 ereport(ERROR,
723                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
724                                  errmsg("database \"%s\" already exists", newname)));
725         systable_endscan(scan2);
726
727         /* must be owner */
728         if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
729                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
730                                            oldname);
731
732         /* must have createdb rights */
733         if (!superuser() && !have_createdb_privilege())
734                 ereport(ERROR,
735                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
736                                  errmsg("permission denied to rename database")));
737
738         /* rename */
739         newtup = heap_copytuple(tup);
740         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
741         simple_heap_update(rel, &newtup->t_self, newtup);
742         CatalogUpdateIndexes(rel, newtup);
743
744         systable_endscan(scan);
745
746         /* Close pg_database, but keep exclusive lock till commit */
747         heap_close(rel, NoLock);
748
749         /*
750          * Set flag to update flat database file at commit.
751          */
752         database_file_update_needed();
753 }
754
755
756 /*
757  * ALTER DATABASE name SET ...
758  */
759 void
760 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
761 {
762         char       *valuestr;
763         HeapTuple       tuple,
764                                 newtuple;
765         Relation        rel;
766         ScanKeyData scankey;
767         SysScanDesc scan;
768         Datum           repl_val[Natts_pg_database];
769         char            repl_null[Natts_pg_database];
770         char            repl_repl[Natts_pg_database];
771
772         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
773
774         /*
775          * We don't need ExclusiveLock since we aren't updating the
776          * flat file.
777          */
778         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
779         ScanKeyInit(&scankey,
780                                 Anum_pg_database_datname,
781                                 BTEqualStrategyNumber, F_NAMEEQ,
782                                 NameGetDatum(stmt->dbname));
783         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
784                                                           SnapshotNow, 1, &scankey);
785         tuple = systable_getnext(scan);
786         if (!HeapTupleIsValid(tuple))
787                 ereport(ERROR,
788                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
789                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
790
791         if (!(superuser()
792                 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
793                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
794                                            stmt->dbname);
795
796         MemSet(repl_repl, ' ', sizeof(repl_repl));
797         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
798
799         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
800         {
801                 /* RESET ALL */
802                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
803                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
804         }
805         else
806         {
807                 Datum           datum;
808                 bool            isnull;
809                 ArrayType  *a;
810
811                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
812
813                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
814                                                          RelationGetDescr(rel), &isnull);
815
816                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
817
818                 if (valuestr)
819                         a = GUCArrayAdd(a, stmt->variable, valuestr);
820                 else
821                         a = GUCArrayDelete(a, stmt->variable);
822
823                 if (a)
824                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
825                 else
826                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
827         }
828
829         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
830         simple_heap_update(rel, &tuple->t_self, newtuple);
831
832         /* Update indexes */
833         CatalogUpdateIndexes(rel, newtuple);
834
835         systable_endscan(scan);
836
837         /* Close pg_database, but keep lock till commit */
838         heap_close(rel, NoLock);
839
840         /*
841          * We don't bother updating the flat file since ALTER DATABASE SET
842          * doesn't affect it.
843          */
844 }
845
846
847 /*
848  * ALTER DATABASE name OWNER TO newowner
849  */
850 void
851 AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
852 {
853         HeapTuple       tuple;
854         Relation        rel;
855         ScanKeyData scankey;
856         SysScanDesc scan;
857         Form_pg_database datForm;
858
859         /*
860          * We don't need ExclusiveLock since we aren't updating the
861          * flat file.
862          */
863         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
864         ScanKeyInit(&scankey,
865                                 Anum_pg_database_datname,
866                                 BTEqualStrategyNumber, F_NAMEEQ,
867                                 NameGetDatum(dbname));
868         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
869                                                           SnapshotNow, 1, &scankey);
870         tuple = systable_getnext(scan);
871         if (!HeapTupleIsValid(tuple))
872                 ereport(ERROR,
873                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
874                                  errmsg("database \"%s\" does not exist", dbname)));
875
876         datForm = (Form_pg_database) GETSTRUCT(tuple);
877
878         /*
879          * If the new owner is the same as the existing owner, consider the
880          * command to have succeeded.  This is to be consistent with other
881          * objects.
882          */
883         if (datForm->datdba != newOwnerSysId)
884         {
885                 Datum           repl_val[Natts_pg_database];
886                 char            repl_null[Natts_pg_database];
887                 char            repl_repl[Natts_pg_database];
888                 Acl                *newAcl;
889                 Datum           aclDatum;
890                 bool            isNull;
891                 HeapTuple       newtuple;
892
893                 /* must be superuser to change ownership */
894                 if (!superuser())
895                         ereport(ERROR,
896                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
897                                          errmsg("must be superuser to change owner")));
898
899                 memset(repl_null, ' ', sizeof(repl_null));
900                 memset(repl_repl, ' ', sizeof(repl_repl));
901
902                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
903                 repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
904
905                 /*
906                  * Determine the modified ACL for the new owner.  This is only
907                  * necessary when the ACL is non-null.
908                  */
909                 aclDatum = heap_getattr(tuple,
910                                                                 Anum_pg_database_datacl,
911                                                                 RelationGetDescr(rel),
912                                                                 &isNull);
913                 if (!isNull)
914                 {
915                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
916                                                                  datForm->datdba, newOwnerSysId);
917                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
918                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
919                 }
920
921                 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
922                 simple_heap_update(rel, &newtuple->t_self, newtuple);
923                 CatalogUpdateIndexes(rel, newtuple);
924
925                 heap_freetuple(newtuple);
926         }
927
928         systable_endscan(scan);
929
930         /* Close pg_database, but keep lock till commit */
931         heap_close(rel, NoLock);
932
933         /*
934          * We don't bother updating the flat file since ALTER DATABASE OWNER
935          * doesn't affect it.
936          */
937 }
938
939
940 /*
941  * Helper functions
942  */
943
944 static bool
945 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
946                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
947                         Oid *dbLastSysOidP,
948                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
949                         Oid *dbTablespace)
950 {
951         Relation        relation;
952         ScanKeyData scanKey;
953         SysScanDesc scan;
954         HeapTuple       tuple;
955         bool            gottuple;
956
957         AssertArg(name);
958
959         /* Caller may wish to grab a better lock on pg_database beforehand... */
960         relation = heap_open(DatabaseRelationId, AccessShareLock);
961
962         ScanKeyInit(&scanKey,
963                                 Anum_pg_database_datname,
964                                 BTEqualStrategyNumber, F_NAMEEQ,
965                                 NameGetDatum(name));
966
967         scan = systable_beginscan(relation, DatabaseNameIndexId, true,
968                                                           SnapshotNow, 1, &scanKey);
969
970         tuple = systable_getnext(scan);
971
972         gottuple = HeapTupleIsValid(tuple);
973         if (gottuple)
974         {
975                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
976
977                 /* oid of the database */
978                 if (dbIdP)
979                         *dbIdP = HeapTupleGetOid(tuple);
980                 /* sysid of the owner */
981                 if (ownerIdP)
982                         *ownerIdP = dbform->datdba;
983                 /* character encoding */
984                 if (encodingP)
985                         *encodingP = dbform->encoding;
986                 /* allowed as template? */
987                 if (dbIsTemplateP)
988                         *dbIsTemplateP = dbform->datistemplate;
989                 /* allowing connections? */
990                 if (dbAllowConnP)
991                         *dbAllowConnP = dbform->datallowconn;
992                 /* last system OID used in database */
993                 if (dbLastSysOidP)
994                         *dbLastSysOidP = dbform->datlastsysoid;
995                 /* limit of vacuumed XIDs */
996                 if (dbVacuumXidP)
997                         *dbVacuumXidP = dbform->datvacuumxid;
998                 /* limit of frozen XIDs */
999                 if (dbFrozenXidP)
1000                         *dbFrozenXidP = dbform->datfrozenxid;
1001                 /* default tablespace for this database */
1002                 if (dbTablespace)
1003                         *dbTablespace = dbform->dattablespace;
1004         }
1005
1006         systable_endscan(scan);
1007         heap_close(relation, AccessShareLock);
1008
1009         return gottuple;
1010 }
1011
1012 /* Check if current user has createdb privileges */
1013 static bool
1014 have_createdb_privilege(void)
1015 {
1016         bool            result = false;
1017         HeapTuple       utup;
1018
1019         utup = SearchSysCache(SHADOWSYSID,
1020                                                   Int32GetDatum(GetUserId()),
1021                                                   0, 0, 0);
1022         if (HeapTupleIsValid(utup))
1023         {
1024                 result = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
1025                 ReleaseSysCache(utup);
1026         }
1027         return result;
1028 }
1029
1030 /*
1031  * Remove tablespace directories
1032  *
1033  * We don't know what tablespaces db_id is using, so iterate through all
1034  * tablespaces removing <tablespace>/db_id
1035  */
1036 static void
1037 remove_dbtablespaces(Oid db_id)
1038 {
1039         Relation        rel;
1040         HeapScanDesc scan;
1041         HeapTuple       tuple;
1042
1043         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1044         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1045         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1046         {
1047                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1048                 char       *dstpath;
1049                 struct stat st;
1050
1051                 /* Don't mess with the global tablespace */
1052                 if (dsttablespace == GLOBALTABLESPACE_OID)
1053                         continue;
1054
1055                 dstpath = GetDatabasePath(db_id, dsttablespace);
1056
1057                 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1058                 {
1059                         /* Assume we can ignore it */
1060                         pfree(dstpath);
1061                         continue;
1062                 }
1063
1064                 if (!rmtree(dstpath, true))
1065                         ereport(WARNING,
1066                                         (errmsg("could not remove database directory \"%s\"",
1067                                                         dstpath)));
1068
1069                 /* Record the filesystem change in XLOG */
1070                 {
1071                         xl_dbase_drop_rec xlrec;
1072                         XLogRecData rdata[1];
1073
1074                         xlrec.db_id = db_id;
1075                         xlrec.tablespace_id = dsttablespace;
1076
1077                         rdata[0].buffer = InvalidBuffer;
1078                         rdata[0].data = (char *) &xlrec;
1079                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1080                         rdata[0].next = NULL;
1081
1082                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1083                 }
1084
1085                 pfree(dstpath);
1086         }
1087
1088         heap_endscan(scan);
1089         heap_close(rel, AccessShareLock);
1090 }
1091
1092
1093 /*
1094  * get_database_oid - given a database name, look up the OID
1095  *
1096  * Returns InvalidOid if database name not found.
1097  *
1098  * This is not actually used in this file, but is exported for use elsewhere.
1099  */
1100 Oid
1101 get_database_oid(const char *dbname)
1102 {
1103         Relation        pg_database;
1104         ScanKeyData entry[1];
1105         SysScanDesc scan;
1106         HeapTuple       dbtuple;
1107         Oid                     oid;
1108
1109         /* There's no syscache for pg_database, so must look the hard way */
1110         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1111         ScanKeyInit(&entry[0],
1112                                 Anum_pg_database_datname,
1113                                 BTEqualStrategyNumber, F_NAMEEQ,
1114                                 CStringGetDatum(dbname));
1115         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1116                                                           SnapshotNow, 1, entry);
1117
1118         dbtuple = systable_getnext(scan);
1119
1120         /* We assume that there can be at most one matching tuple */
1121         if (HeapTupleIsValid(dbtuple))
1122                 oid = HeapTupleGetOid(dbtuple);
1123         else
1124                 oid = InvalidOid;
1125
1126         systable_endscan(scan);
1127         heap_close(pg_database, AccessShareLock);
1128
1129         return oid;
1130 }
1131
1132
1133 /*
1134  * get_database_name - given a database OID, look up the name
1135  *
1136  * Returns a palloc'd string, or NULL if no such database.
1137  *
1138  * This is not actually used in this file, but is exported for use elsewhere.
1139  */
1140 char *
1141 get_database_name(Oid dbid)
1142 {
1143         Relation        pg_database;
1144         ScanKeyData entry[1];
1145         SysScanDesc scan;
1146         HeapTuple       dbtuple;
1147         char       *result;
1148
1149         /* There's no syscache for pg_database, so must look the hard way */
1150         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1151         ScanKeyInit(&entry[0],
1152                                 ObjectIdAttributeNumber,
1153                                 BTEqualStrategyNumber, F_OIDEQ,
1154                                 ObjectIdGetDatum(dbid));
1155         scan = systable_beginscan(pg_database, DatabaseOidIndexId, true,
1156                                                           SnapshotNow, 1, entry);
1157
1158         dbtuple = systable_getnext(scan);
1159
1160         /* We assume that there can be at most one matching tuple */
1161         if (HeapTupleIsValid(dbtuple))
1162                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1163         else
1164                 result = NULL;
1165
1166         systable_endscan(scan);
1167         heap_close(pg_database, AccessShareLock);
1168
1169         return result;
1170 }
1171
1172 /*
1173  * DATABASE resource manager's routines
1174  */
1175 void
1176 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1177 {
1178         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1179
1180         if (info == XLOG_DBASE_CREATE)
1181         {
1182                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1183                 char       *src_path;
1184                 char       *dst_path;
1185                 struct stat st;
1186
1187 #ifndef WIN32
1188                 char            buf[2 * MAXPGPATH + 100];
1189 #endif
1190
1191                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1192                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1193
1194                 /*
1195                  * Our theory for replaying a CREATE is to forcibly drop the
1196                  * target subdirectory if present, then re-copy the source data.
1197                  * This may be more work than needed, but it is simple to
1198                  * implement.
1199                  */
1200                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1201                 {
1202                         if (!rmtree(dst_path, true))
1203                                 ereport(WARNING,
1204                                         (errmsg("could not remove database directory \"%s\"",
1205                                                         dst_path)));
1206                 }
1207
1208                 /*
1209                  * Force dirty buffers out to disk, to ensure source database is
1210                  * up-to-date for the copy.  (We really only need to flush buffers for
1211                  * the source database, but bufmgr.c provides no API for that.)
1212                  */
1213                 BufferSync();
1214
1215 #ifndef WIN32
1216
1217                 /*
1218                  * Copy this subdirectory to the new location
1219                  *
1220                  * XXX use of cp really makes this code pretty grotty, particularly
1221                  * with respect to lack of ability to report errors well.  Someday
1222                  * rewrite to do it for ourselves.
1223                  */
1224
1225                 /* We might need to use cp -R one day for portability */
1226                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
1227                                  src_path, dst_path);
1228                 if (system(buf) != 0)
1229                         ereport(ERROR,
1230                                         (errmsg("could not initialize database directory"),
1231                                          errdetail("Failing system command was: %s", buf),
1232                                          errhint("Look in the postmaster's stderr log for more information.")));
1233 #else                                                   /* WIN32 */
1234                 if (copydir(src_path, dst_path) != 0)
1235                 {
1236                         /* copydir should already have given details of its troubles */
1237                         ereport(ERROR,
1238                                         (errmsg("could not initialize database directory")));
1239                 }
1240 #endif   /* WIN32 */
1241         }
1242         else if (info == XLOG_DBASE_DROP)
1243         {
1244                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1245                 char       *dst_path;
1246
1247                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1248
1249                 /*
1250                  * Drop pages for this database that are in the shared buffer
1251                  * cache
1252                  */
1253                 DropBuffers(xlrec->db_id);
1254
1255                 if (!rmtree(dst_path, true))
1256                         ereport(WARNING,
1257                                         (errmsg("could not remove database directory \"%s\"",
1258                                                         dst_path)));
1259         }
1260         else if (info == XLOG_DBASE_CREATE_OLD)
1261         {
1262                 xl_dbase_create_rec_old *xlrec = (xl_dbase_create_rec_old *) XLogRecGetData(record);
1263                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1264                 struct stat st;
1265
1266 #ifndef WIN32
1267                 char            buf[2 * MAXPGPATH + 100];
1268 #endif
1269
1270                 /*
1271                  * Our theory for replaying a CREATE is to forcibly drop the
1272                  * target subdirectory if present, then re-copy the source data.
1273                  * This may be more work than needed, but it is simple to
1274                  * implement.
1275                  */
1276                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1277                 {
1278                         if (!rmtree(dst_path, true))
1279                                 ereport(WARNING,
1280                                         (errmsg("could not remove database directory \"%s\"",
1281                                                         dst_path)));
1282                 }
1283
1284                 /*
1285                  * Force dirty buffers out to disk, to ensure source database is
1286                  * up-to-date for the copy.  (We really only need to flush buffers for
1287                  * the source database, but bufmgr.c provides no API for that.)
1288                  */
1289                 BufferSync();
1290
1291 #ifndef WIN32
1292
1293                 /*
1294                  * Copy this subdirectory to the new location
1295                  *
1296                  * XXX use of cp really makes this code pretty grotty, particularly
1297                  * with respect to lack of ability to report errors well.  Someday
1298                  * rewrite to do it for ourselves.
1299                  */
1300
1301                 /* We might need to use cp -R one day for portability */
1302                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
1303                                  xlrec->src_path, dst_path);
1304                 if (system(buf) != 0)
1305                         ereport(ERROR,
1306                                         (errmsg("could not initialize database directory"),
1307                                          errdetail("Failing system command was: %s", buf),
1308                                          errhint("Look in the postmaster's stderr log for more information.")));
1309 #else                                                   /* WIN32 */
1310                 if (copydir(xlrec->src_path, dst_path) != 0)
1311                 {
1312                         /* copydir should already have given details of its troubles */
1313                         ereport(ERROR,
1314                                         (errmsg("could not initialize database directory")));
1315                 }
1316 #endif   /* WIN32 */
1317         }
1318         else if (info == XLOG_DBASE_DROP_OLD)
1319         {
1320                 xl_dbase_drop_rec_old *xlrec = (xl_dbase_drop_rec_old *) XLogRecGetData(record);
1321
1322                 /*
1323                  * Drop pages for this database that are in the shared buffer
1324                  * cache
1325                  */
1326                 DropBuffers(xlrec->db_id);
1327
1328                 if (!rmtree(xlrec->dir_path, true))
1329                         ereport(WARNING,
1330                                         (errmsg("could not remove database directory \"%s\"",
1331                                                         xlrec->dir_path)));
1332         }
1333         else
1334                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1335 }
1336
1337 void
1338 dbase_undo(XLogRecPtr lsn, XLogRecord *record)
1339 {
1340         elog(PANIC, "dbase_undo: unimplemented");
1341 }
1342
1343 void
1344 dbase_desc(char *buf, uint8 xl_info, char *rec)
1345 {
1346         uint8           info = xl_info & ~XLR_INFO_MASK;
1347
1348         if (info == XLOG_DBASE_CREATE)
1349         {
1350                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1351
1352                 sprintf(buf + strlen(buf), "create db: copy dir %u/%u to %u/%u",
1353                                 xlrec->src_db_id, xlrec->src_tablespace_id,
1354                                 xlrec->db_id, xlrec->tablespace_id);
1355         }
1356         else if (info == XLOG_DBASE_DROP)
1357         {
1358                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1359
1360                 sprintf(buf + strlen(buf), "drop db: dir %u/%u",
1361                                 xlrec->db_id, xlrec->tablespace_id);
1362         }
1363         else if (info == XLOG_DBASE_CREATE_OLD)
1364         {
1365                 xl_dbase_create_rec_old *xlrec = (xl_dbase_create_rec_old *) rec;
1366                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1367
1368                 sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"",
1369                                 xlrec->db_id, xlrec->src_path, dst_path);
1370         }
1371         else if (info == XLOG_DBASE_DROP_OLD)
1372         {
1373                 xl_dbase_drop_rec_old *xlrec = (xl_dbase_drop_rec_old *) rec;
1374
1375                 sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"",
1376                                 xlrec->db_id, xlrec->dir_path);
1377         }
1378         else
1379                 strcat(buf, "UNKNOWN");
1380 }