]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Rename various "freeze multixact" variables
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands use exclusive locks on
7  * the database objects (as expressed by LockSharedObject()) to avoid
8  * stepping on each other's toes.  Formerly we used table-level locks
9  * on pg_database, but that's too coarse-grained.
10  *
11  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        src/backend/commands/dbcommands.c
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <fcntl.h>
23 #include <locale.h>
24 #include <unistd.h>
25 #include <sys/stat.h>
26
27 #include "access/genam.h"
28 #include "access/heapam.h"
29 #include "access/htup_details.h"
30 #include "access/xact.h"
31 #include "access/xlogutils.h"
32 #include "catalog/catalog.h"
33 #include "catalog/dependency.h"
34 #include "catalog/indexing.h"
35 #include "catalog/objectaccess.h"
36 #include "catalog/pg_authid.h"
37 #include "catalog/pg_database.h"
38 #include "catalog/pg_db_role_setting.h"
39 #include "catalog/pg_tablespace.h"
40 #include "commands/comment.h"
41 #include "commands/dbcommands.h"
42 #include "commands/seclabel.h"
43 #include "commands/tablespace.h"
44 #include "mb/pg_wchar.h"
45 #include "miscadmin.h"
46 #include "pgstat.h"
47 #include "postmaster/bgwriter.h"
48 #include "storage/copydir.h"
49 #include "storage/fd.h"
50 #include "storage/lmgr.h"
51 #include "storage/ipc.h"
52 #include "storage/procarray.h"
53 #include "storage/smgr.h"
54 #include "utils/acl.h"
55 #include "utils/builtins.h"
56 #include "utils/fmgroids.h"
57 #include "utils/pg_locale.h"
58 #include "utils/snapmgr.h"
59 #include "utils/syscache.h"
60 #include "utils/tqual.h"
61
62
/*
 * State handed (by pointer, wrapped in a Datum) to
 * createdb_failure_callback() when createdb() registers it with
 * PG_ENSURE_ERROR_CLEANUP.  createdb() fills in both fields just before
 * it starts copying the template's directories.
 */
63 typedef struct
64 {
65         Oid                     src_dboid;              /* source (template) DB */
66         Oid                     dest_dboid;             /* DB we are trying to create */
67 } createdb_failure_params;
68
/*
 * Identifies a database move in progress: which database, and to which
 * tablespace.  NOTE(review): presumably the argument block for
 * movedb_failure_callback(), by analogy with createdb_failure_params;
 * the code that uses it is not visible here -- confirm.
 */
69 typedef struct
70 {
71         Oid                     dest_dboid;             /* DB we are trying to move */
72         Oid                     dest_tsoid;             /* tablespace we are trying to move to */
73 } movedb_failure_params;
74
75 /* non-export function prototypes (all file-local: declared static) */
/* cleanup routine createdb() registers via PG_ENSURE_ERROR_CLEANUP */
76 static void createdb_failure_callback(int code, Datum arg);
77 static void movedb(const char *dbname, const char *tblspcname);
78 static void movedb_failure_callback(int code, Datum arg);
/*
 * Look up a database by name, taking the given lock mode.  createdb()
 * treats a false result as "database does not exist" and passes the
 * address of a local variable for every pointer argument.
 */
79 static bool get_db_info(const char *name, LOCKMODE lockmode,
80                         Oid *dbIdP, Oid *ownerIdP,
81                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
82                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
83                         MultiXactId *dbMinMultiP,
84                         Oid *dbTablespace, char **dbCollate, char **dbCtype);
/* createdb() refuses to proceed when this returns false */
85 static bool have_createdb_privilege(void);
86 static void remove_dbtablespaces(Oid db_id);
/* createdb() keeps drawing new OIDs for as long as this returns true */
87 static bool check_db_file_conflict(Oid db_id);
/* used as an auxiliary call inside ereport() when a database is busy */
88 static int      errdetail_busy_db(int notherbackends, int npreparedxacts);
89
90
89
90
91 /*
92  * CREATE DATABASE
    *
    * Creates the database named by stmt->dbname as a copy of a template
    * database ("template1" unless a "template" option is given), applying
    * the owner, encoding, locale, tablespace and connection-limit options
    * found in stmt->options.
    *
    * Order of work: decode and validate the options, check privileges, lock
    * and validate the template, pick an OID, insert the pg_database row and
    * its dependency entries, then copy the template's per-tablespace
    * directories (writing an XLOG record for each copy) between two forced
    * checkpoints.
    *
    * Returns the OID assigned to the new database.  Every failure is
    * reported through ereport()/elog() at ERROR level.  The directory copies
    * run inside a PG_ENSURE_ERROR_CLEANUP block that has
    * createdb_failure_callback registered as its cleanup routine.
93  */
94 Oid
95 createdb(const CreatedbStmt *stmt)
96 {
97         HeapScanDesc scan;
98         Relation        rel;
99         Oid                     src_dboid;
100         Oid                     src_owner;
101         int                     src_encoding;
102         char       *src_collate;
103         char       *src_ctype;
104         bool            src_istemplate;
105         bool            src_allowconn;
106         Oid                     src_lastsysoid;
107         TransactionId src_frozenxid;
108         MultiXactId src_minmxid;
109         Oid                     src_deftablespace;
110         volatile Oid dst_deftablespace;
111         Relation        pg_database_rel;
112         HeapTuple       tuple;
113         Datum           new_record[Natts_pg_database];
114         bool            new_record_nulls[Natts_pg_database];
115         Oid                     dboid;
116         Oid                     datdba;
117         ListCell   *option;
118         DefElem    *dtablespacename = NULL;
119         DefElem    *downer = NULL;
120         DefElem    *dtemplate = NULL;
121         DefElem    *dencoding = NULL;
122         DefElem    *dcollate = NULL;
123         DefElem    *dctype = NULL;
124         DefElem    *dconnlimit = NULL;
125         char       *dbname = stmt->dbname;
126         char       *dbowner = NULL;
127         const char *dbtemplate = NULL;
128         char       *dbcollate = NULL;
129         char       *dbctype = NULL;
130         char       *canonname;
131         int                     encoding = -1;
132         int                     dbconnlimit = -1;
133         int                     notherbackends;
134         int                     npreparedxacts;
135         createdb_failure_params fparms;
136
137         /*
             * Extract options from the statement node tree.  Each option may
             * appear at most once; a repeat is reported as a syntax error.
             */
138         foreach(option, stmt->options)
139         {
140                 DefElem    *defel = (DefElem *) lfirst(option);
141
142                 if (strcmp(defel->defname, "tablespace") == 0)
143                 {
144                         if (dtablespacename)
145                                 ereport(ERROR,
146                                                 (errcode(ERRCODE_SYNTAX_ERROR),
147                                                  errmsg("conflicting or redundant options")));
148                         dtablespacename = defel;
149                 }
150                 else if (strcmp(defel->defname, "owner") == 0)
151                 {
152                         if (downer)
153                                 ereport(ERROR,
154                                                 (errcode(ERRCODE_SYNTAX_ERROR),
155                                                  errmsg("conflicting or redundant options")));
156                         downer = defel;
157                 }
158                 else if (strcmp(defel->defname, "template") == 0)
159                 {
160                         if (dtemplate)
161                                 ereport(ERROR,
162                                                 (errcode(ERRCODE_SYNTAX_ERROR),
163                                                  errmsg("conflicting or redundant options")));
164                         dtemplate = defel;
165                 }
166                 else if (strcmp(defel->defname, "encoding") == 0)
167                 {
168                         if (dencoding)
169                                 ereport(ERROR,
170                                                 (errcode(ERRCODE_SYNTAX_ERROR),
171                                                  errmsg("conflicting or redundant options")));
172                         dencoding = defel;
173                 }
174                 else if (strcmp(defel->defname, "lc_collate") == 0)
175                 {
176                         if (dcollate)
177                                 ereport(ERROR,
178                                                 (errcode(ERRCODE_SYNTAX_ERROR),
179                                                  errmsg("conflicting or redundant options")));
180                         dcollate = defel;
181                 }
182                 else if (strcmp(defel->defname, "lc_ctype") == 0)
183                 {
184                         if (dctype)
185                                 ereport(ERROR,
186                                                 (errcode(ERRCODE_SYNTAX_ERROR),
187                                                  errmsg("conflicting or redundant options")));
188                         dctype = defel;
189                 }
190                 else if (strcmp(defel->defname, "connectionlimit") == 0)
191                 {
192                         if (dconnlimit)
193                                 ereport(ERROR,
194                                                 (errcode(ERRCODE_SYNTAX_ERROR),
195                                                  errmsg("conflicting or redundant options")));
196                         dconnlimit = defel;
197                 }
198                 else if (strcmp(defel->defname, "location") == 0)
199                 {
                            /* Obsolete option: emit a warning and otherwise ignore it */
200                         ereport(WARNING,
201                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
202                                          errmsg("LOCATION is not supported anymore"),
203                                          errhint("Consider using tablespaces instead.")));
204                 }
205                 else
                            /* Any other option name is reported through elog */
206                         elog(ERROR, "option \"%s\" not recognized",
207                                  defel->defname);
208         }
209
            /* Pull the actual values out of the option nodes collected above */
