]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Stamp copyrights for year 2011.
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands use exclusive locks on
7  * the database objects (as expressed by LockSharedObject()) to avoid
8  * stepping on each other's toes.  Formerly we used table-level locks
9  * on pg_database, but that's too coarse-grained.
10  *
11  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        src/backend/commands/dbcommands.c
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <fcntl.h>
23 #include <locale.h>
24 #include <unistd.h>
25 #include <sys/stat.h>
26
27 #include "access/genam.h"
28 #include "access/heapam.h"
29 #include "access/transam.h"
30 #include "access/xact.h"
31 #include "access/xlogutils.h"
32 #include "catalog/catalog.h"
33 #include "catalog/dependency.h"
34 #include "catalog/indexing.h"
35 #include "catalog/objectaccess.h"
36 #include "catalog/pg_authid.h"
37 #include "catalog/pg_database.h"
38 #include "catalog/pg_db_role_setting.h"
39 #include "catalog/pg_tablespace.h"
40 #include "commands/comment.h"
41 #include "commands/dbcommands.h"
42 #include "commands/tablespace.h"
43 #include "mb/pg_wchar.h"
44 #include "miscadmin.h"
45 #include "pgstat.h"
46 #include "postmaster/bgwriter.h"
47 #include "storage/bufmgr.h"
48 #include "storage/copydir.h"
49 #include "storage/fd.h"
50 #include "storage/lmgr.h"
51 #include "storage/ipc.h"
52 #include "storage/procarray.h"
53 #include "storage/smgr.h"
54 #include "storage/standby.h"
55 #include "utils/acl.h"
56 #include "utils/builtins.h"
57 #include "utils/fmgroids.h"
58 #include "utils/lsyscache.h"
59 #include "utils/pg_locale.h"
60 #include "utils/snapmgr.h"
61 #include "utils/syscache.h"
62 #include "utils/tqual.h"
63
64
65 typedef struct          /* cleanup argument for createdb_failure_callback; createdb() fills it in before copying any files */
66 {
67         Oid                     src_dboid;              /* OID of the source (template) database being copied */
68         Oid                     dest_dboid;             /* OID of the database we are trying to create */
69 } createdb_failure_params;
70
71 typedef struct          /* NOTE(review): presumably the cleanup argument for movedb_failure_callback -- confirm against movedb() */
72 {
73         Oid                     dest_dboid;             /* OID of the database we are trying to move */
74         Oid                     dest_tsoid;             /* OID of the tablespace we are trying to move it to */
75 } movedb_failure_params;
76
77 /* prototypes for functions private to this file (all are static) */
78 static void createdb_failure_callback(int code, Datum arg);     /* error-cleanup hook; createdb() registers it via PG_ENSURE_ERROR_CLEANUP */
79 static void movedb(const char *dbname, const char *tblspcname);
80 static void movedb_failure_callback(int code, Datum arg);
81 static bool get_db_info(const char *name, LOCKMODE lockmode,
82                         Oid *dbIdP, Oid *ownerIdP,
83                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
84                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
85                         Oid *dbTablespace, char **dbCollate, char **dbCtype);    /* createdb() reports "does not exist" when this returns false */
86 static bool have_createdb_privilege(void);      /* createdb() raises "permission denied" when this returns false */
87 static void remove_dbtablespaces(Oid db_id);
88 static bool check_db_file_conflict(Oid db_id);  /* createdb() keeps picking a fresh OID while this returns true */
89 static int      errdetail_busy_db(int notherbackends, int npreparedxacts);      /* createdb() uses it as the ereport() detail when the source database is busy */
90
91
92 /*
93  * CREATE DATABASE
94  */
95 void
96 createdb(const CreatedbStmt *stmt)
97 {
98         HeapScanDesc scan;
99         Relation        rel;
100         Oid                     src_dboid;
101         Oid                     src_owner;
102         int                     src_encoding;
103         char       *src_collate;
104         char       *src_ctype;
105         bool            src_istemplate;
106         bool            src_allowconn;
107         Oid                     src_lastsysoid;
108         TransactionId src_frozenxid;
109         Oid                     src_deftablespace;
110         volatile Oid dst_deftablespace;
111         Relation        pg_database_rel;
112         HeapTuple       tuple;
113         Datum           new_record[Natts_pg_database];
114         bool            new_record_nulls[Natts_pg_database];
115         Oid                     dboid;
116         Oid                     datdba;
117         ListCell   *option;
118         DefElem    *dtablespacename = NULL;
119         DefElem    *downer = NULL;
120         DefElem    *dtemplate = NULL;
121         DefElem    *dencoding = NULL;
122         DefElem    *dcollate = NULL;
123         DefElem    *dctype = NULL;
124         DefElem    *dconnlimit = NULL;
125         char       *dbname = stmt->dbname;
126         char       *dbowner = NULL;
127         const char *dbtemplate = NULL;
128         char       *dbcollate = NULL;
129         char       *dbctype = NULL;
130         int                     encoding = -1;
131         int                     dbconnlimit = -1;
132         int                     ctype_encoding;
133         int                     collate_encoding;
134         int                     notherbackends;
135         int                     npreparedxacts;
136         createdb_failure_params fparms;
137
138         /* Extract options from the statement node tree */
139         foreach(option, stmt->options)
140         {
141                 DefElem    *defel = (DefElem *) lfirst(option);
142
143                 if (strcmp(defel->defname, "tablespace") == 0)
144                 {
145                         if (dtablespacename)
146                                 ereport(ERROR,
147                                                 (errcode(ERRCODE_SYNTAX_ERROR),
148                                                  errmsg("conflicting or redundant options")));
149                         dtablespacename = defel;
150                 }
151                 else if (strcmp(defel->defname, "owner") == 0)
152                 {
153                         if (downer)
154                                 ereport(ERROR,
155                                                 (errcode(ERRCODE_SYNTAX_ERROR),
156                                                  errmsg("conflicting or redundant options")));
157                         downer = defel;
158                 }
159                 else if (strcmp(defel->defname, "template") == 0)
160                 {
161                         if (dtemplate)
162                                 ereport(ERROR,
163                                                 (errcode(ERRCODE_SYNTAX_ERROR),
164                                                  errmsg("conflicting or redundant options")));
165                         dtemplate = defel;
166                 }
167                 else if (strcmp(defel->defname, "encoding") == 0)
168                 {
169                         if (dencoding)
170                                 ereport(ERROR,
171                                                 (errcode(ERRCODE_SYNTAX_ERROR),
172                                                  errmsg("conflicting or redundant options")));
173                         dencoding = defel;
174                 }
175                 else if (strcmp(defel->defname, "lc_collate") == 0)
176                 {
177                         if (dcollate)
178                                 ereport(ERROR,
179                                                 (errcode(ERRCODE_SYNTAX_ERROR),
180                                                  errmsg("conflicting or redundant options")));
181                         dcollate = defel;
182                 }
183                 else if (strcmp(defel->defname, "lc_ctype") == 0)
184                 {
185                         if (dctype)
186                                 ereport(ERROR,
187                                                 (errcode(ERRCODE_SYNTAX_ERROR),
188                                                  errmsg("conflicting or redundant options")));
189                         dctype = defel;
190                 }
191                 else if (strcmp(defel->defname, "connectionlimit") == 0)
192                 {
193                         if (dconnlimit)
194                                 ereport(ERROR,
195                                                 (errcode(ERRCODE_SYNTAX_ERROR),
196                                                  errmsg("conflicting or redundant options")));
197                         dconnlimit = defel;
198                 }
199                 else if (strcmp(defel->defname, "location") == 0)
200                 {
201                         ereport(WARNING,
202                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
203                                          errmsg("LOCATION is not supported anymore"),
204                                          errhint("Consider using tablespaces instead.")));
205                 }
206                 else
207                         elog(ERROR, "option \"%s\" not recognized",
208                                  defel->defname);
209         }
210
211         if (downer && downer->arg)
212                 dbowner = strVal(downer->arg);
213         if (dtemplate && dtemplate->arg)
214                 dbtemplate = strVal(dtemplate->arg);
215         if (dencoding && dencoding->arg)
216         {
217                 const char *encoding_name;
218
219                 if (IsA(dencoding->arg, Integer))
220                 {
221                         encoding = intVal(dencoding->arg);
222                         encoding_name = pg_encoding_to_char(encoding);
223                         if (strcmp(encoding_name, "") == 0 ||
224                                 pg_valid_server_encoding(encoding_name) < 0)
225                                 ereport(ERROR,
226                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
227                                                  errmsg("%d is not a valid encoding code",
228                                                                 encoding)));
229                 }
230                 else if (IsA(dencoding->arg, String))
231                 {
232                         encoding_name = strVal(dencoding->arg);
233                         encoding = pg_valid_server_encoding(encoding_name);
234                         if (encoding < 0)
235                                 ereport(ERROR,
236                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
237                                                  errmsg("%s is not a valid encoding name",
238                                                                 encoding_name)));
239                 }
240                 else
241                         elog(ERROR, "unrecognized node type: %d",
242                                  nodeTag(dencoding->arg));
243         }
244         if (dcollate && dcollate->arg)
245                 dbcollate = strVal(dcollate->arg);
246         if (dctype && dctype->arg)
247                 dbctype = strVal(dctype->arg);
248
249         if (dconnlimit && dconnlimit->arg)
250         {
251                 dbconnlimit = intVal(dconnlimit->arg);
252                 if (dbconnlimit < -1)
253                         ereport(ERROR,
254                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
255                                          errmsg("invalid connection limit: %d", dbconnlimit)));
256         }
257
258         /* obtain OID of proposed owner */
259         if (dbowner)
260                 datdba = get_role_oid(dbowner, false);
261         else
262                 datdba = GetUserId();
263
264         /*
265          * To create a database, must have createdb privilege and must be able to
266          * become the target role (this does not imply that the target role itself
267          * must have createdb privilege).  The latter provision guards against
268          * "giveaway" attacks.  Note that a superuser will always have both of
269          * these privileges a fortiori.
270          */
271         if (!have_createdb_privilege())
272                 ereport(ERROR,
273                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
274                                  errmsg("permission denied to create database")));
275
276         check_is_member_of_role(GetUserId(), datdba);
277
278         /*
279          * Lookup database (template) to be cloned, and obtain share lock on it.
280          * ShareLock allows two CREATE DATABASEs to work from the same template
281          * concurrently, while ensuring no one is busy dropping it in parallel
282          * (which would be Very Bad since we'd likely get an incomplete copy
283          * without knowing it).  This also prevents any new connections from being
284          * made to the source until we finish copying it, so we can be sure it
285          * won't change underneath us.
286          */
287         if (!dbtemplate)
288                 dbtemplate = "template1";               /* Default template database name */
289
290         if (!get_db_info(dbtemplate, ShareLock,
291                                          &src_dboid, &src_owner, &src_encoding,
292                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
293                                          &src_frozenxid, &src_deftablespace,
294                                          &src_collate, &src_ctype))
295                 ereport(ERROR,
296                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
297                                  errmsg("template database \"%s\" does not exist",
298                                                 dbtemplate)));
299
300         /*
301          * Permission check: to copy a DB that's not marked datistemplate, you
302          * must be superuser or the owner thereof.
303          */
304         if (!src_istemplate)
305         {
306                 if (!pg_database_ownercheck(src_dboid, GetUserId()))
307                         ereport(ERROR,
308                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
309                                          errmsg("permission denied to copy database \"%s\"",
310                                                         dbtemplate)));
311         }
312
313         /* If encoding or locales are defaulted, use source's setting */
314         if (encoding < 0)
315                 encoding = src_encoding;
316         if (dbcollate == NULL)
317                 dbcollate = src_collate;
318         if (dbctype == NULL)
319                 dbctype = src_ctype;
320
321         /* Some encodings are client only */
322         if (!PG_VALID_BE_ENCODING(encoding))
323                 ereport(ERROR,
324                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
325                                  errmsg("invalid server encoding %d", encoding)));
326
327         /* Check that the chosen locales are valid */
328         if (!check_locale(LC_COLLATE, dbcollate))
329                 ereport(ERROR,
330                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
331                                  errmsg("invalid locale name %s", dbcollate)));
332         if (!check_locale(LC_CTYPE, dbctype))
333                 ereport(ERROR,
334                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
335                                  errmsg("invalid locale name %s", dbctype)));
336
337         /*
338          * Check whether chosen encoding matches chosen locale settings.  This
339          * restriction is necessary because libc's locale-specific code usually
340          * fails when presented with data in an encoding it's not expecting. We
341          * allow mismatch in four cases:
342          *
343          * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
344          * which works with any encoding.
345          *
346          * 2. locale encoding = -1, which means that we couldn't determine the
347          * locale's encoding and have to trust the user to get it right.
348          *
349          * 3. selected encoding is UTF8 and platform is win32. This is because
350          * UTF8 is a pseudo codepage that is supported in all locales since it's
351          * converted to UTF16 before being used.
352          *
353          * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
354          * is risky but we have historically allowed it --- notably, the
355          * regression tests require it.
356          *
357          * Note: if you change this policy, fix initdb to match.
358          */
359         ctype_encoding = pg_get_encoding_from_locale(dbctype);
360         collate_encoding = pg_get_encoding_from_locale(dbcollate);
361
362         if (!(ctype_encoding == encoding ||
363                   ctype_encoding == PG_SQL_ASCII ||
364                   ctype_encoding == -1 ||
365 #ifdef WIN32
366                   encoding == PG_UTF8 ||
367 #endif
368                   (encoding == PG_SQL_ASCII && superuser())))
369                 ereport(ERROR,
370                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
371                                  errmsg("encoding %s does not match locale %s",
372                                                 pg_encoding_to_char(encoding),
373                                                 dbctype),
374                            errdetail("The chosen LC_CTYPE setting requires encoding %s.",
375                                                  pg_encoding_to_char(ctype_encoding))));
376
377         if (!(collate_encoding == encoding ||
378                   collate_encoding == PG_SQL_ASCII ||
379                   collate_encoding == -1 ||
380 #ifdef WIN32
381                   encoding == PG_UTF8 ||
382 #endif
383                   (encoding == PG_SQL_ASCII && superuser())))
384                 ereport(ERROR,
385                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
386                                  errmsg("encoding %s does not match locale %s",
387                                                 pg_encoding_to_char(encoding),
388                                                 dbcollate),
389                          errdetail("The chosen LC_COLLATE setting requires encoding %s.",
390                                            pg_encoding_to_char(collate_encoding))));
391
392         /*
393          * Check that the new encoding and locale settings match the source
394          * database.  We insist on this because we simply copy the source data ---
395          * any non-ASCII data would be wrongly encoded, and any indexes sorted
396          * according to the source locale would be wrong.
397          *
398          * However, we assume that template0 doesn't contain any non-ASCII data
399          * nor any indexes that depend on collation or ctype, so template0 can be
400          * used as template for creating a database with any encoding or locale.
401          */
402         if (strcmp(dbtemplate, "template0") != 0)
403         {
404                 if (encoding != src_encoding)
405                         ereport(ERROR,
406                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
407                                          errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
408                                                         pg_encoding_to_char(encoding),
409                                                         pg_encoding_to_char(src_encoding)),
410                                          errhint("Use the same encoding as in the template database, or use template0 as template.")));
411
412                 if (strcmp(dbcollate, src_collate) != 0)
413                         ereport(ERROR,
414                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
415                                          errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
416                                                         dbcollate, src_collate),
417                                          errhint("Use the same collation as in the template database, or use template0 as template.")));
418
419                 if (strcmp(dbctype, src_ctype) != 0)
420                         ereport(ERROR,
421                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
422                                          errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
423                                                         dbctype, src_ctype),
424                                          errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
425         }
426
427         /* Resolve default tablespace for new database */
428         if (dtablespacename && dtablespacename->arg)
429         {
430                 char       *tablespacename;
431                 AclResult       aclresult;
432
433                 tablespacename = strVal(dtablespacename->arg);
434                 dst_deftablespace = get_tablespace_oid(tablespacename, false);
435                 /* check permissions */
436                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
437                                                                                    ACL_CREATE);
438                 if (aclresult != ACLCHECK_OK)
439                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
440                                                    tablespacename);
441
442                 /* pg_global must never be the default tablespace */
443                 if (dst_deftablespace == GLOBALTABLESPACE_OID)
444                         ereport(ERROR,
445                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
446                                   errmsg("pg_global cannot be used as default tablespace")));
447
448                 /*
449                  * If we are trying to change the default tablespace of the template,
450                  * we require that the template not have any files in the new default
451                  * tablespace.  This is necessary because otherwise the copied
452                  * database would contain pg_class rows that refer to its default
453                  * tablespace both explicitly (by OID) and implicitly (as zero), which
454                  * would cause problems.  For example another CREATE DATABASE using
455                  * the copied database as template, and trying to change its default
456                  * tablespace again, would yield outright incorrect results (it would
457                  * improperly move tables to the new default tablespace that should
458                  * stay in the same tablespace).
459                  */
460                 if (dst_deftablespace != src_deftablespace)
461                 {
462                         char       *srcpath;
463                         struct stat st;
464
465                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
466
467                         if (stat(srcpath, &st) == 0 &&
468                                 S_ISDIR(st.st_mode) &&
469                                 !directory_is_empty(srcpath))
470                                 ereport(ERROR,
471                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
472                                                  errmsg("cannot assign new default tablespace \"%s\"",
473                                                                 tablespacename),
474                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
475                                                                    dbtemplate)));
476                         pfree(srcpath);
477                 }
478         }
479         else
480         {
481                 /* Use template database's default tablespace */
482                 dst_deftablespace = src_deftablespace;
483                 /* Note there is no additional permission check in this path */
484         }
485
486         /*
487          * Check for db name conflict.  This is just to give a more friendly error
488          * message than "unique index violation".  There's a race condition but
489          * we're willing to accept the less friendly message in that case.
490          */
491         if (OidIsValid(get_database_oid(dbname, true)))
492                 ereport(ERROR,
493                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
494                                  errmsg("database \"%s\" already exists", dbname)));
495
496         /*
497          * The source DB can't have any active backends, except this one
498          * (exception is to allow CREATE DB while connected to template1).
499          * Otherwise we might copy inconsistent data.
500          *
501          * This should be last among the basic error checks, because it involves
502          * potential waiting; we may as well throw an error first if we're gonna
503          * throw one.
504          */
505         if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
506                 ereport(ERROR,
507                                 (errcode(ERRCODE_OBJECT_IN_USE),
508                         errmsg("source database \"%s\" is being accessed by other users",
509                                    dbtemplate),
510                                  errdetail_busy_db(notherbackends, npreparedxacts)));
511
512         /*
513          * Select an OID for the new database, checking that it doesn't have a
514          * filename conflict with anything already existing in the tablespace
515          * directories.
516          */
517         pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
518
519         do
520         {
521                 dboid = GetNewOid(pg_database_rel);
522         } while (check_db_file_conflict(dboid));
523
524         /*
525          * Insert a new tuple into pg_database.  This establishes our ownership of
526          * the new database name (anyone else trying to insert the same name will
527          * block on the unique index, and fail after we commit).
528          */
529
530         /* Form tuple */
531         MemSet(new_record, 0, sizeof(new_record));
532         MemSet(new_record_nulls, false, sizeof(new_record_nulls));
533
534         new_record[Anum_pg_database_datname - 1] =
535                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
536         new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
537         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
538         new_record[Anum_pg_database_datcollate - 1] =
539                 DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
540         new_record[Anum_pg_database_datctype - 1] =
541                 DirectFunctionCall1(namein, CStringGetDatum(dbctype));
542         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
543         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
544         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
545         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
546         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
547         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
548
549         /*
550          * We deliberately set datacl to default (NULL), rather than copying it
551          * from the template database.  Copying it would be a bad idea when the
552          * owner is not the same as the template's owner.
553          */
554         new_record_nulls[Anum_pg_database_datacl - 1] = true;
555
556         tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
557                                                         new_record, new_record_nulls);
558
559         HeapTupleSetOid(tuple, dboid);
560
561         simple_heap_insert(pg_database_rel, tuple);
562
563         /* Update indexes */
564         CatalogUpdateIndexes(pg_database_rel, tuple);
565
566         /*
567          * Now generate additional catalog entries associated with the new DB
568          */
569
570         /* Register owner dependency */
571         recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
572
573         /* Create pg_shdepend entries for objects within database */
574         copyTemplateDependencies(src_dboid, dboid);
575
576         /* Post creation hook for new database */
577         InvokeObjectAccessHook(OAT_POST_CREATE, DatabaseRelationId, dboid, 0);
578
579         /*
580          * Force a checkpoint before starting the copy. This will force dirty
581          * buffers out to disk, to ensure source database is up-to-date on disk
582          * for the copy. FlushDatabaseBuffers() would suffice for that, but we
583          * also want to process any pending unlink requests. Otherwise, if a
584          * checkpoint happened while we're copying files, a file might be deleted
585          * just when we're about to copy it, causing the lstat() call in copydir()
586          * to fail with ENOENT.
587          */
588         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
589
590         /*
591          * Once we start copying subdirectories, we need to be able to clean 'em
592          * up if we fail.  Use an ENSURE block to make sure this happens.  (This
593          * is not a 100% solution, because of the possibility of failure during
594          * transaction commit after we leave this routine, but it should handle
595          * most scenarios.)
596          */
597         fparms.src_dboid = src_dboid;
598         fparms.dest_dboid = dboid;
599         PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
600                                                         PointerGetDatum(&fparms));
601         {
602                 /*
603                  * Iterate through all tablespaces of the template database, and copy
604                  * each one to the new database.
605                  */
606                 rel = heap_open(TableSpaceRelationId, AccessShareLock);
607                 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
608                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
609                 {
610                         Oid                     srctablespace = HeapTupleGetOid(tuple);
611                         Oid                     dsttablespace;
612                         char       *srcpath;
613                         char       *dstpath;
614                         struct stat st;
615
616                         /* No need to copy global tablespace */
617                         if (srctablespace == GLOBALTABLESPACE_OID)
618                                 continue;
619
620                         srcpath = GetDatabasePath(src_dboid, srctablespace);
621
622                         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
623                                 directory_is_empty(srcpath))
624                         {
625                                 /* Assume we can ignore it */
626                                 pfree(srcpath);
627                                 continue;
628                         }
629
630                         if (srctablespace == src_deftablespace)
631                                 dsttablespace = dst_deftablespace;
632                         else
633                                 dsttablespace = srctablespace;
634
635                         dstpath = GetDatabasePath(dboid, dsttablespace);
636
637                         /*
638                          * Copy this subdirectory to the new location
639                          *
640                          * We don't need to copy subdirectories
641                          */
642                         copydir(srcpath, dstpath, false);
643
644                         /* Record the filesystem change in XLOG */
645                         {
646                                 xl_dbase_create_rec xlrec;
647                                 XLogRecData rdata[1];
648
649                                 xlrec.db_id = dboid;
650                                 xlrec.tablespace_id = dsttablespace;
651                                 xlrec.src_db_id = src_dboid;
652                                 xlrec.src_tablespace_id = srctablespace;
653
654                                 rdata[0].data = (char *) &xlrec;
655                                 rdata[0].len = sizeof(xl_dbase_create_rec);
656                                 rdata[0].buffer = InvalidBuffer;
657                                 rdata[0].next = NULL;
658
659                                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
660                         }
661                 }
662                 heap_endscan(scan);
663                 heap_close(rel, AccessShareLock);
664
665                 /*
666                  * We force a checkpoint before committing.  This effectively means
667                  * that committed XLOG_DBASE_CREATE operations will never need to be
668                  * replayed (at least not in ordinary crash recovery; we still have to
669                  * make the XLOG entry for the benefit of PITR operations). This
670                  * avoids two nasty scenarios:
671                  *
672                  * #1: When PITR is off, we don't XLOG the contents of newly created
673                  * indexes; therefore the drop-and-recreate-whole-directory behavior
674                  * of DBASE_CREATE replay would lose such indexes.
675                  *
676                  * #2: Since we have to recopy the source database during DBASE_CREATE
677                  * replay, we run the risk of copying changes in it that were
678                  * committed after the original CREATE DATABASE command but before the
679                  * system crash that led to the replay.  This is at least unexpected
680                  * and at worst could lead to inconsistencies, eg duplicate table
681                  * names.
682                  *
683                  * (Both of these were real bugs in releases 8.0 through 8.0.3.)
684                  *
685                  * In PITR replay, the first of these isn't an issue, and the second
686                  * is only a risk if the CREATE DATABASE and subsequent template
687                  * database change both occur while a base backup is being taken.
688                  * There doesn't seem to be much we can do about that except document
689                  * it as a limitation.
690                  *
691                  * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
692                  * we can avoid this.
693                  */
694                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
695
696                 /*
697                  * Close pg_database, but keep lock till commit.
698                  */
699                 heap_close(pg_database_rel, NoLock);
700
701                 /*
702                  * Force synchronous commit, thus minimizing the window between
703                  * creation of the database files and committal of the transaction. If
704                  * we crash before committing, we'll have a DB that's taking up disk
705                  * space but is not in pg_database, which is not good.
706                  */
707                 ForceSyncCommit();
708         }
709         PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
710                                                                 PointerGetDatum(&fparms));
711 }
712
713 /* Error cleanup callback for createdb */
714 static void
715 createdb_failure_callback(int code, Datum arg)
716 {
717         createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
718
719         /*
720          * Release lock on source database before doing recursive remove. This is
721          * not essential but it seems desirable to release the lock as soon as
722          * possible.
723          */
724         UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
725
726         /* Throw away any successfully copied subdirectories */
727         remove_dbtablespaces(fparms->dest_dboid);
728 }
729
730
731 /*
732  * DROP DATABASE
733  */
734 void
735 dropdb(const char *dbname, bool missing_ok)
736 {
737         Oid                     db_id;
738         bool            db_istemplate;
739         Relation        pgdbrel;
740         HeapTuple       tup;
741         int                     notherbackends;
742         int                     npreparedxacts;
743
744         /*
745          * Look up the target database's OID, and get exclusive lock on it. We
746          * need this to ensure that no new backend starts up in the target
747          * database while we are deleting it (see postinit.c), and that no one is
748          * using it as a CREATE DATABASE template or trying to delete it for
749          * themselves.
750          */
751         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
752
753         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
754                                          &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL))
755         {
756                 if (!missing_ok)
757                 {
758                         ereport(ERROR,
759                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
760                                          errmsg("database \"%s\" does not exist", dbname)));
761                 }
762                 else
763                 {
764                         /* Close pg_database, release the lock, since we changed nothing */
765                         heap_close(pgdbrel, RowExclusiveLock);
766                         ereport(NOTICE,
767                                         (errmsg("database \"%s\" does not exist, skipping",
768                                                         dbname)));
769                         return;
770                 }
771         }
772
773         /*
774          * Permission checks
775          */
776         if (!pg_database_ownercheck(db_id, GetUserId()))
777                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
778                                            dbname);
779
780         /*
781          * Disallow dropping a DB that is marked istemplate.  This is just to
782          * prevent people from accidentally dropping template0 or template1; they
783          * can do so if they're really determined ...
784          */
785         if (db_istemplate)
786                 ereport(ERROR,
787                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
788                                  errmsg("cannot drop a template database")));
789
790         /* Obviously can't drop my own database */
791         if (db_id == MyDatabaseId)
792                 ereport(ERROR,
793                                 (errcode(ERRCODE_OBJECT_IN_USE),
794                                  errmsg("cannot drop the currently open database")));
795
796         /*
797          * Check for other backends in the target database.  (Because we hold the
798          * database lock, no new ones can start after this.)
799          *
800          * As in CREATE DATABASE, check this after other error conditions.
801          */
802         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
803                 ereport(ERROR,
804                                 (errcode(ERRCODE_OBJECT_IN_USE),
805                                  errmsg("database \"%s\" is being accessed by other users",
806                                                 dbname),
807                                  errdetail_busy_db(notherbackends, npreparedxacts)));
808
809         /*
810          * Remove the database's tuple from pg_database.
811          */
812         tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
813         if (!HeapTupleIsValid(tup))
814                 elog(ERROR, "cache lookup failed for database %u", db_id);
815
816         simple_heap_delete(pgdbrel, &tup->t_self);
817
818         ReleaseSysCache(tup);
819
820         /*
821          * Delete any comments associated with the database.
822          */
823         DeleteSharedComments(db_id, DatabaseRelationId);
824
825         /*
826          * Remove settings associated with this database
827          */
828         DropSetting(db_id, InvalidOid);
829
830         /*
831          * Remove shared dependency references for the database.
832          */
833         dropDatabaseDependencies(db_id);
834
835         /*
836          * Drop pages for this database that are in the shared buffer cache. This
837          * is important to ensure that no remaining backend tries to write out a
838          * dirty buffer to the dead database later...
839          */
840         DropDatabaseBuffers(db_id);
841
842         /*
843          * Tell the stats collector to forget it immediately, too.
844          */
845         pgstat_drop_database(db_id);
846
847         /*
848          * Tell bgwriter to forget any pending fsync and unlink requests for files
849          * in the database; else the fsyncs will fail at next checkpoint, or
850          * worse, it will delete files that belong to a newly created database
851          * with the same OID.
852          */
853         ForgetDatabaseFsyncRequests(db_id);
854
855         /*
856          * Force a checkpoint to make sure the bgwriter has received the message
857          * sent by ForgetDatabaseFsyncRequests. On Windows, this also ensures that
858          * the bgwriter doesn't hold any open files, which would cause rmdir() to
859          * fail.
860          */
861         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
862
863         /*
864          * Remove all tablespace subdirs belonging to the database.
865          */
866         remove_dbtablespaces(db_id);
867
868         /*
869          * Close pg_database, but keep lock till commit.
870          */
871         heap_close(pgdbrel, NoLock);
872
873         /*
874          * Force synchronous commit, thus minimizing the window between removal of
875          * the database files and commital of the transaction. If we crash before
876          * committing, we'll have a DB that's gone on disk but still there
877          * according to pg_database, which is not good.
878          */
879         ForceSyncCommit();
880 }
881
882
883 /*
884  * Rename database
885  */
886 void
887 RenameDatabase(const char *oldname, const char *newname)
888 {
889         Oid                     db_id;
890         HeapTuple       newtup;
891         Relation        rel;
892         int                     notherbackends;
893         int                     npreparedxacts;
894
895         /*
896          * Look up the target database's OID, and get exclusive lock on it. We
897          * need this for the same reasons as DROP DATABASE.
898          */
899         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
900
901         if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
902                                          NULL, NULL, NULL, NULL, NULL, NULL, NULL))
903                 ereport(ERROR,
904                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
905                                  errmsg("database \"%s\" does not exist", oldname)));
906
907         /* must be owner */
908         if (!pg_database_ownercheck(db_id, GetUserId()))
909                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
910                                            oldname);
911
912         /* must have createdb rights */
913         if (!have_createdb_privilege())
914                 ereport(ERROR,
915                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
916                                  errmsg("permission denied to rename database")));
917
918         /*
919          * Make sure the new name doesn't exist.  See notes for same error in
920          * CREATE DATABASE.
921          */
922         if (OidIsValid(get_database_oid(newname, true)))
923                 ereport(ERROR,
924                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
925                                  errmsg("database \"%s\" already exists", newname)));
926
927         /*
928          * XXX Client applications probably store the current database somewhere,
929          * so renaming it could cause confusion.  On the other hand, there may not
930          * be an actual problem besides a little confusion, so think about this
931          * and decide.
932          */
933         if (db_id == MyDatabaseId)
934                 ereport(ERROR,
935                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
936                                  errmsg("current database cannot be renamed")));
937
938         /*
939          * Make sure the database does not have active sessions.  This is the same
940          * concern as above, but applied to other sessions.
941          *
942          * As in CREATE DATABASE, check this after other error conditions.
943          */
944         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
945                 ereport(ERROR,
946                                 (errcode(ERRCODE_OBJECT_IN_USE),
947                                  errmsg("database \"%s\" is being accessed by other users",
948                                                 oldname),
949                                  errdetail_busy_db(notherbackends, npreparedxacts)));
950
951         /* rename */
952         newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
953         if (!HeapTupleIsValid(newtup))
954                 elog(ERROR, "cache lookup failed for database %u", db_id);
955         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
956         simple_heap_update(rel, &newtup->t_self, newtup);
957         CatalogUpdateIndexes(rel, newtup);
958
959         /*
960          * Close pg_database, but keep lock till commit.
961          */
962         heap_close(rel, NoLock);
963 }
964
965
966 /*
967  * ALTER DATABASE SET TABLESPACE
968  */
969 static void
970 movedb(const char *dbname, const char *tblspcname)
971 {
972         Oid                     db_id;
973         Relation        pgdbrel;
974         int                     notherbackends;
975         int                     npreparedxacts;
976         HeapTuple       oldtuple,
977                                 newtuple;
978         Oid                     src_tblspcoid,
979                                 dst_tblspcoid;
980         Datum           new_record[Natts_pg_database];
981         bool            new_record_nulls[Natts_pg_database];
982         bool            new_record_repl[Natts_pg_database];
983         ScanKeyData scankey;
984         SysScanDesc sysscan;
985         AclResult       aclresult;
986         char       *src_dbpath;
987         char       *dst_dbpath;
988         DIR                *dstdir;
989         struct dirent *xlde;
990         movedb_failure_params fparms;
991
992         /*
993          * Look up the target database's OID, and get exclusive lock on it. We
994          * need this to ensure that no new backend starts up in the database while
995          * we are moving it, and that no one is using it as a CREATE DATABASE
996          * template or trying to delete it.
997          */
998         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
999
1000         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1001                                          NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
1002                 ereport(ERROR,
1003                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1004                                  errmsg("database \"%s\" does not exist", dbname)));
1005
1006         /*
1007          * We actually need a session lock, so that the lock will persist across
1008          * the commit/restart below.  (We could almost get away with letting the
1009          * lock be released at commit, except that someone could try to move
1010          * relations of the DB back into the old directory while we rmtree() it.)
1011          */
1012         LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1013                                                            AccessExclusiveLock);
1014
1015         /*
1016          * Permission checks
1017          */
1018         if (!pg_database_ownercheck(db_id, GetUserId()))
1019                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1020                                            dbname);
1021
1022         /*
1023          * Obviously can't move the tables of my own database
1024          */
1025         if (db_id == MyDatabaseId)
1026                 ereport(ERROR,
1027                                 (errcode(ERRCODE_OBJECT_IN_USE),
1028                                  errmsg("cannot change the tablespace of the currently open database")));
1029
1030         /*
1031          * Get tablespace's oid
1032          */
1033         dst_tblspcoid = get_tablespace_oid(tblspcname, false);
1034
1035         /*
1036          * Permission checks
1037          */
1038         aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
1039                                                                            ACL_CREATE);
1040         if (aclresult != ACLCHECK_OK)
1041                 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
1042                                            tblspcname);
1043
1044         /*
1045          * pg_global must never be the default tablespace
1046          */
1047         if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1048                 ereport(ERROR,
1049                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1050                                  errmsg("pg_global cannot be used as default tablespace")));
1051
1052         /*
1053          * No-op if same tablespace
1054          */
1055         if (src_tblspcoid == dst_tblspcoid)
1056         {
1057                 heap_close(pgdbrel, NoLock);
1058                 UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1059                                                                          AccessExclusiveLock);
1060                 return;
1061         }
1062
1063         /*
1064          * Check for other backends in the target database.  (Because we hold the
1065          * database lock, no new ones can start after this.)
1066          *
1067          * As in CREATE DATABASE, check this after other error conditions.
1068          */
1069         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1070                 ereport(ERROR,
1071                                 (errcode(ERRCODE_OBJECT_IN_USE),
1072                                  errmsg("database \"%s\" is being accessed by other users",
1073                                                 dbname),
1074                                  errdetail_busy_db(notherbackends, npreparedxacts)));
1075
1076         /*
1077          * Get old and new database paths
1078          */
1079         src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
1080         dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1081
1082         /*
1083          * Force a checkpoint before proceeding. This will force dirty buffers out
1084          * to disk, to ensure source database is up-to-date on disk for the copy.
1085          * FlushDatabaseBuffers() would suffice for that, but we also want to
1086          * process any pending unlink requests. Otherwise, the check for existing
1087          * files in the target directory might fail unnecessarily, not to mention
1088          * that the copy might fail due to source files getting deleted under it.
1089          * On Windows, this also ensures that the bgwriter doesn't hold any open
1090          * files, which would cause rmdir() to fail.
1091          */
1092         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1093
1094         /*
1095          * Check for existence of files in the target directory, i.e., objects of
1096          * this database that are already in the target tablespace.  We can't
1097          * allow the move in such a case, because we would need to change those
1098          * relations' pg_class.reltablespace entries to zero, and we don't have
1099          * access to the DB's pg_class to do so.
1100          */
1101         dstdir = AllocateDir(dst_dbpath);
1102         if (dstdir != NULL)
1103         {
1104                 while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
1105                 {
1106                         if (strcmp(xlde->d_name, ".") == 0 ||
1107                                 strcmp(xlde->d_name, "..") == 0)
1108                                 continue;
1109
1110                         ereport(ERROR,
1111                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1112                                          errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
1113                                                         dbname, tblspcname),
1114                                          errhint("You must move them back to the database's default tablespace before using this command.")));
1115                 }
1116
1117                 FreeDir(dstdir);
1118
1119                 /*
1120                  * The directory exists but is empty. We must remove it before using
1121                  * the copydir function.
1122                  */
1123                 if (rmdir(dst_dbpath) != 0)
1124                         elog(ERROR, "could not remove directory \"%s\": %m",
1125                                  dst_dbpath);
1126         }
1127
1128         /*
1129          * Use an ENSURE block to make sure we remove the debris if the copy fails
1130          * (eg, due to out-of-disk-space).      This is not a 100% solution, because
1131          * of the possibility of failure during transaction commit, but it should
1132          * handle most scenarios.
1133          */
1134         fparms.dest_dboid = db_id;
1135         fparms.dest_tsoid = dst_tblspcoid;
1136         PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1137                                                         PointerGetDatum(&fparms));
1138         {
1139                 /*
1140                  * Copy files from the old tablespace to the new one
1141                  */
1142                 copydir(src_dbpath, dst_dbpath, false);
1143
1144                 /*
1145                  * Record the filesystem change in XLOG
1146                  */
1147                 {
1148                         xl_dbase_create_rec xlrec;
1149                         XLogRecData rdata[1];
1150
1151                         xlrec.db_id = db_id;
1152                         xlrec.tablespace_id = dst_tblspcoid;
1153                         xlrec.src_db_id = db_id;
1154                         xlrec.src_tablespace_id = src_tblspcoid;
1155
1156                         rdata[0].data = (char *) &xlrec;
1157                         rdata[0].len = sizeof(xl_dbase_create_rec);
1158                         rdata[0].buffer = InvalidBuffer;
1159                         rdata[0].next = NULL;
1160
1161                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
1162                 }
1163
1164                 /*
1165                  * Update the database's pg_database tuple
1166                  */
1167                 ScanKeyInit(&scankey,
1168                                         Anum_pg_database_datname,
1169                                         BTEqualStrategyNumber, F_NAMEEQ,
1170                                         NameGetDatum(dbname));
1171                 sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1172                                                                          SnapshotNow, 1, &scankey);
1173                 oldtuple = systable_getnext(sysscan);
1174                 if (!HeapTupleIsValid(oldtuple))                /* shouldn't happen... */
1175                         ereport(ERROR,
1176                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
1177                                          errmsg("database \"%s\" does not exist", dbname)));
1178
1179                 MemSet(new_record, 0, sizeof(new_record));
1180                 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1181                 MemSet(new_record_repl, false, sizeof(new_record_repl));
1182
1183                 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
1184                 new_record_repl[Anum_pg_database_dattablespace - 1] = true;
1185
1186                 newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
1187                                                                          new_record,
1188                                                                          new_record_nulls, new_record_repl);
1189                 simple_heap_update(pgdbrel, &oldtuple->t_self, newtuple);
1190
1191                 /* Update indexes */
1192                 CatalogUpdateIndexes(pgdbrel, newtuple);
1193
1194                 systable_endscan(sysscan);
1195
1196                 /*
1197                  * Force another checkpoint here.  As in CREATE DATABASE, this is to
1198                  * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
1199                  * operation, which would cause us to lose any unlogged operations
1200                  * done in the new DB tablespace before the next checkpoint.
1201                  */
1202                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1203
1204                 /*
1205                  * Force synchronous commit, thus minimizing the window between
1206                  * copying the database files and commital of the transaction. If we
1207                  * crash before committing, we'll leave an orphaned set of files on
1208                  * disk, which is not fatal but not good either.
1209                  */
1210                 ForceSyncCommit();
1211
1212                 /*
1213                  * Close pg_database, but keep lock till commit.
1214                  */
1215                 heap_close(pgdbrel, NoLock);
1216         }
1217         PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1218                                                                 PointerGetDatum(&fparms));
1219
1220         /*
1221          * Commit the transaction so that the pg_database update is committed. If
1222          * we crash while removing files, the database won't be corrupt, we'll
1223          * just leave some orphaned files in the old directory.
1224          *
1225          * (This is OK because we know we aren't inside a transaction block.)
1226          *
1227          * XXX would it be safe/better to do this inside the ensure block?      Not
1228          * convinced it's a good idea; consider elog just after the transaction
1229          * really commits.
1230          */
1231         PopActiveSnapshot();
1232         CommitTransactionCommand();
1233
1234         /* Start new transaction for the remaining work; don't need a snapshot */
1235         StartTransactionCommand();
1236
1237         /*
1238          * Remove files from the old tablespace
1239          */
1240         if (!rmtree(src_dbpath, true))
1241                 ereport(WARNING,
1242                                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1243                                                 src_dbpath)));
1244
1245         /*
1246          * Record the filesystem change in XLOG
1247          */
1248         {
1249                 xl_dbase_drop_rec xlrec;
1250                 XLogRecData rdata[1];
1251
1252                 xlrec.db_id = db_id;
1253                 xlrec.tablespace_id = src_tblspcoid;
1254
1255                 rdata[0].data = (char *) &xlrec;
1256                 rdata[0].len = sizeof(xl_dbase_drop_rec);
1257                 rdata[0].buffer = InvalidBuffer;
1258                 rdata[0].next = NULL;
1259
1260                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1261         }
1262
1263         /* Now it's safe to release the database lock */
1264         UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1265                                                                  AccessExclusiveLock);
1266 }
1267
1268 /* Error cleanup callback for movedb */
1269 static void
1270 movedb_failure_callback(int code, Datum arg)
1271 {
1272         movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
1273         char       *dstpath;
1274
1275         /* Get rid of anything we managed to copy to the target directory */
1276         dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
1277
1278         (void) rmtree(dstpath, true);
1279 }
1280
1281
1282 /*
1283  * ALTER DATABASE name ...
1284  */
1285 void
1286 AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
1287 {
1288         Relation        rel;
1289         HeapTuple       tuple,
1290                                 newtuple;
1291         ScanKeyData scankey;
1292         SysScanDesc scan;
1293         ListCell   *option;
1294         int                     connlimit = -1;
1295         DefElem    *dconnlimit = NULL;
1296         DefElem    *dtablespace = NULL;
1297         Datum           new_record[Natts_pg_database];
1298         bool            new_record_nulls[Natts_pg_database];
1299         bool            new_record_repl[Natts_pg_database];
1300
1301         /* Extract options from the statement node tree */
1302         foreach(option, stmt->options)
1303         {
1304                 DefElem    *defel = (DefElem *) lfirst(option);
1305
1306                 if (strcmp(defel->defname, "connectionlimit") == 0)
1307                 {
1308                         if (dconnlimit)
1309                                 ereport(ERROR,
1310                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1311                                                  errmsg("conflicting or redundant options")));
1312                         dconnlimit = defel;
1313                 }
1314                 else if (strcmp(defel->defname, "tablespace") == 0)
1315                 {
1316                         if (dtablespace)
1317                                 ereport(ERROR,
1318                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1319                                                  errmsg("conflicting or redundant options")));
1320                         dtablespace = defel;
1321                 }
1322                 else
1323                         elog(ERROR, "option \"%s\" not recognized",
1324                                  defel->defname);
1325         }
1326
1327         if (dtablespace)
1328         {
1329                 /* currently, can't be specified along with any other options */
1330                 Assert(!dconnlimit);
1331                 /* this case isn't allowed within a transaction block */
1332                 PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
1333                 movedb(stmt->dbname, strVal(dtablespace->arg));
1334                 return;
1335         }
1336
1337         if (dconnlimit)
1338         {
1339                 connlimit = intVal(dconnlimit->arg);
1340                 if (connlimit < -1)
1341                         ereport(ERROR,
1342                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1343                                          errmsg("invalid connection limit: %d", connlimit)));
1344         }
1345
1346         /*
1347          * Get the old tuple.  We don't need a lock on the database per se,
1348          * because we're not going to do anything that would mess up incoming
1349          * connections.
1350          */
1351         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1352         ScanKeyInit(&scankey,
1353                                 Anum_pg_database_datname,
1354                                 BTEqualStrategyNumber, F_NAMEEQ,
1355                                 NameGetDatum(stmt->dbname));
1356         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1357                                                           SnapshotNow, 1, &scankey);
1358         tuple = systable_getnext(scan);
1359         if (!HeapTupleIsValid(tuple))
1360                 ereport(ERROR,
1361                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1362                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
1363
1364         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1365                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1366                                            stmt->dbname);
1367
1368         /*
1369          * Build an updated tuple, perusing the information just obtained
1370          */
1371         MemSet(new_record, 0, sizeof(new_record));
1372         MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1373         MemSet(new_record_repl, false, sizeof(new_record_repl));
1374
1375         if (dconnlimit)
1376         {
1377                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
1378                 new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
1379         }
1380
1381         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
1382                                                                  new_record_nulls, new_record_repl);
1383         simple_heap_update(rel, &tuple->t_self, newtuple);
1384
1385         /* Update indexes */
1386         CatalogUpdateIndexes(rel, newtuple);
1387
1388         systable_endscan(scan);
1389
1390         /* Close pg_database, but keep lock till commit */
1391         heap_close(rel, NoLock);
1392 }
1393
1394
1395 /*
1396  * ALTER DATABASE name SET ...
1397  */
1398 void
1399 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1400 {
1401         Oid                     datid = get_database_oid(stmt->dbname, false);
1402
1403         /*
1404          * Obtain a lock on the database and make sure it didn't go away in the
1405          * meantime.
1406          */
1407         shdepLockAndCheckObject(DatabaseRelationId, datid);
1408
1409         if (!pg_database_ownercheck(datid, GetUserId()))
1410                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1411                                            stmt->dbname);
1412
1413         AlterSetting(datid, InvalidOid, stmt->setstmt);
1414
1415         UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
1416 }
1417
1418
1419 /*
1420  * ALTER DATABASE name OWNER TO newowner
1421  */
1422 void
1423 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1424 {
1425         HeapTuple       tuple;
1426         Relation        rel;
1427         ScanKeyData scankey;
1428         SysScanDesc scan;
1429         Form_pg_database datForm;
1430
1431         /*
1432          * Get the old tuple.  We don't need a lock on the database per se,
1433          * because we're not going to do anything that would mess up incoming
1434          * connections.
1435          */
1436         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1437         ScanKeyInit(&scankey,
1438                                 Anum_pg_database_datname,
1439                                 BTEqualStrategyNumber, F_NAMEEQ,
1440                                 NameGetDatum(dbname));
1441         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1442                                                           SnapshotNow, 1, &scankey);
1443         tuple = systable_getnext(scan);
1444         if (!HeapTupleIsValid(tuple))
1445                 ereport(ERROR,
1446                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1447                                  errmsg("database \"%s\" does not exist", dbname)));
1448
1449         datForm = (Form_pg_database) GETSTRUCT(tuple);
1450
1451         /*
1452          * If the new owner is the same as the existing owner, consider the
1453          * command to have succeeded.  This is to be consistent with other
1454          * objects.
1455          */
1456         if (datForm->datdba != newOwnerId)
1457         {
1458                 Datum           repl_val[Natts_pg_database];
1459                 bool            repl_null[Natts_pg_database];
1460                 bool            repl_repl[Natts_pg_database];
1461                 Acl                *newAcl;
1462                 Datum           aclDatum;
1463                 bool            isNull;
1464                 HeapTuple       newtuple;
1465
1466                 /* Otherwise, must be owner of the existing object */
1467                 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1468                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1469                                                    dbname);
1470
1471                 /* Must be able to become new owner */
1472                 check_is_member_of_role(GetUserId(), newOwnerId);
1473
1474                 /*
1475                  * must have createdb rights
1476                  *
1477                  * NOTE: This is different from other alter-owner checks in that the
1478                  * current user is checked for createdb privileges instead of the
1479                  * destination owner.  This is consistent with the CREATE case for
1480                  * databases.  Because superusers will always have this right, we need
1481                  * no special case for them.
1482                  */
1483                 if (!have_createdb_privilege())
1484                         ereport(ERROR,
1485                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1486                                    errmsg("permission denied to change owner of database")));
1487
1488                 memset(repl_null, false, sizeof(repl_null));
1489                 memset(repl_repl, false, sizeof(repl_repl));
1490
1491                 repl_repl[Anum_pg_database_datdba - 1] = true;
1492                 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1493
1494                 /*
1495                  * Determine the modified ACL for the new owner.  This is only
1496                  * necessary when the ACL is non-null.
1497                  */
1498                 aclDatum = heap_getattr(tuple,
1499                                                                 Anum_pg_database_datacl,
1500                                                                 RelationGetDescr(rel),
1501                                                                 &isNull);
1502                 if (!isNull)
1503                 {
1504                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
1505                                                                  datForm->datdba, newOwnerId);
1506                         repl_repl[Anum_pg_database_datacl - 1] = true;
1507                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1508                 }
1509
1510                 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1511                 simple_heap_update(rel, &newtuple->t_self, newtuple);
1512                 CatalogUpdateIndexes(rel, newtuple);
1513
1514                 heap_freetuple(newtuple);
1515
1516                 /* Update owner dependency reference */
1517                 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1518                                                                 newOwnerId);
1519         }
1520
1521         systable_endscan(scan);
1522
1523         /* Close pg_database, but keep lock till commit */
1524         heap_close(rel, NoLock);
1525 }
1526
1527
1528 /*
1529  * Helper functions
1530  */
1531
1532 /*
1533  * Look up info about the database named "name".  If the database exists,
1534  * obtain the specified lock type on it, fill in any of the remaining
1535  * parameters that aren't NULL, and return TRUE.  If no such database,
1536  * return FALSE.
1537  */
1538 static bool
1539 get_db_info(const char *name, LOCKMODE lockmode,
1540                         Oid *dbIdP, Oid *ownerIdP,
1541                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1542                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1543                         Oid *dbTablespace, char **dbCollate, char **dbCtype)
1544 {
1545         bool            result = false;
1546         Relation        relation;
1547
1548         AssertArg(name);
1549
1550         /* Caller may wish to grab a better lock on pg_database beforehand... */
1551         relation = heap_open(DatabaseRelationId, AccessShareLock);
1552
1553         /*
1554          * Loop covers the rare case where the database is renamed before we can
1555          * lock it.  We try again just in case we can find a new one of the same
1556          * name.
1557          */
1558         for (;;)
1559         {
1560                 ScanKeyData scanKey;
1561                 SysScanDesc scan;
1562                 HeapTuple       tuple;
1563                 Oid                     dbOid;
1564
1565                 /*
1566                  * there's no syscache for database-indexed-by-name, so must do it the
1567                  * hard way
1568                  */
1569                 ScanKeyInit(&scanKey,
1570                                         Anum_pg_database_datname,
1571                                         BTEqualStrategyNumber, F_NAMEEQ,
1572                                         NameGetDatum(name));
1573
1574                 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1575                                                                   SnapshotNow, 1, &scanKey);
1576
1577                 tuple = systable_getnext(scan);
1578
1579                 if (!HeapTupleIsValid(tuple))
1580                 {
1581                         /* definitely no database of that name */
1582                         systable_endscan(scan);
1583                         break;
1584                 }
1585
1586                 dbOid = HeapTupleGetOid(tuple);
1587
1588                 systable_endscan(scan);
1589
1590                 /*
1591                  * Now that we have a database OID, we can try to lock the DB.
1592                  */
1593                 if (lockmode != NoLock)
1594                         LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1595
1596                 /*
1597                  * And now, re-fetch the tuple by OID.  If it's still there and still
1598                  * the same name, we win; else, drop the lock and loop back to try
1599                  * again.
1600                  */
1601                 tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
1602                 if (HeapTupleIsValid(tuple))
1603                 {
1604                         Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1605
1606                         if (strcmp(name, NameStr(dbform->datname)) == 0)
1607                         {
1608                                 /* oid of the database */
1609                                 if (dbIdP)
1610                                         *dbIdP = dbOid;
1611                                 /* oid of the owner */
1612                                 if (ownerIdP)
1613                                         *ownerIdP = dbform->datdba;
1614                                 /* character encoding */
1615                                 if (encodingP)
1616                                         *encodingP = dbform->encoding;
1617                                 /* allowed as template? */
1618                                 if (dbIsTemplateP)
1619                                         *dbIsTemplateP = dbform->datistemplate;
1620                                 /* allowing connections? */
1621                                 if (dbAllowConnP)
1622                                         *dbAllowConnP = dbform->datallowconn;
1623                                 /* last system OID used in database */
1624                                 if (dbLastSysOidP)
1625                                         *dbLastSysOidP = dbform->datlastsysoid;
1626                                 /* limit of frozen XIDs */
1627                                 if (dbFrozenXidP)
1628                                         *dbFrozenXidP = dbform->datfrozenxid;
1629                                 /* default tablespace for this database */
1630                                 if (dbTablespace)
1631                                         *dbTablespace = dbform->dattablespace;
1632                                 /* default locale settings for this database */
1633                                 if (dbCollate)
1634                                         *dbCollate = pstrdup(NameStr(dbform->datcollate));
1635                                 if (dbCtype)
1636                                         *dbCtype = pstrdup(NameStr(dbform->datctype));
1637                                 ReleaseSysCache(tuple);
1638                                 result = true;
1639                                 break;
1640                         }
1641                         /* can only get here if it was just renamed */
1642                         ReleaseSysCache(tuple);
1643                 }
1644
1645                 if (lockmode != NoLock)
1646                         UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1647         }
1648
1649         heap_close(relation, AccessShareLock);
1650
1651         return result;
1652 }
1653
1654 /* Check if current user has createdb privileges */
1655 static bool
1656 have_createdb_privilege(void)
1657 {
1658         bool            result = false;
1659         HeapTuple       utup;
1660
1661         /* Superusers can always do everything */
1662         if (superuser())
1663                 return true;
1664
1665         utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
1666         if (HeapTupleIsValid(utup))
1667         {
1668                 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1669                 ReleaseSysCache(utup);
1670         }
1671         return result;
1672 }
1673
1674 /*
1675  * Remove tablespace directories
1676  *
1677  * We don't know what tablespaces db_id is using, so iterate through all
1678  * tablespaces removing <tablespace>/db_id
1679  */
1680 static void
1681 remove_dbtablespaces(Oid db_id)
1682 {
1683         Relation        rel;
1684         HeapScanDesc scan;
1685         HeapTuple       tuple;
1686
1687         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1688         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1689         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1690         {
1691                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1692                 char       *dstpath;
1693                 struct stat st;
1694
1695                 /* Don't mess with the global tablespace */
1696                 if (dsttablespace == GLOBALTABLESPACE_OID)
1697                         continue;
1698
1699                 dstpath = GetDatabasePath(db_id, dsttablespace);
1700
1701                 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1702                 {
1703                         /* Assume we can ignore it */
1704                         pfree(dstpath);
1705                         continue;
1706                 }
1707
1708                 if (!rmtree(dstpath, true))
1709                         ereport(WARNING,
1710                                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
1711                                                         dstpath)));
1712
1713                 /* Record the filesystem change in XLOG */
1714                 {
1715                         xl_dbase_drop_rec xlrec;
1716                         XLogRecData rdata[1];
1717
1718                         xlrec.db_id = db_id;
1719                         xlrec.tablespace_id = dsttablespace;
1720
1721                         rdata[0].data = (char *) &xlrec;
1722                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1723                         rdata[0].buffer = InvalidBuffer;
1724                         rdata[0].next = NULL;
1725
1726                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1727                 }
1728
1729                 pfree(dstpath);
1730         }
1731
1732         heap_endscan(scan);
1733         heap_close(rel, AccessShareLock);
1734 }
1735
1736 /*
1737  * Check for existing files that conflict with a proposed new DB OID;
1738  * return TRUE if there are any
1739  *
1740  * If there were a subdirectory in any tablespace matching the proposed new
1741  * OID, we'd get a create failure due to the duplicate name ... and then we'd
1742  * try to remove that already-existing subdirectory during the cleanup in
1743  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
1744  * instead we make this extra check before settling on the OID of the new
1745  * database.  This exactly parallels what GetNewRelFileNode() does for table
1746  * relfilenode values.
1747  */
1748 static bool
1749 check_db_file_conflict(Oid db_id)
1750 {
1751         bool            result = false;
1752         Relation        rel;
1753         HeapScanDesc scan;
1754         HeapTuple       tuple;
1755
1756         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1757         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1758         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1759         {
1760                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1761                 char       *dstpath;
1762                 struct stat st;
1763
1764                 /* Don't mess with the global tablespace */
1765                 if (dsttablespace == GLOBALTABLESPACE_OID)
1766                         continue;
1767
1768                 dstpath = GetDatabasePath(db_id, dsttablespace);
1769
1770                 if (lstat(dstpath, &st) == 0)
1771                 {
1772                         /* Found a conflicting file (or directory, whatever) */
1773                         pfree(dstpath);
1774                         result = true;
1775                         break;
1776                 }
1777
1778                 pfree(dstpath);
1779         }
1780
1781         heap_endscan(scan);
1782         heap_close(rel, AccessShareLock);
1783         return result;
1784 }
1785
/*
 * errdetail_busy_db
 *		Attach an errdetail explaining why a database is considered busy.
 *
 * notherbackends:	number of other sessions using the database
 * npreparedxacts:	number of prepared transactions using the database
 *
 * Singular versus plural wording is not attempted, because the English
 * rules for it translate poorly; the message does at least leave out
 * whichever count is zero.  The return value is always 0 and exists only so
 * the call can sit inside an ereport() argument list.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	if (notherbackends <= 0)
		errdetail("There are %d prepared transaction(s) using the database.",
				  npreparedxacts);
	else if (npreparedxacts <= 0)
		errdetail("There are %d other session(s) using the database.",
				  notherbackends);
	else
		errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
				  notherbackends, npreparedxacts);

	return 0;					/* just to keep ereport macro happy */
}
1808
1809 /*
1810  * get_database_oid - given a database name, look up the OID
1811  *
1812  * If missing_ok is false, throw an error if database name not found.  If
1813  * true, just return InvalidOid.
1814  */
1815 Oid
1816 get_database_oid(const char *dbname, bool missing_ok)
1817 {
1818         Relation        pg_database;
1819         ScanKeyData entry[1];
1820         SysScanDesc scan;
1821         HeapTuple       dbtuple;
1822         Oid                     oid;
1823
1824         /*
1825          * There's no syscache for pg_database indexed by name, so we must look
1826          * the hard way.
1827          */
1828         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1829         ScanKeyInit(&entry[0],
1830                                 Anum_pg_database_datname,
1831                                 BTEqualStrategyNumber, F_NAMEEQ,
1832                                 CStringGetDatum(dbname));
1833         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1834                                                           SnapshotNow, 1, entry);
1835
1836         dbtuple = systable_getnext(scan);
1837
1838         /* We assume that there can be at most one matching tuple */
1839         if (HeapTupleIsValid(dbtuple))
1840                 oid = HeapTupleGetOid(dbtuple);
1841         else
1842                 oid = InvalidOid;
1843
1844         systable_endscan(scan);
1845         heap_close(pg_database, AccessShareLock);
1846
1847         if (!OidIsValid(oid) && !missing_ok)
1848         ereport(ERROR,
1849                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1850                  errmsg("database \"%s\" does not exist",
1851                         dbname)));
1852
1853         return oid;
1854 }
1855
1856
1857 /*
1858  * get_database_name - given a database OID, look up the name
1859  *
1860  * Returns a palloc'd string, or NULL if no such database.
1861  */
1862 char *
1863 get_database_name(Oid dbid)
1864 {
1865         HeapTuple       dbtuple;
1866         char       *result;
1867
1868         dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
1869         if (HeapTupleIsValid(dbtuple))
1870         {
1871                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1872                 ReleaseSysCache(dbtuple);
1873         }
1874         else
1875                 result = NULL;
1876
1877         return result;
1878 }
1879
1880 /*
1881  * DATABASE resource manager's routines
1882  */
1883 void
1884 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1885 {
1886         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1887
1888         /* Backup blocks are not used in dbase records */
1889         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
1890
1891         if (info == XLOG_DBASE_CREATE)
1892         {
1893                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1894                 char       *src_path;
1895                 char       *dst_path;
1896                 struct stat st;
1897
1898                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1899                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1900
1901                 /*
1902                  * Our theory for replaying a CREATE is to forcibly drop the target
1903                  * subdirectory if present, then re-copy the source data. This may be
1904                  * more work than needed, but it is simple to implement.
1905                  */
1906                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1907                 {
1908                         if (!rmtree(dst_path, true))
1909                                 /* If this failed, copydir() below is going to error. */
1910                                 ereport(WARNING,
1911                                                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1912                                                                 dst_path)));
1913                 }
1914
1915                 /*
1916                  * Force dirty buffers out to disk, to ensure source database is
1917                  * up-to-date for the copy.
1918                  */
1919                 FlushDatabaseBuffers(xlrec->src_db_id);
1920
1921                 /*
1922                  * Copy this subdirectory to the new location
1923                  *
1924                  * We don't need to copy subdirectories
1925                  */
1926                 copydir(src_path, dst_path, false);
1927         }
1928         else if (info == XLOG_DBASE_DROP)
1929         {
1930                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1931                 char       *dst_path;
1932
1933                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1934
1935                 if (InHotStandby)
1936                 {
1937                         /*
1938                          * Lock database while we resolve conflicts to ensure that
1939                          * InitPostgres() cannot fully re-execute concurrently. This
1940                          * avoids backends re-connecting automatically to same database,
1941                          * which can happen in some cases.
1942                          */
1943                         LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
1944                         ResolveRecoveryConflictWithDatabase(xlrec->db_id);
1945                 }
1946
1947                 /* Drop pages for this database that are in the shared buffer cache */
1948                 DropDatabaseBuffers(xlrec->db_id);
1949
1950                 /* Also, clean out any fsync requests that might be pending in md.c */
1951                 ForgetDatabaseFsyncRequests(xlrec->db_id);
1952
1953                 /* Clean out the xlog relcache too */
1954                 XLogDropDatabase(xlrec->db_id);
1955
1956                 /* And remove the physical files */
1957                 if (!rmtree(dst_path, true))
1958                         ereport(WARNING,
1959                                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
1960                                                         dst_path)));
1961
1962                 if (InHotStandby)
1963                 {
1964                         /*
1965                          * Release locks prior to commit. XXX There is a race condition
1966                          * here that may allow backends to reconnect, but the window for
1967                          * this is small because the gap between here and commit is mostly
1968                          * fairly small and it is unlikely that people will be dropping
1969                          * databases that we are trying to connect to anyway.
1970                          */
1971                         UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
1972                 }
1973         }
1974         else
1975                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1976 }
1977
1978 void
1979 dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
1980 {
1981         uint8           info = xl_info & ~XLR_INFO_MASK;
1982
1983         if (info == XLOG_DBASE_CREATE)
1984         {
1985                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1986
1987                 appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
1988                                                  xlrec->src_db_id, xlrec->src_tablespace_id,
1989                                                  xlrec->db_id, xlrec->tablespace_id);
1990         }
1991         else if (info == XLOG_DBASE_DROP)
1992         {
1993                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1994
1995                 appendStringInfo(buf, "drop db: dir %u/%u",
1996                                                  xlrec->db_id, xlrec->tablespace_id);
1997         }
1998         else
1999                 appendStringInfo(buf, "UNKNOWN");
2000 }