]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Force a checkpoint before committing a CREATE DATABASE command. This
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands take ExclusiveLock on
7  * pg_database to ensure that no two proceed in parallel.  We must use
8  * at least this level of locking to ensure that no two backends try to
9  * write the flat-file copy of pg_database at once.  We avoid using
10  * AccessExclusiveLock since there's no need to lock out ordinary readers
11  * of pg_database.
12  *
13  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
14  * Portions Copyright (c) 1994, Regents of the University of California
15  *
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.161 2005/06/25 22:47:29 tgl Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27
28 #include "access/genam.h"
29 #include "access/heapam.h"
30 #include "catalog/catalog.h"
31 #include "catalog/pg_database.h"
32 #include "catalog/pg_shadow.h"
33 #include "catalog/pg_tablespace.h"
34 #include "catalog/indexing.h"
35 #include "commands/comment.h"
36 #include "commands/dbcommands.h"
37 #include "commands/tablespace.h"
38 #include "mb/pg_wchar.h"
39 #include "miscadmin.h"
40 #include "postmaster/bgwriter.h"
41 #include "storage/fd.h"
42 #include "storage/freespace.h"
43 #include "storage/procarray.h"
44 #include "utils/acl.h"
45 #include "utils/array.h"
46 #include "utils/builtins.h"
47 #include "utils/flatfiles.h"
48 #include "utils/fmgroids.h"
49 #include "utils/guc.h"
50 #include "utils/lsyscache.h"
51 #include "utils/syscache.h"
52
53
54 /* non-export function prototypes */
55 static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
56                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
57                         Oid *dbLastSysOidP,
58                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
59                         Oid *dbTablespace);
60 static bool have_createdb_privilege(void);
61 static void remove_dbtablespaces(Oid db_id);
62
63
64 /*
65  * CREATE DATABASE
66  */
67 void
68 createdb(const CreatedbStmt *stmt)
69 {
70         HeapScanDesc scan;
71         Relation        rel;
72         Oid                     src_dboid;
73         AclId           src_owner;
74         int                     src_encoding;
75         bool            src_istemplate;
76         bool            src_allowconn;
77         Oid                     src_lastsysoid;
78         TransactionId src_vacuumxid;
79         TransactionId src_frozenxid;
80         Oid                     src_deftablespace;
81         Oid                     dst_deftablespace;
82         Relation        pg_database_rel;
83         HeapTuple       tuple;
84         TupleDesc       pg_database_dsc;
85         Datum           new_record[Natts_pg_database];
86         char            new_record_nulls[Natts_pg_database];
87         Oid                     dboid;
88         AclId           datdba;
89         ListCell   *option;
90         DefElem    *dtablespacename = NULL;
91         DefElem    *downer = NULL;
92         DefElem    *dtemplate = NULL;
93         DefElem    *dencoding = NULL;
94         char       *dbname = stmt->dbname;
95         char       *dbowner = NULL;
96         const char *dbtemplate = NULL;
97         int                     encoding = -1;
98
99 #ifndef WIN32
100         char            buf[2 * MAXPGPATH + 100];
101 #endif
102
103         /* don't call this in a transaction block */
104         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
105
106         /* Extract options from the statement node tree */
107         foreach(option, stmt->options)
108         {
109                 DefElem    *defel = (DefElem *) lfirst(option);
110
111                 if (strcmp(defel->defname, "tablespace") == 0)
112                 {
113                         if (dtablespacename)
114                                 ereport(ERROR,
115                                                 (errcode(ERRCODE_SYNTAX_ERROR),
116                                                  errmsg("conflicting or redundant options")));
117                         dtablespacename = defel;
118                 }
119                 else if (strcmp(defel->defname, "owner") == 0)
120                 {
121                         if (downer)
122                                 ereport(ERROR,
123                                                 (errcode(ERRCODE_SYNTAX_ERROR),
124                                                  errmsg("conflicting or redundant options")));
125                         downer = defel;
126                 }
127                 else if (strcmp(defel->defname, "template") == 0)
128                 {
129                         if (dtemplate)
130                                 ereport(ERROR,
131                                                 (errcode(ERRCODE_SYNTAX_ERROR),
132                                                  errmsg("conflicting or redundant options")));
133                         dtemplate = defel;
134                 }
135                 else if (strcmp(defel->defname, "encoding") == 0)
136                 {
137                         if (dencoding)
138                                 ereport(ERROR,
139                                                 (errcode(ERRCODE_SYNTAX_ERROR),
140                                                  errmsg("conflicting or redundant options")));
141                         dencoding = defel;
142                 }
143                 else if (strcmp(defel->defname, "location") == 0)
144                 {
145                         ereport(WARNING,
146                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
147                                          errmsg("LOCATION is not supported anymore"),
148                                          errhint("Consider using tablespaces instead.")));
149                 }
150                 else
151                         elog(ERROR, "option \"%s\" not recognized",
152                                  defel->defname);
153         }
154
155         if (downer && downer->arg)
156                 dbowner = strVal(downer->arg);
157         if (dtemplate && dtemplate->arg)
158                 dbtemplate = strVal(dtemplate->arg);
159         if (dencoding && dencoding->arg)
160         {
161                 const char *encoding_name;
162
163                 if (IsA(dencoding->arg, Integer))
164                 {
165                         encoding = intVal(dencoding->arg);
166                         encoding_name = pg_encoding_to_char(encoding);
167                         if (strcmp(encoding_name, "") == 0 ||
168                                 pg_valid_server_encoding(encoding_name) < 0)
169                                 ereport(ERROR,
170                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
171                                                  errmsg("%d is not a valid encoding code",
172                                                                 encoding)));
173                 }
174                 else if (IsA(dencoding->arg, String))
175                 {
176                         encoding_name = strVal(dencoding->arg);
177                         if (pg_valid_server_encoding(encoding_name) < 0)
178                                 ereport(ERROR,
179                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
180                                                  errmsg("%s is not a valid encoding name",
181                                                                 encoding_name)));
182                         encoding = pg_char_to_encoding(encoding_name);
183                 }
184                 else
185                         elog(ERROR, "unrecognized node type: %d",
186                                  nodeTag(dencoding->arg));
187         }
188
189         /* obtain sysid of proposed owner */
190         if (dbowner)
191                 datdba = get_usesysid(dbowner); /* will ereport if no such user */
192         else
193                 datdba = GetUserId();
194
195         if (datdba == GetUserId())
196         {
197                 /* creating database for self: can be superuser or createdb */
198                 if (!superuser() && !have_createdb_privilege())
199                         ereport(ERROR,
200                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
201                                          errmsg("permission denied to create database")));
202         }
203         else
204         {
205                 /* creating database for someone else: must be superuser */
206                 /* note that the someone else need not have any permissions */
207                 if (!superuser())
208                         ereport(ERROR,
209                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
210                                          errmsg("must be superuser to create database for another user")));
211         }
212
213         /*
214          * Check for db name conflict.  There is a race condition here, since
215          * another backend could create the same DB name before we commit.
216          * However, holding an exclusive lock on pg_database for the whole
217          * time we are copying the source database doesn't seem like a good
218          * idea, so accept possibility of race to create.  We will check again
219          * after we grab the exclusive lock.
220          */
221         if (get_db_info(dbname, NULL, NULL, NULL,
222                                         NULL, NULL, NULL, NULL, NULL, NULL))
223                 ereport(ERROR,
224                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
225                                  errmsg("database \"%s\" already exists", dbname)));
226
227         /*
228          * Lookup database (template) to be cloned.
229          */
230         if (!dbtemplate)
231                 dbtemplate = "template1";               /* Default template database name */
232
233         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
234                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
235                                          &src_vacuumxid, &src_frozenxid, &src_deftablespace))
236                 ereport(ERROR,
237                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
238                  errmsg("template database \"%s\" does not exist", dbtemplate)));
239
240         /*
241          * Permission check: to copy a DB that's not marked datistemplate, you
242          * must be superuser or the owner thereof.
243          */
244         if (!src_istemplate)
245         {
246                 if (!superuser() && GetUserId() != src_owner)
247                         ereport(ERROR,
248                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
249                                          errmsg("permission denied to copy database \"%s\"",
250                                                         dbtemplate)));
251         }
252
253         /*
254          * The source DB can't have any active backends, except this one
255          * (exception is to allow CREATE DB while connected to template1).
256          * Otherwise we might copy inconsistent data.  This check is not
257          * bulletproof, since someone might connect while we are copying...
258          */
259         if (DatabaseHasActiveBackends(src_dboid, true))
260                 ereport(ERROR,
261                                 (errcode(ERRCODE_OBJECT_IN_USE),
262                 errmsg("source database \"%s\" is being accessed by other users",
263                            dbtemplate)));
264
265         /* If encoding is defaulted, use source's encoding */
266         if (encoding < 0)
267                 encoding = src_encoding;
268
269         /* Some encodings are client only */
270         if (!PG_VALID_BE_ENCODING(encoding))
271                 ereport(ERROR,
272                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
273                                  errmsg("invalid server encoding %d", encoding)));
274
275         /* Resolve default tablespace for new database */
276         if (dtablespacename && dtablespacename->arg)
277         {
278                 char       *tablespacename;
279                 AclResult       aclresult;
280
281                 tablespacename = strVal(dtablespacename->arg);
282                 dst_deftablespace = get_tablespace_oid(tablespacename);
283                 if (!OidIsValid(dst_deftablespace))
284                         ereport(ERROR,
285                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
286                                          errmsg("tablespace \"%s\" does not exist",
287                                                         tablespacename)));
288                 /* check permissions */
289                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
290                                                                                    ACL_CREATE);
291                 if (aclresult != ACLCHECK_OK)
292                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
293                                                    tablespacename);
294
295                 /*
296                  * If we are trying to change the default tablespace of the template,
297                  * we require that the template not have any files in the new default
298                  * tablespace.  This is necessary because otherwise the copied
299                  * database would contain pg_class rows that refer to its default
300                  * tablespace both explicitly (by OID) and implicitly (as zero), which
301                  * would cause problems.  For example another CREATE DATABASE using
302                  * the copied database as template, and trying to change its default
303                  * tablespace again, would yield outright incorrect results (it would
304                  * improperly move tables to the new default tablespace that should
305                  * stay in the same tablespace).
306                  */
307                 if (dst_deftablespace != src_deftablespace)
308                 {
309                         char       *srcpath;
310                         struct stat st;
311
312                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
313
314                         if (stat(srcpath, &st) == 0 &&
315                                 S_ISDIR(st.st_mode) &&
316                                 !directory_is_empty(srcpath))
317                                 ereport(ERROR,
318                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
319                                                  errmsg("cannot assign new default tablespace \"%s\"",
320                                                                 tablespacename),
321                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
322                                                                    dbtemplate)));
323                         pfree(srcpath);
324                 }
325         }
326         else
327         {
328                 /* Use template database's default tablespace */
329                 dst_deftablespace = src_deftablespace;
330                 /* Note there is no additional permission check in this path */
331         }
332
333         /*
334          * Normally we mark the new database with the same datvacuumxid and
335          * datfrozenxid as the source.  However, if the source is not allowing
336          * connections then we assume it is fully frozen, and we can set the
337          * current transaction ID as the xid limits.  This avoids immediately
338          * starting to generate warnings after cloning template0.
339          */
340         if (!src_allowconn)
341                 src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
342
343         /*
344          * Preassign OID for pg_database tuple, so that we can compute db
345          * path.
346          */
347         dboid = newoid();
348
349         /*
350          * Force dirty buffers out to disk, to ensure source database is
351          * up-to-date for the copy.  (We really only need to flush buffers for
352          * the source database, but bufmgr.c provides no API for that.)
353          */
354         BufferSync();
355
356         /*
357          * Close virtual file descriptors so the kernel has more available for
358          * the system() calls below.
359          */
360         closeAllVfds();
361
362         /*
363          * Iterate through all tablespaces of the template database, and copy
364          * each one to the new database.
365          */
366         rel = heap_open(TableSpaceRelationId, AccessShareLock);
367         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
368         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
369         {
370                 Oid                     srctablespace = HeapTupleGetOid(tuple);
371                 Oid                     dsttablespace;
372                 char       *srcpath;
373                 char       *dstpath;
374                 struct stat st;
375
376                 /* No need to copy global tablespace */
377                 if (srctablespace == GLOBALTABLESPACE_OID)
378                         continue;
379
380                 srcpath = GetDatabasePath(src_dboid, srctablespace);
381
382                 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
383                         directory_is_empty(srcpath))
384                 {
385                         /* Assume we can ignore it */
386                         pfree(srcpath);
387                         continue;
388                 }
389
390                 if (srctablespace == src_deftablespace)
391                         dsttablespace = dst_deftablespace;
392                 else
393                         dsttablespace = srctablespace;
394
395                 dstpath = GetDatabasePath(dboid, dsttablespace);
396
397                 if (stat(dstpath, &st) == 0 || errno != ENOENT)
398                 {
399                         remove_dbtablespaces(dboid);
400                         ereport(ERROR,
401                                         (errmsg("could not initialize database directory"),
402                                          errdetail("Directory \"%s\" already exists.",
403                                                            dstpath)));
404                 }
405
406 #ifndef WIN32
407
408                 /*
409                  * Copy this subdirectory to the new location
410                  *
411                  * XXX use of cp really makes this code pretty grotty, particularly
412                  * with respect to lack of ability to report errors well.  Someday
413                  * rewrite to do it for ourselves.
414                  */
415
416                 /* We might need to use cp -R one day for portability */
417                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
418                                  srcpath, dstpath);
419                 if (system(buf) != 0)
420                 {
421                         remove_dbtablespaces(dboid);
422                         ereport(ERROR,
423                                         (errmsg("could not initialize database directory"),
424                                          errdetail("Failing system command was: %s", buf),
425                                          errhint("Look in the postmaster's stderr log for more information.")));
426                 }
427 #else                                                   /* WIN32 */
428                 if (copydir(srcpath, dstpath) != 0)
429                 {
430                         /* copydir should already have given details of its troubles */
431                         remove_dbtablespaces(dboid);
432                         ereport(ERROR,
433                                         (errmsg("could not initialize database directory")));
434                 }
435 #endif   /* WIN32 */
436
437                 /* Record the filesystem change in XLOG */
438                 {
439                         xl_dbase_create_rec xlrec;
440                         XLogRecData rdata[1];
441
442                         xlrec.db_id = dboid;
443                         xlrec.tablespace_id = dsttablespace;
444                         xlrec.src_db_id = src_dboid;
445                         xlrec.src_tablespace_id = srctablespace;
446
447                         rdata[0].data = (char *) &xlrec;
448                         rdata[0].len = sizeof(xl_dbase_create_rec);
449                         rdata[0].buffer = InvalidBuffer;
450                         rdata[0].next = NULL;
451
452                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
453                 }
454         }
455         heap_endscan(scan);
456         heap_close(rel, AccessShareLock);
457
458         /*
459          * Now OK to grab exclusive lock on pg_database.
460          */
461         pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock);
462
463         /* Check to see if someone else created same DB name meanwhile. */
464         if (get_db_info(dbname, NULL, NULL, NULL,
465                                         NULL, NULL, NULL, NULL, NULL, NULL))
466         {
467                 /* Don't hold lock while doing recursive remove */
468                 heap_close(pg_database_rel, ExclusiveLock);
469                 remove_dbtablespaces(dboid);
470                 ereport(ERROR,
471                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
472                                  errmsg("database \"%s\" already exists", dbname)));
473         }
474
475         /*
476          * Insert a new tuple into pg_database
477          */
478         pg_database_dsc = RelationGetDescr(pg_database_rel);
479
480         /* Form tuple */
481         MemSet(new_record, 0, sizeof(new_record));
482         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
483
484         new_record[Anum_pg_database_datname - 1] =
485                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
486         new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
487         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
488         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
489         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
490         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
491         new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
492         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
493         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
494
495         /*
496          * We deliberately set datconfig and datacl to defaults (NULL), rather
497          * than copying them from the template database.  Copying datacl would
498          * be a bad idea when the owner is not the same as the template's
499          * owner. It's more debatable whether datconfig should be copied.
500          */
501         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
502         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
503
504         tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
505
506         HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
507                                                                                  * selection */
508
509         simple_heap_insert(pg_database_rel, tuple);
510
511         /* Update indexes */
512         CatalogUpdateIndexes(pg_database_rel, tuple);
513
514         /* Close pg_database, but keep exclusive lock till commit */
515         heap_close(pg_database_rel, NoLock);
516
517         /*
518          * We force a checkpoint before committing.  This effectively means
519          * that committed XLOG_DBASE_CREATE operations will never need to be
520          * replayed (at least not in ordinary crash recovery; we still have
521          * to make the XLOG entry for the benefit of PITR operations).
522          * This avoids two nasty scenarios:
523          *
524          * #1: When PITR is off, we don't XLOG the contents of newly created
525          * indexes; therefore the drop-and-recreate-whole-directory behavior
526          * of DBASE_CREATE replay would lose such indexes.
527          *
528          * #2: Since we have to recopy the source database during DBASE_CREATE
529          * replay, we run the risk of copying changes in it that were committed
530          * after the original CREATE DATABASE command but before the system
531          * crash that led to the replay.  This is at least unexpected and at
532          * worst could lead to inconsistencies, eg duplicate table names.
533          *
534          * (Both of these were real bugs in releases 8.0 through 8.0.3.)
535          *
536          * In PITR replay, the first of these isn't an issue, and the second
537          * is only a risk if the CREATE DATABASE and subsequent template
538          * database change both occur while a base backup is being taken.
539          * There doesn't seem to be much we can do about that except document
540          * it as a limitation.
541          *
542          * Perhaps if we ever implement CREATE DATABASE in a less cheesy
543          * way, we can avoid this.
544          */
545         RequestCheckpoint(true);
546
547         /*
548          * Set flag to update flat database file at commit.
549          */
550         database_file_update_needed();
551 }
552
553
554 /*
555  * DROP DATABASE
556  */
557 void
558 dropdb(const char *dbname)
559 {
560         int4            db_owner;
561         bool            db_istemplate;
562         Oid                     db_id;
563         Relation        pgdbrel;
564         SysScanDesc pgdbscan;
565         ScanKeyData key;
566         HeapTuple       tup;
567
568         PreventTransactionChain((void *) dbname, "DROP DATABASE");
569
570         AssertArg(dbname);
571
572         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
573                 ereport(ERROR,
574                                 (errcode(ERRCODE_OBJECT_IN_USE),
575                                  errmsg("cannot drop the currently open database")));
576
577         /*
578          * Obtain exclusive lock on pg_database.  We need this to ensure that
579          * no new backend starts up in the target database while we are
580          * deleting it.  (Actually, a new backend might still manage to start
581          * up, because it isn't able to lock pg_database while starting.  But
582          * it will detect its error in ReverifyMyDatabase and shut down before
583          * any serious damage is done.  See postinit.c.)
584          *
585          * An ExclusiveLock, rather than AccessExclusiveLock, is sufficient
586          * since ReverifyMyDatabase takes RowShareLock.  This allows ordinary
587          * readers of pg_database to proceed in parallel.
588          */
589         pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
590
591         if (!get_db_info(dbname, &db_id, &db_owner, NULL,
592                                          &db_istemplate, NULL, NULL, NULL, NULL, NULL))
593                 ereport(ERROR,
594                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
595                                  errmsg("database \"%s\" does not exist", dbname)));
596
597         if (GetUserId() != db_owner && !superuser())
598                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
599                                            dbname);
600
601         /*
602          * Disallow dropping a DB that is marked istemplate.  This is just to
603          * prevent people from accidentally dropping template0 or template1;
604          * they can do so if they're really determined ...
605          */
606         if (db_istemplate)
607                 ereport(ERROR,
608                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
609                                  errmsg("cannot drop a template database")));
610
611         /*
612          * Check for active backends in the target database.
613          */
614         if (DatabaseHasActiveBackends(db_id, false))
615                 ereport(ERROR,
616                                 (errcode(ERRCODE_OBJECT_IN_USE),
617                            errmsg("database \"%s\" is being accessed by other users",
618                                           dbname)));
619
620         /*
621          * Find the database's tuple by OID (should be unique).
622          */
623         ScanKeyInit(&key,
624                                 ObjectIdAttributeNumber,
625                                 BTEqualStrategyNumber, F_OIDEQ,
626                                 ObjectIdGetDatum(db_id));
627
628         pgdbscan = systable_beginscan(pgdbrel, DatabaseOidIndexId, true,
629                                                                   SnapshotNow, 1, &key);
630
631         tup = systable_getnext(pgdbscan);
632         if (!HeapTupleIsValid(tup))
633         {
634                 /*
635                  * This error should never come up since the existence of the
636                  * database is checked earlier
637                  */
638                 elog(ERROR, "database \"%s\" doesn't exist despite earlier reports to the contrary",
639                          dbname);
640         }
641
642         /* Remove the database's tuple from pg_database */
643         simple_heap_delete(pgdbrel, &tup->t_self);
644
645         systable_endscan(pgdbscan);
646
647         /*
648          * Delete any comments associated with the database
649          *
650          * NOTE: this is probably dead code since any such comments should have
651          * been in that database, not mine.
652          */
653         DeleteComments(db_id, DatabaseRelationId, 0);
654
655         /*
656          * Drop pages for this database that are in the shared buffer cache.
657          * This is important to ensure that no remaining backend tries to
658          * write out a dirty buffer to the dead database later...
659          */
660         DropBuffers(db_id);
661
662         /*
663          * Also, clean out any entries in the shared free space map.
664          */
665         FreeSpaceMapForgetDatabase(db_id);
666
667         /*
668          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
669          * open files, which would cause rmdir() to fail.
670          */
671 #ifdef WIN32
672         RequestCheckpoint(true);
673 #endif
674
675         /*
676          * Remove all tablespace subdirs belonging to the database.
677          */
678         remove_dbtablespaces(db_id);
679
680         /* Close pg_database, but keep exclusive lock till commit */
681         heap_close(pgdbrel, NoLock);
682
683         /*
684          * Set flag to update flat database file at commit.
685          */
686         database_file_update_needed();
687 }
688
689
690 /*
691  * Rename database
692  */
693 void
694 RenameDatabase(const char *oldname, const char *newname)
695 {
696         HeapTuple       tup,
697                                 newtup;
698         Relation        rel;
699         SysScanDesc scan,
700                                 scan2;
701         ScanKeyData key,
702                                 key2;
703
704         /*
705          * Obtain ExclusiveLock so that no new session gets started
706          * while the rename is in progress.
707          */
708         rel = heap_open(DatabaseRelationId, ExclusiveLock);
709
710         ScanKeyInit(&key,
711                                 Anum_pg_database_datname,
712                                 BTEqualStrategyNumber, F_NAMEEQ,
713                                 NameGetDatum(oldname));
714         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
715                                                           SnapshotNow, 1, &key);
716
717         tup = systable_getnext(scan);
718         if (!HeapTupleIsValid(tup))
719                 ereport(ERROR,
720                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
721                                  errmsg("database \"%s\" does not exist", oldname)));
722
723         /*
724          * XXX Client applications probably store the current database
725          * somewhere, so renaming it could cause confusion.  On the other
726          * hand, there may not be an actual problem besides a little
727          * confusion, so think about this and decide.
728          */
729         if (HeapTupleGetOid(tup) == MyDatabaseId)
730                 ereport(ERROR,
731                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
732                                  errmsg("current database may not be renamed")));
733
734         /*
735          * Make sure the database does not have active sessions.  Might not be
736          * necessary, but it's consistent with other database operations.
737          */
738         if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
739                 ereport(ERROR,
740                                 (errcode(ERRCODE_OBJECT_IN_USE),
741                            errmsg("database \"%s\" is being accessed by other users",
742                                           oldname)));
743
744         /* make sure the new name doesn't exist */
745         ScanKeyInit(&key2,
746                                 Anum_pg_database_datname,
747                                 BTEqualStrategyNumber, F_NAMEEQ,
748                                 NameGetDatum(newname));
749         scan2 = systable_beginscan(rel, DatabaseNameIndexId, true,
750                                                            SnapshotNow, 1, &key2);
751         if (HeapTupleIsValid(systable_getnext(scan2)))
752                 ereport(ERROR,
753                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
754                                  errmsg("database \"%s\" already exists", newname)));
755         systable_endscan(scan2);
756
757         /* must be owner */
758         if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
759                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
760                                            oldname);
761
762         /* must have createdb rights */
763         if (!superuser() && !have_createdb_privilege())
764                 ereport(ERROR,
765                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
766                                  errmsg("permission denied to rename database")));
767
768         /* rename */
769         newtup = heap_copytuple(tup);
770         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
771         simple_heap_update(rel, &newtup->t_self, newtup);
772         CatalogUpdateIndexes(rel, newtup);
773
774         systable_endscan(scan);
775
776         /* Close pg_database, but keep exclusive lock till commit */
777         heap_close(rel, NoLock);
778
779         /*
780          * Set flag to update flat database file at commit.
781          */
782         database_file_update_needed();
783 }
784
785
786 /*
787  * ALTER DATABASE name SET ...
788  */
789 void
790 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
791 {
792         char       *valuestr;
793         HeapTuple       tuple,
794                                 newtuple;
795         Relation        rel;
796         ScanKeyData scankey;
797         SysScanDesc scan;
798         Datum           repl_val[Natts_pg_database];
799         char            repl_null[Natts_pg_database];
800         char            repl_repl[Natts_pg_database];
801
802         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
803
804         /*
805          * We don't need ExclusiveLock since we aren't updating the
806          * flat file.
807          */
808         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
809         ScanKeyInit(&scankey,
810                                 Anum_pg_database_datname,
811                                 BTEqualStrategyNumber, F_NAMEEQ,
812                                 NameGetDatum(stmt->dbname));
813         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
814                                                           SnapshotNow, 1, &scankey);
815         tuple = systable_getnext(scan);
816         if (!HeapTupleIsValid(tuple))
817                 ereport(ERROR,
818                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
819                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
820
821         if (!(superuser()
822                 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
823                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
824                                            stmt->dbname);
825
826         MemSet(repl_repl, ' ', sizeof(repl_repl));
827         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
828
829         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
830         {
831                 /* RESET ALL */
832                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
833                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
834         }
835         else
836         {
837                 Datum           datum;
838                 bool            isnull;
839                 ArrayType  *a;
840
841                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
842
843                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
844                                                          RelationGetDescr(rel), &isnull);
845
846                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
847
848                 if (valuestr)
849                         a = GUCArrayAdd(a, stmt->variable, valuestr);
850                 else
851                         a = GUCArrayDelete(a, stmt->variable);
852
853                 if (a)
854                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
855                 else
856                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
857         }
858
859         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
860         simple_heap_update(rel, &tuple->t_self, newtuple);
861
862         /* Update indexes */
863         CatalogUpdateIndexes(rel, newtuple);
864
865         systable_endscan(scan);
866
867         /* Close pg_database, but keep lock till commit */
868         heap_close(rel, NoLock);
869
870         /*
871          * We don't bother updating the flat file since ALTER DATABASE SET
872          * doesn't affect it.
873          */
874 }
875
876
877 /*
878  * ALTER DATABASE name OWNER TO newowner
879  */
880 void
881 AlterDatabaseOwner(const char *dbname, AclId newOwnerSysId)
882 {
883         HeapTuple       tuple;
884         Relation        rel;
885         ScanKeyData scankey;
886         SysScanDesc scan;
887         Form_pg_database datForm;
888
889         /*
890          * We don't need ExclusiveLock since we aren't updating the
891          * flat file.
892          */
893         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
894         ScanKeyInit(&scankey,
895                                 Anum_pg_database_datname,
896                                 BTEqualStrategyNumber, F_NAMEEQ,
897                                 NameGetDatum(dbname));
898         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
899                                                           SnapshotNow, 1, &scankey);
900         tuple = systable_getnext(scan);
901         if (!HeapTupleIsValid(tuple))
902                 ereport(ERROR,
903                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
904                                  errmsg("database \"%s\" does not exist", dbname)));
905
906         datForm = (Form_pg_database) GETSTRUCT(tuple);
907
908         /*
909          * If the new owner is the same as the existing owner, consider the
910          * command to have succeeded.  This is to be consistent with other
911          * objects.
912          */
913         if (datForm->datdba != newOwnerSysId)
914         {
915                 Datum           repl_val[Natts_pg_database];
916                 char            repl_null[Natts_pg_database];
917                 char            repl_repl[Natts_pg_database];
918                 Acl                *newAcl;
919                 Datum           aclDatum;
920                 bool            isNull;
921                 HeapTuple       newtuple;
922
923                 /* must be superuser to change ownership */
924                 if (!superuser())
925                         ereport(ERROR,
926                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
927                                          errmsg("must be superuser to change owner")));
928
929                 memset(repl_null, ' ', sizeof(repl_null));
930                 memset(repl_repl, ' ', sizeof(repl_repl));
931
932                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
933                 repl_val[Anum_pg_database_datdba - 1] = Int32GetDatum(newOwnerSysId);
934
935                 /*
936                  * Determine the modified ACL for the new owner.  This is only
937                  * necessary when the ACL is non-null.
938                  */
939                 aclDatum = heap_getattr(tuple,
940                                                                 Anum_pg_database_datacl,
941                                                                 RelationGetDescr(rel),
942                                                                 &isNull);
943                 if (!isNull)
944                 {
945                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
946                                                                  datForm->datdba, newOwnerSysId);
947                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
948                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
949                 }
950
951                 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
952                 simple_heap_update(rel, &newtuple->t_self, newtuple);
953                 CatalogUpdateIndexes(rel, newtuple);
954
955                 heap_freetuple(newtuple);
956         }
957
958         systable_endscan(scan);
959
960         /* Close pg_database, but keep lock till commit */
961         heap_close(rel, NoLock);
962
963         /*
964          * We don't bother updating the flat file since ALTER DATABASE OWNER
965          * doesn't affect it.
966          */
967 }
968
969
970 /*
971  * Helper functions
972  */
973
974 static bool
975 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
976                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
977                         Oid *dbLastSysOidP,
978                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
979                         Oid *dbTablespace)
980 {
981         Relation        relation;
982         ScanKeyData scanKey;
983         SysScanDesc scan;
984         HeapTuple       tuple;
985         bool            gottuple;
986
987         AssertArg(name);
988
989         /* Caller may wish to grab a better lock on pg_database beforehand... */
990         relation = heap_open(DatabaseRelationId, AccessShareLock);
991
992         ScanKeyInit(&scanKey,
993                                 Anum_pg_database_datname,
994                                 BTEqualStrategyNumber, F_NAMEEQ,
995                                 NameGetDatum(name));
996
997         scan = systable_beginscan(relation, DatabaseNameIndexId, true,
998                                                           SnapshotNow, 1, &scanKey);
999
1000         tuple = systable_getnext(scan);
1001
1002         gottuple = HeapTupleIsValid(tuple);
1003         if (gottuple)
1004         {
1005                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1006
1007                 /* oid of the database */
1008                 if (dbIdP)
1009                         *dbIdP = HeapTupleGetOid(tuple);
1010                 /* sysid of the owner */
1011                 if (ownerIdP)
1012                         *ownerIdP = dbform->datdba;
1013                 /* character encoding */
1014                 if (encodingP)
1015                         *encodingP = dbform->encoding;
1016                 /* allowed as template? */
1017                 if (dbIsTemplateP)
1018                         *dbIsTemplateP = dbform->datistemplate;
1019                 /* allowing connections? */
1020                 if (dbAllowConnP)
1021                         *dbAllowConnP = dbform->datallowconn;
1022                 /* last system OID used in database */
1023                 if (dbLastSysOidP)
1024                         *dbLastSysOidP = dbform->datlastsysoid;
1025                 /* limit of vacuumed XIDs */
1026                 if (dbVacuumXidP)
1027                         *dbVacuumXidP = dbform->datvacuumxid;
1028                 /* limit of frozen XIDs */
1029                 if (dbFrozenXidP)
1030                         *dbFrozenXidP = dbform->datfrozenxid;
1031                 /* default tablespace for this database */
1032                 if (dbTablespace)
1033                         *dbTablespace = dbform->dattablespace;
1034         }
1035
1036         systable_endscan(scan);
1037         heap_close(relation, AccessShareLock);
1038
1039         return gottuple;
1040 }
1041
1042 /* Check if current user has createdb privileges */
1043 static bool
1044 have_createdb_privilege(void)
1045 {
1046         bool            result = false;
1047         HeapTuple       utup;
1048
1049         utup = SearchSysCache(SHADOWSYSID,
1050                                                   Int32GetDatum(GetUserId()),
1051                                                   0, 0, 0);
1052         if (HeapTupleIsValid(utup))
1053         {
1054                 result = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
1055                 ReleaseSysCache(utup);
1056         }
1057         return result;
1058 }
1059
1060 /*
1061  * Remove tablespace directories
1062  *
1063  * We don't know what tablespaces db_id is using, so iterate through all
1064  * tablespaces removing <tablespace>/db_id
1065  */
1066 static void
1067 remove_dbtablespaces(Oid db_id)
1068 {
1069         Relation        rel;
1070         HeapScanDesc scan;
1071         HeapTuple       tuple;
1072
1073         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1074         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1075         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1076         {
1077                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1078                 char       *dstpath;
1079                 struct stat st;
1080
1081                 /* Don't mess with the global tablespace */
1082                 if (dsttablespace == GLOBALTABLESPACE_OID)
1083                         continue;
1084
1085                 dstpath = GetDatabasePath(db_id, dsttablespace);
1086
1087                 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1088                 {
1089                         /* Assume we can ignore it */
1090                         pfree(dstpath);
1091                         continue;
1092                 }
1093
1094                 if (!rmtree(dstpath, true))
1095                         ereport(WARNING,
1096                                         (errmsg("could not remove database directory \"%s\"",
1097                                                         dstpath)));
1098
1099                 /* Record the filesystem change in XLOG */
1100                 {
1101                         xl_dbase_drop_rec xlrec;
1102                         XLogRecData rdata[1];
1103
1104                         xlrec.db_id = db_id;
1105                         xlrec.tablespace_id = dsttablespace;
1106
1107                         rdata[0].data = (char *) &xlrec;
1108                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1109                         rdata[0].buffer = InvalidBuffer;
1110                         rdata[0].next = NULL;
1111
1112                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1113                 }
1114
1115                 pfree(dstpath);
1116         }
1117
1118         heap_endscan(scan);
1119         heap_close(rel, AccessShareLock);
1120 }
1121
1122
1123 /*
1124  * get_database_oid - given a database name, look up the OID
1125  *
1126  * Returns InvalidOid if database name not found.
1127  *
1128  * This is not actually used in this file, but is exported for use elsewhere.
1129  */
1130 Oid
1131 get_database_oid(const char *dbname)
1132 {
1133         Relation        pg_database;
1134         ScanKeyData entry[1];
1135         SysScanDesc scan;
1136         HeapTuple       dbtuple;
1137         Oid                     oid;
1138
1139         /* There's no syscache for pg_database, so must look the hard way */
1140         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1141         ScanKeyInit(&entry[0],
1142                                 Anum_pg_database_datname,
1143                                 BTEqualStrategyNumber, F_NAMEEQ,
1144                                 CStringGetDatum(dbname));
1145         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1146                                                           SnapshotNow, 1, entry);
1147
1148         dbtuple = systable_getnext(scan);
1149
1150         /* We assume that there can be at most one matching tuple */
1151         if (HeapTupleIsValid(dbtuple))
1152                 oid = HeapTupleGetOid(dbtuple);
1153         else
1154                 oid = InvalidOid;
1155
1156         systable_endscan(scan);
1157         heap_close(pg_database, AccessShareLock);
1158
1159         return oid;
1160 }
1161
1162
1163 /*
1164  * get_database_name - given a database OID, look up the name
1165  *
1166  * Returns a palloc'd string, or NULL if no such database.
1167  *
1168  * This is not actually used in this file, but is exported for use elsewhere.
1169  */
1170 char *
1171 get_database_name(Oid dbid)
1172 {
1173         Relation        pg_database;
1174         ScanKeyData entry[1];
1175         SysScanDesc scan;
1176         HeapTuple       dbtuple;
1177         char       *result;
1178
1179         /* There's no syscache for pg_database, so must look the hard way */
1180         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1181         ScanKeyInit(&entry[0],
1182                                 ObjectIdAttributeNumber,
1183                                 BTEqualStrategyNumber, F_OIDEQ,
1184                                 ObjectIdGetDatum(dbid));
1185         scan = systable_beginscan(pg_database, DatabaseOidIndexId, true,
1186                                                           SnapshotNow, 1, entry);
1187
1188         dbtuple = systable_getnext(scan);
1189
1190         /* We assume that there can be at most one matching tuple */
1191         if (HeapTupleIsValid(dbtuple))
1192                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1193         else
1194                 result = NULL;
1195
1196         systable_endscan(scan);
1197         heap_close(pg_database, AccessShareLock);
1198
1199         return result;
1200 }
1201
1202 /*
1203  * DATABASE resource manager's routines
1204  */
1205 void
1206 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1207 {
1208         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1209
1210         if (info == XLOG_DBASE_CREATE)
1211         {
1212                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1213                 char       *src_path;
1214                 char       *dst_path;
1215                 struct stat st;
1216
1217 #ifndef WIN32
1218                 char            buf[2 * MAXPGPATH + 100];
1219 #endif
1220
1221                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1222                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1223
1224                 /*
1225                  * Our theory for replaying a CREATE is to forcibly drop the
1226                  * target subdirectory if present, then re-copy the source data.
1227                  * This may be more work than needed, but it is simple to
1228                  * implement.
1229                  */
1230                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1231                 {
1232                         if (!rmtree(dst_path, true))
1233                                 ereport(WARNING,
1234                                         (errmsg("could not remove database directory \"%s\"",
1235                                                         dst_path)));
1236                 }
1237
1238                 /*
1239                  * Force dirty buffers out to disk, to ensure source database is
1240                  * up-to-date for the copy.  (We really only need to flush buffers for
1241                  * the source database, but bufmgr.c provides no API for that.)
1242                  */
1243                 BufferSync();
1244
1245 #ifndef WIN32
1246
1247                 /*
1248                  * Copy this subdirectory to the new location
1249                  *
1250                  * XXX use of cp really makes this code pretty grotty, particularly
1251                  * with respect to lack of ability to report errors well.  Someday
1252                  * rewrite to do it for ourselves.
1253                  */
1254
1255                 /* We might need to use cp -R one day for portability */
1256                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
1257                                  src_path, dst_path);
1258                 if (system(buf) != 0)
1259                         ereport(ERROR,
1260                                         (errmsg("could not initialize database directory"),
1261                                          errdetail("Failing system command was: %s", buf),
1262                                          errhint("Look in the postmaster's stderr log for more information.")));
1263 #else                                                   /* WIN32 */
1264                 if (copydir(src_path, dst_path) != 0)
1265                 {
1266                         /* copydir should already have given details of its troubles */
1267                         ereport(ERROR,
1268                                         (errmsg("could not initialize database directory")));
1269                 }
1270 #endif   /* WIN32 */
1271         }
1272         else if (info == XLOG_DBASE_DROP)
1273         {
1274                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1275                 char       *dst_path;
1276
1277                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1278
1279                 /*
1280                  * Drop pages for this database that are in the shared buffer
1281                  * cache
1282                  */
1283                 DropBuffers(xlrec->db_id);
1284
1285                 if (!rmtree(dst_path, true))
1286                         ereport(WARNING,
1287                                         (errmsg("could not remove database directory \"%s\"",
1288                                                         dst_path)));
1289         }
1290         else if (info == XLOG_DBASE_CREATE_OLD)
1291         {
1292                 xl_dbase_create_rec_old *xlrec = (xl_dbase_create_rec_old *) XLogRecGetData(record);
1293                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1294                 struct stat st;
1295
1296 #ifndef WIN32
1297                 char            buf[2 * MAXPGPATH + 100];
1298 #endif
1299
1300                 /*
1301                  * Our theory for replaying a CREATE is to forcibly drop the
1302                  * target subdirectory if present, then re-copy the source data.
1303                  * This may be more work than needed, but it is simple to
1304                  * implement.
1305                  */
1306                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1307                 {
1308                         if (!rmtree(dst_path, true))
1309                                 ereport(WARNING,
1310                                         (errmsg("could not remove database directory \"%s\"",
1311                                                         dst_path)));
1312                 }
1313
1314                 /*
1315                  * Force dirty buffers out to disk, to ensure source database is
1316                  * up-to-date for the copy.  (We really only need to flush buffers for
1317                  * the source database, but bufmgr.c provides no API for that.)
1318                  */
1319                 BufferSync();
1320
1321 #ifndef WIN32
1322
1323                 /*
1324                  * Copy this subdirectory to the new location
1325                  *
1326                  * XXX use of cp really makes this code pretty grotty, particularly
1327                  * with respect to lack of ability to report errors well.  Someday
1328                  * rewrite to do it for ourselves.
1329                  */
1330
1331                 /* We might need to use cp -R one day for portability */
1332                 snprintf(buf, sizeof(buf), "cp -r '%s' '%s'",
1333                                  xlrec->src_path, dst_path);
1334                 if (system(buf) != 0)
1335                         ereport(ERROR,
1336                                         (errmsg("could not initialize database directory"),
1337                                          errdetail("Failing system command was: %s", buf),
1338                                          errhint("Look in the postmaster's stderr log for more information.")));
1339 #else                                                   /* WIN32 */
1340                 if (copydir(xlrec->src_path, dst_path) != 0)
1341                 {
1342                         /* copydir should already have given details of its troubles */
1343                         ereport(ERROR,
1344                                         (errmsg("could not initialize database directory")));
1345                 }
1346 #endif   /* WIN32 */
1347         }
1348         else if (info == XLOG_DBASE_DROP_OLD)
1349         {
1350                 xl_dbase_drop_rec_old *xlrec = (xl_dbase_drop_rec_old *) XLogRecGetData(record);
1351
1352                 /*
1353                  * Drop pages for this database that are in the shared buffer
1354                  * cache
1355                  */
1356                 DropBuffers(xlrec->db_id);
1357
1358                 if (!rmtree(xlrec->dir_path, true))
1359                         ereport(WARNING,
1360                                         (errmsg("could not remove database directory \"%s\"",
1361                                                         xlrec->dir_path)));
1362         }
1363         else
1364                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1365 }
1366
1367 void
1368 dbase_desc(char *buf, uint8 xl_info, char *rec)
1369 {
1370         uint8           info = xl_info & ~XLR_INFO_MASK;
1371
1372         if (info == XLOG_DBASE_CREATE)
1373         {
1374                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1375
1376                 sprintf(buf + strlen(buf), "create db: copy dir %u/%u to %u/%u",
1377                                 xlrec->src_db_id, xlrec->src_tablespace_id,
1378                                 xlrec->db_id, xlrec->tablespace_id);
1379         }
1380         else if (info == XLOG_DBASE_DROP)
1381         {
1382                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1383
1384                 sprintf(buf + strlen(buf), "drop db: dir %u/%u",
1385                                 xlrec->db_id, xlrec->tablespace_id);
1386         }
1387         else if (info == XLOG_DBASE_CREATE_OLD)
1388         {
1389                 xl_dbase_create_rec_old *xlrec = (xl_dbase_create_rec_old *) rec;
1390                 char       *dst_path = xlrec->src_path + strlen(xlrec->src_path) + 1;
1391
1392                 sprintf(buf + strlen(buf), "create db: %u copy \"%s\" to \"%s\"",
1393                                 xlrec->db_id, xlrec->src_path, dst_path);
1394         }
1395         else if (info == XLOG_DBASE_DROP_OLD)
1396         {
1397                 xl_dbase_drop_rec_old *xlrec = (xl_dbase_drop_rec_old *) rec;
1398
1399                 sprintf(buf + strlen(buf), "drop db: %u directory: \"%s\"",
1400                                 xlrec->db_id, xlrec->dir_path);
1401         }
1402         else
1403                 strcat(buf, "UNKNOWN");
1404 }