210         if (downer && downer->arg)
211                 dbowner = strVal(downer->arg);
212         if (dtemplate && dtemplate->arg)
213                 dbtemplate = strVal(dtemplate->arg);
214         if (dencoding && dencoding->arg)
215         {
216                 const char *encoding_name;
217
                    /* ENCODING may be given either as an integer code or as a name */
218                 if (IsA(dencoding->arg, Integer))
219                 {
220                         encoding = intVal(dencoding->arg);
221                         encoding_name = pg_encoding_to_char(encoding);
222                         if (strcmp(encoding_name, "") == 0 ||
223                                 pg_valid_server_encoding(encoding_name) < 0)
224                                 ereport(ERROR,
225                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
226                                                  errmsg("%d is not a valid encoding code",
227                                                                 encoding)));
228                 }
229                 else if (IsA(dencoding->arg, String))
230                 {
231                         encoding_name = strVal(dencoding->arg);
232                         encoding = pg_valid_server_encoding(encoding_name);
233                         if (encoding < 0)
234                                 ereport(ERROR,
235                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
236                                                  errmsg("%s is not a valid encoding name",
237                                                                 encoding_name)));
238                 }
239                 else
240                         elog(ERROR, "unrecognized node type: %d",
241                                  nodeTag(dencoding->arg));
242         }
243         if (dcollate && dcollate->arg)
244                 dbcollate = strVal(dcollate->arg);
245         if (dctype && dctype->arg)
246                 dbctype = strVal(dctype->arg);
247
248         if (dconnlimit && dconnlimit->arg)
249         {
250                 dbconnlimit = intVal(dconnlimit->arg);
                    /* -1 (also the default set at declaration) is accepted; below that is not */
251                 if (dbconnlimit < -1)
252                         ereport(ERROR,
253                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
254                                          errmsg("invalid connection limit: %d", dbconnlimit)));
255         }
256
257         /* obtain OID of proposed owner (defaults to the current user) */
258         if (dbowner)
259                 datdba = get_role_oid(dbowner, false);
260         else
261                 datdba = GetUserId();
262
263         /*
264          * To create a database, must have createdb privilege and must be able to
265          * become the target role (this does not imply that the target role itself
266          * must have createdb privilege).  The latter provision guards against
267          * "giveaway" attacks.  Note that a superuser will always have both of
268          * these privileges a fortiori.
269          */
270         if (!have_createdb_privilege())
271                 ereport(ERROR,
272                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
273                                  errmsg("permission denied to create database")));
274
275         check_is_member_of_role(GetUserId(), datdba);
276
277         /*
278          * Lookup database (template) to be cloned, and obtain share lock on it.
279          * ShareLock allows two CREATE DATABASEs to work from the same template
280          * concurrently, while ensuring no one is busy dropping it in parallel
281          * (which would be Very Bad since we'd likely get an incomplete copy
282          * without knowing it).  This also prevents any new connections from being
283          * made to the source until we finish copying it, so we can be sure it
284          * won't change underneath us.
285          */
286         if (!dbtemplate)
287                 dbtemplate = "template1";               /* Default template database name */
288
289         if (!get_db_info(dbtemplate, ShareLock,
290                                          &src_dboid, &src_owner, &src_encoding,
291                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
292                                          &src_frozenxid, &src_minmxid, &src_deftablespace,
293                                          &src_collate, &src_ctype))
294                 ereport(ERROR,
295                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
296                                  errmsg("template database \"%s\" does not exist",
297                                                 dbtemplate)));
298
299         /*
300          * Permission check: to copy a DB that's not marked datistemplate, you
301          * must be superuser or the owner thereof.
302          */
303         if (!src_istemplate)
304         {
305                 if (!pg_database_ownercheck(src_dboid, GetUserId()))
306                         ereport(ERROR,
307                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
308                                          errmsg("permission denied to copy database \"%s\"",
309                                                         dbtemplate)));
310         }
311
312         /* If encoding or locales are defaulted, use source's setting */
313         if (encoding < 0)
314                 encoding = src_encoding;
315         if (dbcollate == NULL)
316                 dbcollate = src_collate;
317         if (dbctype == NULL)
318                 dbctype = src_ctype;
319
320         /* Some encodings are client only */
321         if (!PG_VALID_BE_ENCODING(encoding))
322                 ereport(ERROR,
323                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
324                                  errmsg("invalid server encoding %d", encoding)));
325
326         /* Check that the chosen locales are valid, and get canonical spellings */
327         if (!check_locale(LC_COLLATE, dbcollate, &canonname))
328                 ereport(ERROR,
329                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
330                                  errmsg("invalid locale name: \"%s\"", dbcollate)));
331         dbcollate = canonname;
332         if (!check_locale(LC_CTYPE, dbctype, &canonname))
333                 ereport(ERROR,
334                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
335                                  errmsg("invalid locale name: \"%s\"", dbctype)));
336         dbctype = canonname;
337
338         check_encoding_locale_matches(encoding, dbcollate, dbctype);
339
340         /*
341          * Check that the new encoding and locale settings match the source
342          * database.  We insist on this because we simply copy the source data ---
343          * any non-ASCII data would be wrongly encoded, and any indexes sorted
344          * according to the source locale would be wrong.
345          *
346          * However, we assume that template0 doesn't contain any non-ASCII data
347          * nor any indexes that depend on collation or ctype, so template0 can be
348          * used as template for creating a database with any encoding or locale.
349          */
350         if (strcmp(dbtemplate, "template0") != 0)
351         {
352                 if (encoding != src_encoding)
353                         ereport(ERROR,
354                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
355                                          errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
356                                                         pg_encoding_to_char(encoding),
357                                                         pg_encoding_to_char(src_encoding)),
358                                          errhint("Use the same encoding as in the template database, or use template0 as template.")));
359
360                 if (strcmp(dbcollate, src_collate) != 0)
361                         ereport(ERROR,
362                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
363                                          errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
364                                                         dbcollate, src_collate),
365                                          errhint("Use the same collation as in the template database, or use template0 as template.")));
366
367                 if (strcmp(dbctype, src_ctype) != 0)
368                         ereport(ERROR,
369                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
370                                          errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
371                                                         dbctype, src_ctype),
372                                          errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
373         }
374
375         /* Resolve default tablespace for new database */
376         if (dtablespacename && dtablespacename->arg)
377         {
378                 char       *tablespacename;
379                 AclResult       aclresult;
380
381                 tablespacename = strVal(dtablespacename->arg);
382                 dst_deftablespace = get_tablespace_oid(tablespacename, false);
383                 /* check permissions: current user needs CREATE on the tablespace */
384                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
385                                                                                    ACL_CREATE);
386                 if (aclresult != ACLCHECK_OK)
387                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
388                                                    tablespacename);
389
390                 /* pg_global must never be the default tablespace */
391                 if (dst_deftablespace == GLOBALTABLESPACE_OID)
392                         ereport(ERROR,
393                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
394                                   errmsg("pg_global cannot be used as default tablespace")));
395
396                 /*
397                  * If we are trying to change the default tablespace of the template,
398                  * we require that the template not have any files in the new default
399                  * tablespace.  This is necessary because otherwise the copied
400                  * database would contain pg_class rows that refer to its default
401                  * tablespace both explicitly (by OID) and implicitly (as zero), which
402                  * would cause problems.  For example another CREATE DATABASE using
403                  * the copied database as template, and trying to change its default
404                  * tablespace again, would yield outright incorrect results (it would
405                  * improperly move tables to the new default tablespace that should
406                  * stay in the same tablespace).
407                  */
408                 if (dst_deftablespace != src_deftablespace)
409                 {
410                         char       *srcpath;
411                         struct stat st;
412
                            /* the template's directory inside the *requested* tablespace */
413                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
414
415                         if (stat(srcpath, &st) == 0 &&
416                                 S_ISDIR(st.st_mode) &&
417                                 !directory_is_empty(srcpath))
418                                 ereport(ERROR,
419                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
420                                                  errmsg("cannot assign new default tablespace \"%s\"",
421                                                                 tablespacename),
422                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
423                                                                    dbtemplate)));
424                         pfree(srcpath);
425                 }
426         }
427         else
428         {
429                 /* Use template database's default tablespace */
430                 dst_deftablespace = src_deftablespace;
431                 /* Note there is no additional permission check in this path */
432         }
433
434         /*
435          * Check for db name conflict.  This is just to give a more friendly error
436          * message than "unique index violation".  There's a race condition but
437          * we're willing to accept the less friendly message in that case.
438          */
439         if (OidIsValid(get_database_oid(dbname, true)))
440                 ereport(ERROR,
441                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
442                                  errmsg("database \"%s\" already exists", dbname)));
443
444         /*
445          * The source DB can't have any active backends, except this one
446          * (exception is to allow CREATE DB while connected to template1).
447          * Otherwise we might copy inconsistent data.
448          *
449          * This should be last among the basic error checks, because it involves
450          * potential waiting; we may as well throw an error first if we're gonna
451          * throw one.
452          */
453         if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
454                 ereport(ERROR,
455                                 (errcode(ERRCODE_OBJECT_IN_USE),
456                         errmsg("source database \"%s\" is being accessed by other users",
457                                    dbtemplate),
458                                  errdetail_busy_db(notherbackends, npreparedxacts)));
459
460         /*
461          * Select an OID for the new database, checking that it doesn't have a
462          * filename conflict with anything already existing in the tablespace
463          * directories.
464          */
465         pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
466
            /* keep drawing OIDs until one has no conflict */
467         do
468         {
469                 dboid = GetNewOid(pg_database_rel);
470         } while (check_db_file_conflict(dboid));
471
472         /*
473          * Insert a new tuple into pg_database.  This establishes our ownership of
474          * the new database name (anyone else trying to insert the same name will
475          * block on the unique index, and fail after we commit).
476          */
477
478         /* Form tuple: start from all-zero values with no column null */
479         MemSet(new_record, 0, sizeof(new_record));
480         MemSet(new_record_nulls, false, sizeof(new_record_nulls));
481
482         new_record[Anum_pg_database_datname - 1] =
483                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
484         new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
485         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
486         new_record[Anum_pg_database_datcollate - 1] =
487                 DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
488         new_record[Anum_pg_database_datctype - 1] =
489                 DirectFunctionCall1(namein, CStringGetDatum(dbctype));
            /*
             * The copy is never marked as a template and always allows
             * connections; the template's own flags (src_istemplate,
             * src_allowconn) are not propagated.
             */
490         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
491         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
492         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
            /* these three values are carried over from the template unchanged */
493         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
494         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
495         new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
496         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
497
498         /*
499          * We deliberately set datacl to default (NULL), rather than copying it
500          * from the template database.  Copying it would be a bad idea when the
501          * owner is not the same as the template's owner.
502          */
503         new_record_nulls[Anum_pg_database_datacl - 1] = true;
504
505         tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
506                                                         new_record, new_record_nulls);
507
            /* give the row the OID chosen above */
508         HeapTupleSetOid(tuple, dboid);
509
510         simple_heap_insert(pg_database_rel, tuple);
511
512         /* Update indexes */
513         CatalogUpdateIndexes(pg_database_rel, tuple);
514
515         /*
516          * Now generate additional catalog entries associated with the new DB
517          */
518
519         /* Register owner dependency */
520         recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
521
522         /* Create pg_shdepend entries for objects within database */
523         copyTemplateDependencies(src_dboid, dboid);
524
525         /* Post creation hook for new database */
526         InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
527
528         /*
529          * Force a checkpoint before starting the copy. This will force dirty
530          * buffers out to disk, to ensure source database is up-to-date on disk
531          * for the copy. FlushDatabaseBuffers() would suffice for that, but we
532          * also want to process any pending unlink requests. Otherwise, if a
533          * checkpoint happened while we're copying files, a file might be deleted
534          * just when we're about to copy it, causing the lstat() call in copydir()
535          * to fail with ENOENT.
536          */
537         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
538
539         /*
540          * Once we start copying subdirectories, we need to be able to clean 'em
541          * up if we fail.  Use an ENSURE block to make sure this happens.  (This
542          * is not a 100% solution, because of the possibility of failure during
543          * transaction commit after we leave this routine, but it should handle
544          * most scenarios.)
545          */
546         fparms.src_dboid = src_dboid;
547         fparms.dest_dboid = dboid;
548         PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
549                                                         PointerGetDatum(&fparms));
550         {
551                 /*
552                  * Iterate through all tablespaces of the template database, and copy
553                  * each one to the new database.
554                  */
555                 rel = heap_open(TableSpaceRelationId, AccessShareLock);
556                 scan = heap_beginscan_catalog(rel, 0, NULL);
557                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
558                 {
559                         Oid                     srctablespace = HeapTupleGetOid(tuple);
560                         Oid                     dsttablespace;
561                         char       *srcpath;
562                         char       *dstpath;
563                         struct stat st;
564
565                         /* No need to copy global tablespace */
566                         if (srctablespace == GLOBALTABLESPACE_OID)
567                                 continue;
568
569                         srcpath = GetDatabasePath(src_dboid, srctablespace);
570
                            /* skip tablespaces where the template has no directory, or an empty one */
571                         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
572                                 directory_is_empty(srcpath))
573                         {
574                                 /* Assume we can ignore it */
575                                 pfree(srcpath);
576                                 continue;
577                         }
578
                            /*
                             * Contents of the template's default tablespace go to the new
                             * database's default tablespace; every other tablespace maps
                             * to itself.
                             */
579                         if (srctablespace == src_deftablespace)
580                                 dsttablespace = dst_deftablespace;
581                         else
582                                 dsttablespace = srctablespace;
583
584                         dstpath = GetDatabasePath(dboid, dsttablespace);
585
586                         /*
587                          * Copy this subdirectory to the new location
588                          *
589                          * We don't need to copy subdirectories
590                          */
591                         copydir(srcpath, dstpath, false);
592
593                         /* Record the filesystem change in XLOG */
594                         {
595                                 xl_dbase_create_rec xlrec;
596                                 XLogRecData rdata[1];
597
598                                 xlrec.db_id = dboid;
599                                 xlrec.tablespace_id = dsttablespace;
600                                 xlrec.src_db_id = src_dboid;
601                                 xlrec.src_tablespace_id = srctablespace;
602
                                    /* single-element chain carrying just the record struct */
603                                 rdata[0].data = (char *) &xlrec;
604                                 rdata[0].len = sizeof(xl_dbase_create_rec);
605                                 rdata[0].buffer = InvalidBuffer;
606                                 rdata[0].next = NULL;
607
608                                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
609                         }
610                 }
611                 heap_endscan(scan);
612                 heap_close(rel, AccessShareLock);
613
614                 /*
615                  * We force a checkpoint before committing.  This effectively means
616                  * that committed XLOG_DBASE_CREATE operations will never need to be
617                  * replayed (at least not in ordinary crash recovery; we still have to
618                  * make the XLOG entry for the benefit of PITR operations). This
619                  * avoids two nasty scenarios:
620                  *
621                  * #1: When PITR is off, we don't XLOG the contents of newly created
622                  * indexes; therefore the drop-and-recreate-whole-directory behavior
623                  * of DBASE_CREATE replay would lose such indexes.
624                  *
625                  * #2: Since we have to recopy the source database during DBASE_CREATE
626                  * replay, we run the risk of copying changes in it that were
627                  * committed after the original CREATE DATABASE command but before the
628                  * system crash that led to the replay.  This is at least unexpected
629                  * and at worst could lead to inconsistencies, eg duplicate table
630                  * names.
631                  *
632                  * (Both of these were real bugs in releases 8.0 through 8.0.3.)
633                  *
634                  * In PITR replay, the first of these isn't an issue, and the second
635                  * is only a risk if the CREATE DATABASE and subsequent template
636                  * database change both occur while a base backup is being taken.
637                  * There doesn't seem to be much we can do about that except document
638                  * it as a limitation.
639                  *
640                  * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
641                  * we can avoid this.
642                  */
643                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
644
645                 /*
646                  * Close pg_database, but keep lock till commit.
647                  */
648                 heap_close(pg_database_rel, NoLock);
649
650                 /*
651                  * Force synchronous commit, thus minimizing the window between
652                  * creation of the database files and committal of the transaction. If
653                  * we crash before committing, we'll have a DB that's taking up disk
654                  * space but is not in pg_database, which is not good.
655                  */
656                 ForceSyncCommit();
657         }
658         PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
659                                                                 PointerGetDatum(&fparms));
660
            /* hand back the OID assigned to the new database */
661         return dboid;
662 }
663
664 /*
665  * Check whether chosen encoding matches chosen locale settings.  This
666  * restriction is necessary because libc's locale-specific code usually
667  * fails when presented with data in an encoding it's not expecting. We
668  * allow mismatch in four cases:
669  *
670  * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
671  * which works with any encoding.
672  *
673  * 2. locale encoding = -1, which means that we couldn't determine the
674  * locale's encoding and have to trust the user to get it right.
675  *
676  * 3. selected encoding is UTF8 and platform is win32. This is because
677  * UTF8 is a pseudo codepage that is supported in all locales since it's
678  * converted to UTF16 before being used.
679  *
680  * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
681  * is risky but we have historically allowed it --- notably, the
682  * regression tests require it.
683  *
684  * Note: if you change this policy, fix initdb to match.
685  */
686 void
687 check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
688 {
689         int                     ctype_encoding = pg_get_encoding_from_locale(ctype, true);
690         int                     collate_encoding = pg_get_encoding_from_locale(collate, true);
691
692         if (!(ctype_encoding == encoding ||
693                   ctype_encoding == PG_SQL_ASCII ||
694                   ctype_encoding == -1 ||
695 #ifdef WIN32
696                   encoding == PG_UTF8 ||
697 #endif
698                   (encoding == PG_SQL_ASCII && superuser())))
699                 ereport(ERROR,
700                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
701                                  errmsg("encoding \"%s\" does not match locale \"%s\"",
702                                                 pg_encoding_to_char(encoding),
703                                                 ctype),
704                    errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
705                                          pg_encoding_to_char(ctype_encoding))));
706
707         if (!(collate_encoding == encoding ||
708                   collate_encoding == PG_SQL_ASCII ||
709                   collate_encoding == -1 ||
710 #ifdef WIN32
711                   encoding == PG_UTF8 ||
712 #endif
713                   (encoding == PG_SQL_ASCII && superuser())))
714                 ereport(ERROR,
715                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
716                                  errmsg("encoding \"%s\" does not match locale \"%s\"",
717                                                 pg_encoding_to_char(encoding),
718                                                 collate),
719                  errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
720                                    pg_encoding_to_char(collate_encoding))));
721 }
722
723 /* Error cleanup callback for createdb */
724 static void
725 createdb_failure_callback(int code, Datum arg)
726 {
727         createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
728
729         /*
730          * Release lock on source database before doing recursive remove. This is
731          * not essential but it seems desirable to release the lock as soon as
732          * possible.
733          */
734         UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
735
736         /* Throw away any successfully copied subdirectories */
737         remove_dbtablespaces(fparms->dest_dboid);
738 }
739
740
741 /*
742  * DROP DATABASE
743  */
744 void
745 dropdb(const char *dbname, bool missing_ok)
746 {
747         Oid                     db_id;
748         bool            db_istemplate;
749         Relation        pgdbrel;
750         HeapTuple       tup;
751         int                     notherbackends;
752         int                     npreparedxacts;
753
754         /*
755          * Look up the target database's OID, and get exclusive lock on it. We
756          * need this to ensure that no new backend starts up in the target
757          * database while we are deleting it (see postinit.c), and that no one is
758          * using it as a CREATE DATABASE template or trying to delete it for
759          * themselves.
760          */
761         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
762
763         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
764                                    &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
765         {
766                 if (!missing_ok)
767                 {
768                         ereport(ERROR,
769                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
770                                          errmsg("database \"%s\" does not exist", dbname)));
771                 }
772                 else
773                 {
774                         /* Close pg_database, release the lock, since we changed nothing */
775                         heap_close(pgdbrel, RowExclusiveLock);
776                         ereport(NOTICE,
777                                         (errmsg("database \"%s\" does not exist, skipping",
778                                                         dbname)));
779                         return;
780                 }
781         }
782
783         /*
784          * Permission checks
785          */
786         if (!pg_database_ownercheck(db_id, GetUserId()))
787                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
788                                            dbname);
789
790         /* DROP hook for the database being removed */
791         InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
792
793         /*
794          * Disallow dropping a DB that is marked istemplate.  This is just to
795          * prevent people from accidentally dropping template0 or template1; they
796          * can do so if they're really determined ...
797          */
798         if (db_istemplate)
799                 ereport(ERROR,
800                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
801                                  errmsg("cannot drop a template database")));
802
803         /* Obviously can't drop my own database */
804         if (db_id == MyDatabaseId)
805                 ereport(ERROR,
806                                 (errcode(ERRCODE_OBJECT_IN_USE),
807                                  errmsg("cannot drop the currently open database")));
808
809         /*
810          * Check for other backends in the target database.  (Because we hold the
811          * database lock, no new ones can start after this.)
812          *
813          * As in CREATE DATABASE, check this after other error conditions.
814          */
815         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
816                 ereport(ERROR,
817                                 (errcode(ERRCODE_OBJECT_IN_USE),
818                                  errmsg("database \"%s\" is being accessed by other users",
819                                                 dbname),
820                                  errdetail_busy_db(notherbackends, npreparedxacts)));
821
822         /*
823          * Remove the database's tuple from pg_database.
824          */
825         tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
826         if (!HeapTupleIsValid(tup))
827                 elog(ERROR, "cache lookup failed for database %u", db_id);
828
829         simple_heap_delete(pgdbrel, &tup->t_self);
830
831         ReleaseSysCache(tup);
832
833         /*
834          * Delete any comments or security labels associated with the database.
835          */
836         DeleteSharedComments(db_id, DatabaseRelationId);
837         DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
838
839         /*
840          * Remove settings associated with this database
841          */
842         DropSetting(db_id, InvalidOid);
843
844         /*
845          * Remove shared dependency references for the database.
846          */
847         dropDatabaseDependencies(db_id);
848
849         /*
850          * Drop pages for this database that are in the shared buffer cache. This
851          * is important to ensure that no remaining backend tries to write out a
852          * dirty buffer to the dead database later...
853          */
854         DropDatabaseBuffers(db_id);
855
856         /*
857          * Tell the stats collector to forget it immediately, too.
858          */
859         pgstat_drop_database(db_id);
860
861         /*
862          * Tell checkpointer to forget any pending fsync and unlink requests for
863          * files in the database; else the fsyncs will fail at next checkpoint, or
864          * worse, it will delete files that belong to a newly created database
865          * with the same OID.
866          */
867         ForgetDatabaseFsyncRequests(db_id);
868
869         /*
870          * Force a checkpoint to make sure the checkpointer has received the
871          * message sent by ForgetDatabaseFsyncRequests. On Windows, this also
872          * ensures that background procs don't hold any open files, which would
873          * cause rmdir() to fail.
874          */
875         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
876
877         /*
878          * Remove all tablespace subdirs belonging to the database.
879          */
880         remove_dbtablespaces(db_id);
881
882         /*
883          * Close pg_database, but keep lock till commit.
884          */
885         heap_close(pgdbrel, NoLock);
886
887         /*
888          * Force synchronous commit, thus minimizing the window between removal of
889          * the database files and commital of the transaction. If we crash before
890          * committing, we'll have a DB that's gone on disk but still there
891          * according to pg_database, which is not good.
892          */
893         ForceSyncCommit();
894 }
895
896
897 /*
898  * Rename database
899  */
900 Oid
901 RenameDatabase(const char *oldname, const char *newname)
902 {
903         Oid                     db_id;
904         HeapTuple       newtup;
905         Relation        rel;
906         int                     notherbackends;
907         int                     npreparedxacts;
908
909         /*
910          * Look up the target database's OID, and get exclusive lock on it. We
911          * need this for the same reasons as DROP DATABASE.
912          */
913         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
914
915         if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
916                                          NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
917                 ereport(ERROR,
918                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
919                                  errmsg("database \"%s\" does not exist", oldname)));
920
921         /* must be owner */
922         if (!pg_database_ownercheck(db_id, GetUserId()))
923                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
924                                            oldname);
925
926         /* must have createdb rights */
927         if (!have_createdb_privilege())
928                 ereport(ERROR,
929                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
930                                  errmsg("permission denied to rename database")));
931
932         /*
933          * Make sure the new name doesn't exist.  See notes for same error in
934          * CREATE DATABASE.
935          */
936         if (OidIsValid(get_database_oid(newname, true)))
937                 ereport(ERROR,
938                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
939                                  errmsg("database \"%s\" already exists", newname)));
940
941         /*
942          * XXX Client applications probably store the current database somewhere,
943          * so renaming it could cause confusion.  On the other hand, there may not
944          * be an actual problem besides a little confusion, so think about this
945          * and decide.
946          */
947         if (db_id == MyDatabaseId)
948                 ereport(ERROR,
949                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
950                                  errmsg("current database cannot be renamed")));
951
952         /*
953          * Make sure the database does not have active sessions.  This is the same
954          * concern as above, but applied to other sessions.
955          *
956          * As in CREATE DATABASE, check this after other error conditions.
957          */
958         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
959                 ereport(ERROR,
960                                 (errcode(ERRCODE_OBJECT_IN_USE),
961                                  errmsg("database \"%s\" is being accessed by other users",
962                                                 oldname),
963                                  errdetail_busy_db(notherbackends, npreparedxacts)));
964
965         /* rename */
966         newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
967         if (!HeapTupleIsValid(newtup))
968                 elog(ERROR, "cache lookup failed for database %u", db_id);
969         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
970         simple_heap_update(rel, &newtup->t_self, newtup);
971         CatalogUpdateIndexes(rel, newtup);
972
973         InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
974
975         /*
976          * Close pg_database, but keep lock till commit.
977          */
978         heap_close(rel, NoLock);
979
980         return db_id;
981 }
982
983
984 /*
985  * ALTER DATABASE SET TABLESPACE
986  */
987 static void
988 movedb(const char *dbname, const char *tblspcname)
989 {
990         Oid                     db_id;
991         Relation        pgdbrel;
992         int                     notherbackends;
993         int                     npreparedxacts;
994         HeapTuple       oldtuple,
995                                 newtuple;
996         Oid                     src_tblspcoid,
997                                 dst_tblspcoid;
998         Datum           new_record[Natts_pg_database];
999         bool            new_record_nulls[Natts_pg_database];
1000         bool            new_record_repl[Natts_pg_database];
1001         ScanKeyData scankey;
1002         SysScanDesc sysscan;
1003         AclResult       aclresult;
1004         char       *src_dbpath;
1005         char       *dst_dbpath;
1006         DIR                *dstdir;
1007         struct dirent *xlde;
1008         movedb_failure_params fparms;
1009
1010         /*
1011          * Look up the target database's OID, and get exclusive lock on it. We
1012          * need this to ensure that no new backend starts up in the database while
1013          * we are moving it, and that no one is using it as a CREATE DATABASE
1014          * template or trying to delete it.
1015          */
1016         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
1017
1018         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1019                                    NULL, NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
1020                 ereport(ERROR,
1021                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1022                                  errmsg("database \"%s\" does not exist", dbname)));
1023
1024         /*
1025          * We actually need a session lock, so that the lock will persist across
1026          * the commit/restart below.  (We could almost get away with letting the
1027          * lock be released at commit, except that someone could try to move
1028          * relations of the DB back into the old directory while we rmtree() it.)
1029          */
1030         LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1031                                                            AccessExclusiveLock);
1032
1033         /*
1034          * Permission checks
1035          */
1036         if (!pg_database_ownercheck(db_id, GetUserId()))
1037                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1038                                            dbname);
1039
1040         /*
1041          * Obviously can't move the tables of my own database
1042          */
1043         if (db_id == MyDatabaseId)
1044                 ereport(ERROR,
1045                                 (errcode(ERRCODE_OBJECT_IN_USE),
1046                                  errmsg("cannot change the tablespace of the currently open database")));
1047
1048         /*
1049          * Get tablespace's oid
1050          */
1051         dst_tblspcoid = get_tablespace_oid(tblspcname, false);
1052
1053         /*
1054          * Permission checks
1055          */
1056         aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
1057                                                                            ACL_CREATE);
1058         if (aclresult != ACLCHECK_OK)
1059                 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
1060                                            tblspcname);
1061
1062         /*
1063          * pg_global must never be the default tablespace
1064          */
1065         if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1066                 ereport(ERROR,
1067                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1068                                  errmsg("pg_global cannot be used as default tablespace")));
1069
1070         /*
1071          * No-op if same tablespace
1072          */
1073         if (src_tblspcoid == dst_tblspcoid)
1074         {
1075                 heap_close(pgdbrel, NoLock);
1076                 UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1077                                                                          AccessExclusiveLock);
1078                 return;
1079         }
1080
1081         /*
1082          * Check for other backends in the target database.  (Because we hold the
1083          * database lock, no new ones can start after this.)
1084          *
1085          * As in CREATE DATABASE, check this after other error conditions.
1086          */
1087         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1088                 ereport(ERROR,
1089                                 (errcode(ERRCODE_OBJECT_IN_USE),
1090                                  errmsg("database \"%s\" is being accessed by other users",
1091                                                 dbname),
1092                                  errdetail_busy_db(notherbackends, npreparedxacts)));
1093
1094         /*
1095          * Get old and new database paths
1096          */
1097         src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
1098         dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1099
1100         /*
1101          * Force a checkpoint before proceeding. This will force dirty buffers out
1102          * to disk, to ensure source database is up-to-date on disk for the copy.
1103          * FlushDatabaseBuffers() would suffice for that, but we also want to
1104          * process any pending unlink requests. Otherwise, the check for existing
1105          * files in the target directory might fail unnecessarily, not to mention
1106          * that the copy might fail due to source files getting deleted under it.
1107          * On Windows, this also ensures that background procs don't hold any open
1108          * files, which would cause rmdir() to fail.
1109          */
1110         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1111
1112         /*
1113          * Check for existence of files in the target directory, i.e., objects of
1114          * this database that are already in the target tablespace.  We can't
1115          * allow the move in such a case, because we would need to change those
1116          * relations' pg_class.reltablespace entries to zero, and we don't have
1117          * access to the DB's pg_class to do so.
1118          */
1119         dstdir = AllocateDir(dst_dbpath);
1120         if (dstdir != NULL)
1121         {
1122                 while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
1123                 {
1124                         if (strcmp(xlde->d_name, ".") == 0 ||
1125                                 strcmp(xlde->d_name, "..") == 0)
1126                                 continue;
1127
1128                         ereport(ERROR,
1129                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1130                                          errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
1131                                                         dbname, tblspcname),
1132                                          errhint("You must move them back to the database's default tablespace before using this command.")));
1133                 }
1134
1135                 FreeDir(dstdir);
1136
1137                 /*
1138                  * The directory exists but is empty. We must remove it before using
1139                  * the copydir function.
1140                  */
1141                 if (rmdir(dst_dbpath) != 0)
1142                         elog(ERROR, "could not remove directory \"%s\": %m",
1143                                  dst_dbpath);
1144         }
1145
1146         /*
1147          * Use an ENSURE block to make sure we remove the debris if the copy fails
1148          * (eg, due to out-of-disk-space).      This is not a 100% solution, because
1149          * of the possibility of failure during transaction commit, but it should
1150          * handle most scenarios.
1151          */
1152         fparms.dest_dboid = db_id;
1153         fparms.dest_tsoid = dst_tblspcoid;
1154         PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1155                                                         PointerGetDatum(&fparms));
1156         {
1157                 /*
1158                  * Copy files from the old tablespace to the new one
1159                  */
1160                 copydir(src_dbpath, dst_dbpath, false);
1161
1162                 /*
1163                  * Record the filesystem change in XLOG
1164                  */
1165                 {
1166                         xl_dbase_create_rec xlrec;
1167                         XLogRecData rdata[1];
1168
1169                         xlrec.db_id = db_id;
1170                         xlrec.tablespace_id = dst_tblspcoid;
1171                         xlrec.src_db_id = db_id;
1172                         xlrec.src_tablespace_id = src_tblspcoid;
1173
1174                         rdata[0].data = (char *) &xlrec;
1175                         rdata[0].len = sizeof(xl_dbase_create_rec);
1176                         rdata[0].buffer = InvalidBuffer;
1177                         rdata[0].next = NULL;
1178
1179                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
1180                 }
1181
1182                 /*
1183                  * Update the database's pg_database tuple
1184                  */
1185                 ScanKeyInit(&scankey,
1186                                         Anum_pg_database_datname,
1187                                         BTEqualStrategyNumber, F_NAMEEQ,
1188                                         NameGetDatum(dbname));
1189                 sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1190                                                                          NULL, 1, &scankey);
1191                 oldtuple = systable_getnext(sysscan);
1192                 if (!HeapTupleIsValid(oldtuple))                /* shouldn't happen... */
1193                         ereport(ERROR,
1194                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
1195                                          errmsg("database \"%s\" does not exist", dbname)));
1196
1197                 MemSet(new_record, 0, sizeof(new_record));
1198                 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1199                 MemSet(new_record_repl, false, sizeof(new_record_repl));
1200
1201                 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
1202                 new_record_repl[Anum_pg_database_dattablespace - 1] = true;
1203
1204                 newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
1205                                                                          new_record,
1206                                                                          new_record_nulls, new_record_repl);
1207                 simple_heap_update(pgdbrel, &oldtuple->t_self, newtuple);
1208
1209                 /* Update indexes */
1210                 CatalogUpdateIndexes(pgdbrel, newtuple);
1211
1212                 InvokeObjectPostAlterHook(DatabaseRelationId,
1213                                                                   HeapTupleGetOid(newtuple), 0);
1214
1215                 systable_endscan(sysscan);
1216
1217                 /*
1218                  * Force another checkpoint here.  As in CREATE DATABASE, this is to
1219                  * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
1220                  * operation, which would cause us to lose any unlogged operations
1221                  * done in the new DB tablespace before the next checkpoint.
1222                  */
1223                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1224
1225                 /*
1226                  * Force synchronous commit, thus minimizing the window between
1227                  * copying the database files and commital of the transaction. If we
1228                  * crash before committing, we'll leave an orphaned set of files on
1229                  * disk, which is not fatal but not good either.
1230                  */
1231                 ForceSyncCommit();
1232
1233                 /*
1234                  * Close pg_database, but keep lock till commit.
1235                  */
1236                 heap_close(pgdbrel, NoLock);
1237         }
1238         PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1239                                                                 PointerGetDatum(&fparms));
1240
1241         /*
1242          * Commit the transaction so that the pg_database update is committed. If
1243          * we crash while removing files, the database won't be corrupt, we'll
1244          * just leave some orphaned files in the old directory.
1245          *
1246          * (This is OK because we know we aren't inside a transaction block.)
1247          *
1248          * XXX would it be safe/better to do this inside the ensure block?      Not
1249          * convinced it's a good idea; consider elog just after the transaction
1250          * really commits.
1251          */
1252         PopActiveSnapshot();
1253         CommitTransactionCommand();
1254
1255         /* Start new transaction for the remaining work; don't need a snapshot */
1256         StartTransactionCommand();
1257
1258         /*
1259          * Remove files from the old tablespace
1260          */
1261         if (!rmtree(src_dbpath, true))
1262                 ereport(WARNING,
1263                                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1264                                                 src_dbpath)));
1265
1266         /*
1267          * Record the filesystem change in XLOG
1268          */
1269         {
1270                 xl_dbase_drop_rec xlrec;
1271                 XLogRecData rdata[1];
1272
1273                 xlrec.db_id = db_id;
1274                 xlrec.tablespace_id = src_tblspcoid;
1275
1276                 rdata[0].data = (char *) &xlrec;
1277                 rdata[0].len = sizeof(xl_dbase_drop_rec);
1278                 rdata[0].buffer = InvalidBuffer;
1279                 rdata[0].next = NULL;
1280
1281                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1282         }
1283
1284         /* Now it's safe to release the database lock */
1285         UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1286                                                                  AccessExclusiveLock);
1287 }
1288
1289 /* Error cleanup callback for movedb */
1290 static void
1291 movedb_failure_callback(int code, Datum arg)
1292 {
1293         movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
1294         char       *dstpath;
1295
1296         /* Get rid of anything we managed to copy to the target directory */
1297         dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
1298
1299         (void) rmtree(dstpath, true);
1300 }
1301
1302
1303 /*
1304  * ALTER DATABASE name ...
1305  */
1306 Oid
1307 AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
1308 {
1309         Relation        rel;
1310         Oid                     dboid;
1311         HeapTuple       tuple,
1312                                 newtuple;
1313         ScanKeyData scankey;
1314         SysScanDesc scan;
1315         ListCell   *option;
1316         int                     connlimit = -1;
1317         DefElem    *dconnlimit = NULL;
1318         DefElem    *dtablespace = NULL;
1319         Datum           new_record[Natts_pg_database];
1320         bool            new_record_nulls[Natts_pg_database];
1321         bool            new_record_repl[Natts_pg_database];
1322
1323         /* Extract options from the statement node tree */
1324         foreach(option, stmt->options)
1325         {
1326                 DefElem    *defel = (DefElem *) lfirst(option);
1327
1328                 if (strcmp(defel->defname, "connectionlimit") == 0)
1329                 {
1330                         if (dconnlimit)
1331                                 ereport(ERROR,
1332                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1333                                                  errmsg("conflicting or redundant options")));
1334                         dconnlimit = defel;
1335                 }
1336                 else if (strcmp(defel->defname, "tablespace") == 0)
1337                 {
1338                         if (dtablespace)
1339                                 ereport(ERROR,
1340                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1341                                                  errmsg("conflicting or redundant options")));
1342                         dtablespace = defel;
1343                 }
1344                 else
1345                         elog(ERROR, "option \"%s\" not recognized",
1346                                  defel->defname);
1347         }
1348
1349         if (dtablespace)
1350         {
1351                 /* currently, can't be specified along with any other options */
1352                 Assert(!dconnlimit);
1353                 /* this case isn't allowed within a transaction block */
1354                 PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
1355                 movedb(stmt->dbname, strVal(dtablespace->arg));
1356                 return InvalidOid;
1357         }
1358
1359         if (dconnlimit)
1360         {
1361                 connlimit = intVal(dconnlimit->arg);
1362                 if (connlimit < -1)
1363                         ereport(ERROR,
1364                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1365                                          errmsg("invalid connection limit: %d", connlimit)));
1366         }
1367
1368         /*
1369          * Get the old tuple.  We don't need a lock on the database per se,
1370          * because we're not going to do anything that would mess up incoming
1371          * connections.
1372          */
1373         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1374         ScanKeyInit(&scankey,
1375                                 Anum_pg_database_datname,
1376                                 BTEqualStrategyNumber, F_NAMEEQ,
1377                                 NameGetDatum(stmt->dbname));
1378         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1379                                                           NULL, 1, &scankey);
1380         tuple = systable_getnext(scan);
1381         if (!HeapTupleIsValid(tuple))
1382                 ereport(ERROR,
1383                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1384                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
1385
1386         dboid = HeapTupleGetOid(tuple);
1387
1388         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1389                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1390                                            stmt->dbname);
1391
1392         /*
1393          * Build an updated tuple, perusing the information just obtained
1394          */
1395         MemSet(new_record, 0, sizeof(new_record));
1396         MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1397         MemSet(new_record_repl, false, sizeof(new_record_repl));
1398
1399         if (dconnlimit)
1400         {
1401                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
1402                 new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
1403         }
1404
1405         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
1406                                                                  new_record_nulls, new_record_repl);
1407         simple_heap_update(rel, &tuple->t_self, newtuple);
1408
1409         /* Update indexes */
1410         CatalogUpdateIndexes(rel, newtuple);
1411
1412         InvokeObjectPostAlterHook(DatabaseRelationId,
1413                                                           HeapTupleGetOid(newtuple), 0);
1414
1415         systable_endscan(scan);
1416
1417         /* Close pg_database, but keep lock till commit */
1418         heap_close(rel, NoLock);
1419
1420         return dboid;
1421 }
1422
1423
1424 /*
1425  * ALTER DATABASE name SET ...
1426  */
1427 Oid
1428 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1429 {
1430         Oid                     datid = get_database_oid(stmt->dbname, false);
1431
1432         /*
1433          * Obtain a lock on the database and make sure it didn't go away in the
1434          * meantime.
1435          */
1436         shdepLockAndCheckObject(DatabaseRelationId, datid);
1437
1438         if (!pg_database_ownercheck(datid, GetUserId()))
1439                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1440                                            stmt->dbname);
1441
1442         AlterSetting(datid, InvalidOid, stmt->setstmt);
1443
1444         UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
1445
1446         return datid;
1447 }
1448
1449
1450 /*
1451  * ALTER DATABASE name OWNER TO newowner
1452  */
1453 Oid
1454 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1455 {
1456         Oid                     db_id;
1457         HeapTuple       tuple;
1458         Relation        rel;
1459         ScanKeyData scankey;
1460         SysScanDesc scan;
1461         Form_pg_database datForm;
1462
1463         /*
1464          * Get the old tuple.  We don't need a lock on the database per se,
1465          * because we're not going to do anything that would mess up incoming
1466          * connections.
1467          */
1468         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1469         ScanKeyInit(&scankey,
1470                                 Anum_pg_database_datname,
1471                                 BTEqualStrategyNumber, F_NAMEEQ,
1472                                 NameGetDatum(dbname));
1473         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1474                                                           NULL, 1, &scankey);
1475         tuple = systable_getnext(scan);
1476         if (!HeapTupleIsValid(tuple))
1477                 ereport(ERROR,
1478                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1479                                  errmsg("database \"%s\" does not exist", dbname)));
1480
1481         db_id = HeapTupleGetOid(tuple);
1482         datForm = (Form_pg_database) GETSTRUCT(tuple);
1483
1484         /*
1485          * If the new owner is the same as the existing owner, consider the
1486          * command to have succeeded.  This is to be consistent with other
1487          * objects.
1488          */
1489         if (datForm->datdba != newOwnerId)
1490         {
1491                 Datum           repl_val[Natts_pg_database];
1492                 bool            repl_null[Natts_pg_database];
1493                 bool            repl_repl[Natts_pg_database];
1494                 Acl                *newAcl;
1495                 Datum           aclDatum;
1496                 bool            isNull;
1497                 HeapTuple       newtuple;
1498
1499                 /* Otherwise, must be owner of the existing object */
1500                 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1501                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1502                                                    dbname);
1503
1504                 /* Must be able to become new owner */
1505                 check_is_member_of_role(GetUserId(), newOwnerId);
1506
1507                 /*
1508                  * must have createdb rights
1509                  *
1510                  * NOTE: This is different from other alter-owner checks in that the
1511                  * current user is checked for createdb privileges instead of the
1512                  * destination owner.  This is consistent with the CREATE case for
1513                  * databases.  Because superusers will always have this right, we need
1514                  * no special case for them.
1515                  */
1516                 if (!have_createdb_privilege())
1517                         ereport(ERROR,
1518                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1519                                    errmsg("permission denied to change owner of database")));
1520
1521                 memset(repl_null, false, sizeof(repl_null));
1522                 memset(repl_repl, false, sizeof(repl_repl));
1523
1524                 repl_repl[Anum_pg_database_datdba - 1] = true;
1525                 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1526
1527                 /*
1528                  * Determine the modified ACL for the new owner.  This is only
1529                  * necessary when the ACL is non-null.
1530                  */
1531                 aclDatum = heap_getattr(tuple,
1532                                                                 Anum_pg_database_datacl,
1533                                                                 RelationGetDescr(rel),
1534                                                                 &isNull);
1535                 if (!isNull)
1536                 {
1537                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
1538                                                                  datForm->datdba, newOwnerId);
1539                         repl_repl[Anum_pg_database_datacl - 1] = true;
1540                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1541                 }
1542
1543                 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1544                 simple_heap_update(rel, &newtuple->t_self, newtuple);
1545                 CatalogUpdateIndexes(rel, newtuple);
1546
1547                 heap_freetuple(newtuple);
1548
1549                 /* Update owner dependency reference */
1550                 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1551                                                                 newOwnerId);
1552         }
1553
1554         InvokeObjectPostAlterHook(DatabaseRelationId, HeapTupleGetOid(tuple), 0);
1555
1556         systable_endscan(scan);
1557
1558         /* Close pg_database, but keep lock till commit */
1559         heap_close(rel, NoLock);
1560
1561         return db_id;
1562 }
1563
1564
1565 /*
1566  * Helper functions
1567  */
1568
1569 /*
1570  * Look up info about the database named "name".  If the database exists,
1571  * obtain the specified lock type on it, fill in any of the remaining
1572  * parameters that aren't NULL, and return TRUE.  If no such database,
1573  * return FALSE.
1574  */
1575 static bool
1576 get_db_info(const char *name, LOCKMODE lockmode,
1577                         Oid *dbIdP, Oid *ownerIdP,
1578                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1579                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1580                         MultiXactId *dbMinMultiP,
1581                         Oid *dbTablespace, char **dbCollate, char **dbCtype)
1582 {
1583         bool            result = false;
1584         Relation        relation;
1585
1586         AssertArg(name);
1587
1588         /* Caller may wish to grab a better lock on pg_database beforehand... */
1589         relation = heap_open(DatabaseRelationId, AccessShareLock);
1590
1591         /*
1592          * Loop covers the rare case where the database is renamed before we can
1593          * lock it.  We try again just in case we can find a new one of the same
1594          * name.
1595          */
1596         for (;;)
1597         {
1598                 ScanKeyData scanKey;
1599                 SysScanDesc scan;
1600                 HeapTuple       tuple;
1601                 Oid                     dbOid;
1602
1603                 /*
1604                  * there's no syscache for database-indexed-by-name, so must do it the
1605                  * hard way
1606                  */
1607                 ScanKeyInit(&scanKey,
1608                                         Anum_pg_database_datname,
1609                                         BTEqualStrategyNumber, F_NAMEEQ,
1610                                         NameGetDatum(name));
1611
1612                 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1613                                                                   NULL, 1, &scanKey);
1614
1615                 tuple = systable_getnext(scan);
1616
1617                 if (!HeapTupleIsValid(tuple))
1618                 {
1619                         /* definitely no database of that name */
1620                         systable_endscan(scan);
1621                         break;
1622                 }
1623
1624                 dbOid = HeapTupleGetOid(tuple);
1625
1626                 systable_endscan(scan);
1627
1628                 /*
1629                  * Now that we have a database OID, we can try to lock the DB.
1630                  */
1631                 if (lockmode != NoLock)
1632                         LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1633
1634                 /*
1635                  * And now, re-fetch the tuple by OID.  If it's still there and still
1636                  * the same name, we win; else, drop the lock and loop back to try
1637                  * again.
1638                  */
1639                 tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
1640                 if (HeapTupleIsValid(tuple))
1641                 {
1642                         Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1643
1644                         if (strcmp(name, NameStr(dbform->datname)) == 0)
1645                         {
1646                                 /* oid of the database */
1647                                 if (dbIdP)
1648                                         *dbIdP = dbOid;
1649                                 /* oid of the owner */
1650                                 if (ownerIdP)
1651                                         *ownerIdP = dbform->datdba;
1652                                 /* character encoding */
1653                                 if (encodingP)
1654                                         *encodingP = dbform->encoding;
1655                                 /* allowed as template? */
1656                                 if (dbIsTemplateP)
1657                                         *dbIsTemplateP = dbform->datistemplate;
1658                                 /* allowing connections? */
1659                                 if (dbAllowConnP)
1660                                         *dbAllowConnP = dbform->datallowconn;
1661                                 /* last system OID used in database */
1662                                 if (dbLastSysOidP)
1663                                         *dbLastSysOidP = dbform->datlastsysoid;
1664                                 /* limit of frozen XIDs */
1665                                 if (dbFrozenXidP)
1666                                         *dbFrozenXidP = dbform->datfrozenxid;
1667                                 /* minimum MultixactId */
1668                                 if (dbMinMultiP)
1669                                         *dbMinMultiP = dbform->datminmxid;
1670                                 /* default tablespace for this database */
1671                                 if (dbTablespace)
1672                                         *dbTablespace = dbform->dattablespace;
1673                                 /* default locale settings for this database */
1674                                 if (dbCollate)
1675                                         *dbCollate = pstrdup(NameStr(dbform->datcollate));
1676                                 if (dbCtype)
1677                                         *dbCtype = pstrdup(NameStr(dbform->datctype));
1678                                 ReleaseSysCache(tuple);
1679                                 result = true;
1680                                 break;
1681                         }
1682                         /* can only get here if it was just renamed */
1683                         ReleaseSysCache(tuple);
1684                 }
1685
1686                 if (lockmode != NoLock)
1687                         UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1688         }
1689
1690         heap_close(relation, AccessShareLock);
1691
1692         return result;
1693 }
1694
1695 /* Check if current user has createdb privileges */
1696 static bool
1697 have_createdb_privilege(void)
1698 {
1699         bool            result = false;
1700         HeapTuple       utup;
1701
1702         /* Superusers can always do everything */
1703         if (superuser())
1704                 return true;
1705
1706         utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
1707         if (HeapTupleIsValid(utup))
1708         {
1709                 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1710                 ReleaseSysCache(utup);
1711         }
1712         return result;
1713 }
1714
1715 /*
1716  * Remove tablespace directories
1717  *
1718  * We don't know what tablespaces db_id is using, so iterate through all
1719  * tablespaces removing <tablespace>/db_id
1720  */
1721 static void
1722 remove_dbtablespaces(Oid db_id)
1723 {
1724         Relation        rel;
1725         HeapScanDesc scan;
1726         HeapTuple       tuple;
1727
1728         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1729         scan = heap_beginscan_catalog(rel, 0, NULL);
1730         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1731         {
1732                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1733                 char       *dstpath;
1734                 struct stat st;
1735
1736                 /* Don't mess with the global tablespace */
1737                 if (dsttablespace == GLOBALTABLESPACE_OID)
1738                         continue;
1739
1740                 dstpath = GetDatabasePath(db_id, dsttablespace);
1741
1742                 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1743                 {
1744                         /* Assume we can ignore it */
1745                         pfree(dstpath);
1746                         continue;
1747                 }
1748
1749                 if (!rmtree(dstpath, true))
1750                         ereport(WARNING,
1751                                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
1752                                                         dstpath)));
1753
1754                 /* Record the filesystem change in XLOG */
1755                 {
1756                         xl_dbase_drop_rec xlrec;
1757                         XLogRecData rdata[1];
1758
1759                         xlrec.db_id = db_id;
1760                         xlrec.tablespace_id = dsttablespace;
1761
1762                         rdata[0].data = (char *) &xlrec;
1763                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1764                         rdata[0].buffer = InvalidBuffer;
1765                         rdata[0].next = NULL;
1766
1767                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1768                 }
1769
1770                 pfree(dstpath);
1771         }
1772
1773         heap_endscan(scan);
1774         heap_close(rel, AccessShareLock);
1775 }
1776
1777 /*
1778  * Check for existing files that conflict with a proposed new DB OID;
1779  * return TRUE if there are any
1780  *
1781  * If there were a subdirectory in any tablespace matching the proposed new
1782  * OID, we'd get a create failure due to the duplicate name ... and then we'd
1783  * try to remove that already-existing subdirectory during the cleanup in
1784  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
1785  * instead we make this extra check before settling on the OID of the new
1786  * database.  This exactly parallels what GetNewRelFileNode() does for table
1787  * relfilenode values.
1788  */
1789 static bool
1790 check_db_file_conflict(Oid db_id)
1791 {
1792         bool            result = false;
1793         Relation        rel;
1794         HeapScanDesc scan;
1795         HeapTuple       tuple;
1796
1797         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1798         scan = heap_beginscan_catalog(rel, 0, NULL);
1799         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1800         {
1801                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1802                 char       *dstpath;
1803                 struct stat st;
1804
1805                 /* Don't mess with the global tablespace */
1806                 if (dsttablespace == GLOBALTABLESPACE_OID)
1807                         continue;
1808
1809                 dstpath = GetDatabasePath(db_id, dsttablespace);
1810
1811                 if (lstat(dstpath, &st) == 0)
1812                 {
1813                         /* Found a conflicting file (or directory, whatever) */
1814                         pfree(dstpath);
1815                         result = true;
1816                         break;
1817                 }
1818
1819                 pfree(dstpath);
1820         }
1821
1822         heap_endscan(scan);
1823         heap_close(rel, AccessShareLock);
1824
1825         return result;
1826 }
1827
1828 /*
1829  * Issue a suitable errdetail message for a busy database
1830  */
1831 static int
1832 errdetail_busy_db(int notherbackends, int npreparedxacts)
1833 {
1834         if (notherbackends > 0 && npreparedxacts > 0)
1835
1836                 /*
1837                  * We don't deal with singular versus plural here, since gettext
1838                  * doesn't support multiple plurals in one string.
1839                  */
1840                 errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
1841                                   notherbackends, npreparedxacts);
1842         else if (notherbackends > 0)
1843                 errdetail_plural("There is %d other session using the database.",
1844                                                  "There are %d other sessions using the database.",
1845                                                  notherbackends,
1846                                                  notherbackends);
1847         else
1848                 errdetail_plural("There is %d prepared transaction using the database.",
1849                                         "There are %d prepared transactions using the database.",
1850                                                  npreparedxacts,
1851                                                  npreparedxacts);
1852         return 0;                                       /* just to keep ereport macro happy */
1853 }
1854
1855 /*
1856  * get_database_oid - given a database name, look up the OID
1857  *
1858  * If missing_ok is false, throw an error if database name not found.  If
1859  * true, just return InvalidOid.
1860  */
1861 Oid
1862 get_database_oid(const char *dbname, bool missing_ok)
1863 {
1864         Relation        pg_database;
1865         ScanKeyData entry[1];
1866         SysScanDesc scan;
1867         HeapTuple       dbtuple;
1868         Oid                     oid;
1869
1870         /*
1871          * There's no syscache for pg_database indexed by name, so we must look
1872          * the hard way.
1873          */
1874         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1875         ScanKeyInit(&entry[0],
1876                                 Anum_pg_database_datname,
1877                                 BTEqualStrategyNumber, F_NAMEEQ,
1878                                 CStringGetDatum(dbname));
1879         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1880                                                           NULL, 1, entry);
1881
1882         dbtuple = systable_getnext(scan);
1883
1884         /* We assume that there can be at most one matching tuple */
1885         if (HeapTupleIsValid(dbtuple))
1886                 oid = HeapTupleGetOid(dbtuple);
1887         else
1888                 oid = InvalidOid;
1889
1890         systable_endscan(scan);
1891         heap_close(pg_database, AccessShareLock);
1892
1893         if (!OidIsValid(oid) && !missing_ok)
1894                 ereport(ERROR,
1895                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1896                                  errmsg("database \"%s\" does not exist",
1897                                                 dbname)));
1898
1899         return oid;
1900 }
1901
1902
1903 /*
1904  * get_database_name - given a database OID, look up the name
1905  *
1906  * Returns a palloc'd string, or NULL if no such database.
1907  */
1908 char *
1909 get_database_name(Oid dbid)
1910 {
1911         HeapTuple       dbtuple;
1912         char       *result;
1913
1914         dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
1915         if (HeapTupleIsValid(dbtuple))
1916         {
1917                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1918                 ReleaseSysCache(dbtuple);
1919         }
1920         else
1921                 result = NULL;
1922
1923         return result;
1924 }
1925
1926 /*
1927  * DATABASE resource manager's routines
1928  */
1929 void
1930 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1931 {
1932         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1933
1934         /* Backup blocks are not used in dbase records */
1935         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
1936
1937         if (info == XLOG_DBASE_CREATE)
1938         {
1939                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1940                 char       *src_path;
1941                 char       *dst_path;
1942                 struct stat st;
1943
1944                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1945                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1946
1947                 /*
1948                  * Our theory for replaying a CREATE is to forcibly drop the target
1949                  * subdirectory if present, then re-copy the source data. This may be
1950                  * more work than needed, but it is simple to implement.
1951                  */
1952                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1953                 {
1954                         if (!rmtree(dst_path, true))
1955                                 /* If this failed, copydir() below is going to error. */
1956                                 ereport(WARNING,
1957                                                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1958                                                                 dst_path)));
1959                 }
1960
1961                 /*
1962                  * Force dirty buffers out to disk, to ensure source database is
1963                  * up-to-date for the copy.
1964                  */
1965                 FlushDatabaseBuffers(xlrec->src_db_id);
1966
1967                 /*
1968                  * Copy this subdirectory to the new location
1969                  *
1970                  * We don't need to copy subdirectories
1971                  */
1972                 copydir(src_path, dst_path, false);
1973         }
1974         else if (info == XLOG_DBASE_DROP)
1975         {
1976                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1977                 char       *dst_path;
1978
1979                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1980
1981                 if (InHotStandby)
1982                 {
1983                         /*
1984                          * Lock database while we resolve conflicts to ensure that
1985                          * InitPostgres() cannot fully re-execute concurrently. This
1986                          * avoids backends re-connecting automatically to same database,
1987                          * which can happen in some cases.
1988                          */
1989                         LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
1990                         ResolveRecoveryConflictWithDatabase(xlrec->db_id);
1991                 }
1992
1993                 /* Drop pages for this database that are in the shared buffer cache */
1994                 DropDatabaseBuffers(xlrec->db_id);
1995
1996                 /* Also, clean out any fsync requests that might be pending in md.c */
1997                 ForgetDatabaseFsyncRequests(xlrec->db_id);
1998
1999                 /* Clean out the xlog relcache too */
2000                 XLogDropDatabase(xlrec->db_id);
2001
2002                 /* And remove the physical files */
2003                 if (!rmtree(dst_path, true))
2004                         ereport(WARNING,
2005                                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
2006                                                         dst_path)));
2007
2008                 if (InHotStandby)
2009                 {
2010                         /*
2011                          * Release locks prior to commit. XXX There is a race condition
2012                          * here that may allow backends to reconnect, but the window for
2013                          * this is small because the gap between here and commit is mostly
2014                          * fairly small and it is unlikely that people will be dropping
2015                          * databases that we are trying to connect to anyway.
2016                          */
2017                         UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2018                 }
2019         }
2020         else
2021                 elog(PANIC, "dbase_redo: unknown op code %u", info);
2022 }