]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Be pickier about converting between Name and Datum.
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands use exclusive locks on
7  * the database objects (as expressed by LockSharedObject()) to avoid
8  * stepping on each others' toes.  Formerly we used table-level locks
9  * on pg_database, but that's too coarse-grained.
10  *
11  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        src/backend/commands/dbcommands.c
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <fcntl.h>
23 #include <locale.h>
24 #include <unistd.h>
25 #include <sys/stat.h>
26
27 #include "access/genam.h"
28 #include "access/heapam.h"
29 #include "access/htup_details.h"
30 #include "access/xact.h"
31 #include "access/xloginsert.h"
32 #include "access/xlogutils.h"
33 #include "catalog/catalog.h"
34 #include "catalog/dependency.h"
35 #include "catalog/indexing.h"
36 #include "catalog/objectaccess.h"
37 #include "catalog/pg_authid.h"
38 #include "catalog/pg_database.h"
39 #include "catalog/pg_db_role_setting.h"
40 #include "catalog/pg_tablespace.h"
41 #include "commands/comment.h"
42 #include "commands/dbcommands.h"
43 #include "commands/dbcommands_xlog.h"
44 #include "commands/defrem.h"
45 #include "commands/seclabel.h"
46 #include "commands/tablespace.h"
47 #include "mb/pg_wchar.h"
48 #include "miscadmin.h"
49 #include "pgstat.h"
50 #include "postmaster/bgwriter.h"
51 #include "replication/slot.h"
52 #include "storage/copydir.h"
53 #include "storage/fd.h"
54 #include "storage/lmgr.h"
55 #include "storage/ipc.h"
56 #include "storage/procarray.h"
57 #include "storage/smgr.h"
58 #include "utils/acl.h"
59 #include "utils/builtins.h"
60 #include "utils/fmgroids.h"
61 #include "utils/pg_locale.h"
62 #include "utils/snapmgr.h"
63 #include "utils/syscache.h"
64 #include "utils/tqual.h"
65
66
67 typedef struct
68 {
69         Oid                     src_dboid;              /* source (template) DB */
70         Oid                     dest_dboid;             /* DB we are trying to create */
71 } createdb_failure_params;
72
73 typedef struct
74 {
75         Oid                     dest_dboid;             /* DB we are trying to move */
76         Oid                     dest_tsoid;             /* tablespace we are trying to move to */
77 } movedb_failure_params;
78
79 /* non-export function prototypes */
80 static void createdb_failure_callback(int code, Datum arg);
81 static void movedb(const char *dbname, const char *tblspcname);
82 static void movedb_failure_callback(int code, Datum arg);
83 static bool get_db_info(const char *name, LOCKMODE lockmode,
84                         Oid *dbIdP, Oid *ownerIdP,
85                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
86                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
87                         MultiXactId *dbMinMultiP,
88                         Oid *dbTablespace, char **dbCollate, char **dbCtype);
89 static bool have_createdb_privilege(void);
90 static void remove_dbtablespaces(Oid db_id);
91 static bool check_db_file_conflict(Oid db_id);
92 static int      errdetail_busy_db(int notherbackends, int npreparedxacts);
93
94
95 /*
96  * CREATE DATABASE
97  */
98 Oid
99 createdb(ParseState *pstate, const CreatedbStmt *stmt)
100 {
101         HeapScanDesc scan;
102         Relation        rel;
103         Oid                     src_dboid;
104         Oid                     src_owner;
105         int                     src_encoding;
106         char       *src_collate;
107         char       *src_ctype;
108         bool            src_istemplate;
109         bool            src_allowconn;
110         Oid                     src_lastsysoid;
111         TransactionId src_frozenxid;
112         MultiXactId src_minmxid;
113         Oid                     src_deftablespace;
114         volatile Oid dst_deftablespace;
115         Relation        pg_database_rel;
116         HeapTuple       tuple;
117         Datum           new_record[Natts_pg_database];
118         bool            new_record_nulls[Natts_pg_database];
119         Oid                     dboid;
120         Oid                     datdba;
121         ListCell   *option;
122         DefElem    *dtablespacename = NULL;
123         DefElem    *downer = NULL;
124         DefElem    *dtemplate = NULL;
125         DefElem    *dencoding = NULL;
126         DefElem    *dcollate = NULL;
127         DefElem    *dctype = NULL;
128         DefElem    *distemplate = NULL;
129         DefElem    *dallowconnections = NULL;
130         DefElem    *dconnlimit = NULL;
131         char       *dbname = stmt->dbname;
132         char       *dbowner = NULL;
133         const char *dbtemplate = NULL;
134         char       *dbcollate = NULL;
135         char       *dbctype = NULL;
136         char       *canonname;
137         int                     encoding = -1;
138         bool            dbistemplate = false;
139         bool            dballowconnections = true;
140         int                     dbconnlimit = -1;
141         int                     notherbackends;
142         int                     npreparedxacts;
143         createdb_failure_params fparms;
144
145         /* Extract options from the statement node tree */
146         foreach(option, stmt->options)
147         {
148                 DefElem    *defel = (DefElem *) lfirst(option);
149
150                 if (strcmp(defel->defname, "tablespace") == 0)
151                 {
152                         if (dtablespacename)
153                                 ereport(ERROR,
154                                                 (errcode(ERRCODE_SYNTAX_ERROR),
155                                                  errmsg("conflicting or redundant options"),
156                                                  parser_errposition(pstate, defel->location)));
157                         dtablespacename = defel;
158                 }
159                 else if (strcmp(defel->defname, "owner") == 0)
160                 {
161                         if (downer)
162                                 ereport(ERROR,
163                                                 (errcode(ERRCODE_SYNTAX_ERROR),
164                                                  errmsg("conflicting or redundant options"),
165                                                  parser_errposition(pstate, defel->location)));
166                         downer = defel;
167                 }
168                 else if (strcmp(defel->defname, "template") == 0)
169                 {
170                         if (dtemplate)
171                                 ereport(ERROR,
172                                                 (errcode(ERRCODE_SYNTAX_ERROR),
173                                                  errmsg("conflicting or redundant options"),
174                                                  parser_errposition(pstate, defel->location)));
175                         dtemplate = defel;
176                 }
177                 else if (strcmp(defel->defname, "encoding") == 0)
178                 {
179                         if (dencoding)
180                                 ereport(ERROR,
181                                                 (errcode(ERRCODE_SYNTAX_ERROR),
182                                                  errmsg("conflicting or redundant options"),
183                                                  parser_errposition(pstate, defel->location)));
184                         dencoding = defel;
185                 }
186                 else if (strcmp(defel->defname, "lc_collate") == 0)
187                 {
188                         if (dcollate)
189                                 ereport(ERROR,
190                                                 (errcode(ERRCODE_SYNTAX_ERROR),
191                                                  errmsg("conflicting or redundant options"),
192                                                  parser_errposition(pstate, defel->location)));
193                         dcollate = defel;
194                 }
195                 else if (strcmp(defel->defname, "lc_ctype") == 0)
196                 {
197                         if (dctype)
198                                 ereport(ERROR,
199                                                 (errcode(ERRCODE_SYNTAX_ERROR),
200                                                  errmsg("conflicting or redundant options"),
201                                                  parser_errposition(pstate, defel->location)));
202                         dctype = defel;
203                 }
204                 else if (strcmp(defel->defname, "is_template") == 0)
205                 {
206                         if (distemplate)
207                                 ereport(ERROR,
208                                                 (errcode(ERRCODE_SYNTAX_ERROR),
209                                                  errmsg("conflicting or redundant options"),
210                                                  parser_errposition(pstate, defel->location)));
211                         distemplate = defel;
212                 }
213                 else if (strcmp(defel->defname, "allow_connections") == 0)
214                 {
215                         if (dallowconnections)
216                                 ereport(ERROR,
217                                                 (errcode(ERRCODE_SYNTAX_ERROR),
218                                                  errmsg("conflicting or redundant options"),
219                                                  parser_errposition(pstate, defel->location)));
220                         dallowconnections = defel;
221                 }
222                 else if (strcmp(defel->defname, "connection_limit") == 0)
223                 {
224                         if (dconnlimit)
225                                 ereport(ERROR,
226                                                 (errcode(ERRCODE_SYNTAX_ERROR),
227                                                  errmsg("conflicting or redundant options"),
228                                                  parser_errposition(pstate, defel->location)));
229                         dconnlimit = defel;
230                 }
231                 else if (strcmp(defel->defname, "location") == 0)
232                 {
233                         ereport(WARNING,
234                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
235                                          errmsg("LOCATION is not supported anymore"),
236                                          errhint("Consider using tablespaces instead."),
237                                          parser_errposition(pstate, defel->location)));
238                 }
239                 else
240                         ereport(ERROR,
241                                         (errcode(ERRCODE_SYNTAX_ERROR),
242                                          errmsg("option \"%s\" not recognized", defel->defname),
243                                          parser_errposition(pstate, defel->location)));
244         }
245
246         if (downer && downer->arg)
247                 dbowner = defGetString(downer);
248         if (dtemplate && dtemplate->arg)
249                 dbtemplate = defGetString(dtemplate);
250         if (dencoding && dencoding->arg)
251         {
252                 const char *encoding_name;
253
254                 if (IsA(dencoding->arg, Integer))
255                 {
256                         encoding = defGetInt32(dencoding);
257                         encoding_name = pg_encoding_to_char(encoding);
258                         if (strcmp(encoding_name, "") == 0 ||
259                                 pg_valid_server_encoding(encoding_name) < 0)
260                                 ereport(ERROR,
261                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
262                                                  errmsg("%d is not a valid encoding code",
263                                                                 encoding),
264                                                  parser_errposition(pstate, dencoding->location)));
265                 }
266                 else
267                 {
268                         encoding_name = defGetString(dencoding);
269                         encoding = pg_valid_server_encoding(encoding_name);
270                         if (encoding < 0)
271                                 ereport(ERROR,
272                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
273                                                  errmsg("%s is not a valid encoding name",
274                                                                 encoding_name),
275                                                  parser_errposition(pstate, dencoding->location)));
276                 }
277         }
278         if (dcollate && dcollate->arg)
279                 dbcollate = defGetString(dcollate);
280         if (dctype && dctype->arg)
281                 dbctype = defGetString(dctype);
282         if (distemplate && distemplate->arg)
283                 dbistemplate = defGetBoolean(distemplate);
284         if (dallowconnections && dallowconnections->arg)
285                 dballowconnections = defGetBoolean(dallowconnections);
286         if (dconnlimit && dconnlimit->arg)
287         {
288                 dbconnlimit = defGetInt32(dconnlimit);
289                 if (dbconnlimit < -1)
290                         ereport(ERROR,
291                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
292                                          errmsg("invalid connection limit: %d", dbconnlimit)));
293         }
294
295         /* obtain OID of proposed owner */
296         if (dbowner)
297                 datdba = get_role_oid(dbowner, false);
298         else
299                 datdba = GetUserId();
300
301         /*
302          * To create a database, must have createdb privilege and must be able to
303          * become the target role (this does not imply that the target role itself
304          * must have createdb privilege).  The latter provision guards against
305          * "giveaway" attacks.  Note that a superuser will always have both of
306          * these privileges a fortiori.
307          */
308         if (!have_createdb_privilege())
309                 ereport(ERROR,
310                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
311                                  errmsg("permission denied to create database")));
312
313         check_is_member_of_role(GetUserId(), datdba);
314
315         /*
316          * Lookup database (template) to be cloned, and obtain share lock on it.
317          * ShareLock allows two CREATE DATABASEs to work from the same template
318          * concurrently, while ensuring no one is busy dropping it in parallel
319          * (which would be Very Bad since we'd likely get an incomplete copy
320          * without knowing it).  This also prevents any new connections from being
321          * made to the source until we finish copying it, so we can be sure it
322          * won't change underneath us.
323          */
324         if (!dbtemplate)
325                 dbtemplate = "template1";               /* Default template database name */
326
327         if (!get_db_info(dbtemplate, ShareLock,
328                                          &src_dboid, &src_owner, &src_encoding,
329                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
330                                          &src_frozenxid, &src_minmxid, &src_deftablespace,
331                                          &src_collate, &src_ctype))
332                 ereport(ERROR,
333                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
334                                  errmsg("template database \"%s\" does not exist",
335                                                 dbtemplate)));
336
337         /*
338          * Permission check: to copy a DB that's not marked datistemplate, you
339          * must be superuser or the owner thereof.
340          */
341         if (!src_istemplate)
342         {
343                 if (!pg_database_ownercheck(src_dboid, GetUserId()))
344                         ereport(ERROR,
345                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
346                                          errmsg("permission denied to copy database \"%s\"",
347                                                         dbtemplate)));
348         }
349
350         /* If encoding or locales are defaulted, use source's setting */
351         if (encoding < 0)
352                 encoding = src_encoding;
353         if (dbcollate == NULL)
354                 dbcollate = src_collate;
355         if (dbctype == NULL)
356                 dbctype = src_ctype;
357
358         /* Some encodings are client only */
359         if (!PG_VALID_BE_ENCODING(encoding))
360                 ereport(ERROR,
361                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
362                                  errmsg("invalid server encoding %d", encoding)));
363
364         /* Check that the chosen locales are valid, and get canonical spellings */
365         if (!check_locale(LC_COLLATE, dbcollate, &canonname))
366                 ereport(ERROR,
367                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
368                                  errmsg("invalid locale name: \"%s\"", dbcollate)));
369         dbcollate = canonname;
370         if (!check_locale(LC_CTYPE, dbctype, &canonname))
371                 ereport(ERROR,
372                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
373                                  errmsg("invalid locale name: \"%s\"", dbctype)));
374         dbctype = canonname;
375
376         check_encoding_locale_matches(encoding, dbcollate, dbctype);
377
378         /*
379          * Check that the new encoding and locale settings match the source
380          * database.  We insist on this because we simply copy the source data ---
381          * any non-ASCII data would be wrongly encoded, and any indexes sorted
382          * according to the source locale would be wrong.
383          *
384          * However, we assume that template0 doesn't contain any non-ASCII data
385          * nor any indexes that depend on collation or ctype, so template0 can be
386          * used as template for creating a database with any encoding or locale.
387          */
388         if (strcmp(dbtemplate, "template0") != 0)
389         {
390                 if (encoding != src_encoding)
391                         ereport(ERROR,
392                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
393                                          errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
394                                                         pg_encoding_to_char(encoding),
395                                                         pg_encoding_to_char(src_encoding)),
396                                          errhint("Use the same encoding as in the template database, or use template0 as template.")));
397
398                 if (strcmp(dbcollate, src_collate) != 0)
399                         ereport(ERROR,
400                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
401                                          errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
402                                                         dbcollate, src_collate),
403                                          errhint("Use the same collation as in the template database, or use template0 as template.")));
404
405                 if (strcmp(dbctype, src_ctype) != 0)
406                         ereport(ERROR,
407                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
408                                          errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
409                                                         dbctype, src_ctype),
410                                          errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
411         }
412
413         /* Resolve default tablespace for new database */
414         if (dtablespacename && dtablespacename->arg)
415         {
416                 char       *tablespacename;
417                 AclResult       aclresult;
418
419                 tablespacename = defGetString(dtablespacename);
420                 dst_deftablespace = get_tablespace_oid(tablespacename, false);
421                 /* check permissions */
422                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
423                                                                                    ACL_CREATE);
424                 if (aclresult != ACLCHECK_OK)
425                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
426                                                    tablespacename);
427
428                 /* pg_global must never be the default tablespace */
429                 if (dst_deftablespace == GLOBALTABLESPACE_OID)
430                         ereport(ERROR,
431                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
432                                   errmsg("pg_global cannot be used as default tablespace")));
433
434                 /*
435                  * If we are trying to change the default tablespace of the template,
436                  * we require that the template not have any files in the new default
437                  * tablespace.  This is necessary because otherwise the copied
438                  * database would contain pg_class rows that refer to its default
439                  * tablespace both explicitly (by OID) and implicitly (as zero), which
440                  * would cause problems.  For example another CREATE DATABASE using
441                  * the copied database as template, and trying to change its default
442                  * tablespace again, would yield outright incorrect results (it would
443                  * improperly move tables to the new default tablespace that should
444                  * stay in the same tablespace).
445                  */
446                 if (dst_deftablespace != src_deftablespace)
447                 {
448                         char       *srcpath;
449                         struct stat st;
450
451                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
452
453                         if (stat(srcpath, &st) == 0 &&
454                                 S_ISDIR(st.st_mode) &&
455                                 !directory_is_empty(srcpath))
456                                 ereport(ERROR,
457                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
458                                                  errmsg("cannot assign new default tablespace \"%s\"",
459                                                                 tablespacename),
460                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
461                                                                    dbtemplate)));
462                         pfree(srcpath);
463                 }
464         }
465         else
466         {
467                 /* Use template database's default tablespace */
468                 dst_deftablespace = src_deftablespace;
469                 /* Note there is no additional permission check in this path */
470         }
471
472         /*
473          * Check for db name conflict.  This is just to give a more friendly error
474          * message than "unique index violation".  There's a race condition but
475          * we're willing to accept the less friendly message in that case.
476          */
477         if (OidIsValid(get_database_oid(dbname, true)))
478                 ereport(ERROR,
479                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
480                                  errmsg("database \"%s\" already exists", dbname)));
481
482         /*
483          * The source DB can't have any active backends, except this one
484          * (exception is to allow CREATE DB while connected to template1).
485          * Otherwise we might copy inconsistent data.
486          *
487          * This should be last among the basic error checks, because it involves
488          * potential waiting; we may as well throw an error first if we're gonna
489          * throw one.
490          */
491         if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
492                 ereport(ERROR,
493                                 (errcode(ERRCODE_OBJECT_IN_USE),
494                         errmsg("source database \"%s\" is being accessed by other users",
495                                    dbtemplate),
496                                  errdetail_busy_db(notherbackends, npreparedxacts)));
497
498         /*
499          * Select an OID for the new database, checking that it doesn't have a
500          * filename conflict with anything already existing in the tablespace
501          * directories.
502          */
503         pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
504
505         do
506         {
507                 dboid = GetNewOid(pg_database_rel);
508         } while (check_db_file_conflict(dboid));
509
510         /*
511          * Insert a new tuple into pg_database.  This establishes our ownership of
512          * the new database name (anyone else trying to insert the same name will
513          * block on the unique index, and fail after we commit).
514          */
515
516         /* Form tuple */
517         MemSet(new_record, 0, sizeof(new_record));
518         MemSet(new_record_nulls, false, sizeof(new_record_nulls));
519
520         new_record[Anum_pg_database_datname - 1] =
521                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
522         new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
523         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
524         new_record[Anum_pg_database_datcollate - 1] =
525                 DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
526         new_record[Anum_pg_database_datctype - 1] =
527                 DirectFunctionCall1(namein, CStringGetDatum(dbctype));
528         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
529         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
530         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
531         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
532         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
533         new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
534         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
535
536         /*
537          * We deliberately set datacl to default (NULL), rather than copying it
538          * from the template database.  Copying it would be a bad idea when the
539          * owner is not the same as the template's owner.
540          */
541         new_record_nulls[Anum_pg_database_datacl - 1] = true;
542
543         tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
544                                                         new_record, new_record_nulls);
545
546         HeapTupleSetOid(tuple, dboid);
547
548         simple_heap_insert(pg_database_rel, tuple);
549
550         /* Update indexes */
551         CatalogUpdateIndexes(pg_database_rel, tuple);
552
553         /*
554          * Now generate additional catalog entries associated with the new DB
555          */
556
557         /* Register owner dependency */
558         recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
559
560         /* Create pg_shdepend entries for objects within database */
561         copyTemplateDependencies(src_dboid, dboid);
562
563         /* Post creation hook for new database */
564         InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
565
566         /*
567          * Force a checkpoint before starting the copy. This will force all dirty
568          * buffers, including those of unlogged tables, out to disk, to ensure
569          * source database is up-to-date on disk for the copy.
570          * FlushDatabaseBuffers() would suffice for that, but we also want to
571          * process any pending unlink requests. Otherwise, if a checkpoint
572          * happened while we're copying files, a file might be deleted just when
573          * we're about to copy it, causing the lstat() call in copydir() to fail
574          * with ENOENT.
575          */
576         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
577                                           | CHECKPOINT_FLUSH_ALL);
578
579         /*
580          * Once we start copying subdirectories, we need to be able to clean 'em
581          * up if we fail.  Use an ENSURE block to make sure this happens.  (This
582          * is not a 100% solution, because of the possibility of failure during
583          * transaction commit after we leave this routine, but it should handle
584          * most scenarios.)
585          */
586         fparms.src_dboid = src_dboid;
587         fparms.dest_dboid = dboid;
588         PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
589                                                         PointerGetDatum(&fparms));
590         {
591                 /*
592                  * Iterate through all tablespaces of the template database, and copy
593                  * each one to the new database.
594                  */
595                 rel = heap_open(TableSpaceRelationId, AccessShareLock);
596                 scan = heap_beginscan_catalog(rel, 0, NULL);
597                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
598                 {
599                         Oid                     srctablespace = HeapTupleGetOid(tuple);
600                         Oid                     dsttablespace;
601                         char       *srcpath;
602                         char       *dstpath;
603                         struct stat st;
604
605                         /* No need to copy global tablespace */
606                         if (srctablespace == GLOBALTABLESPACE_OID)
607                                 continue;
608
609                         srcpath = GetDatabasePath(src_dboid, srctablespace);
610
611                         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
612                                 directory_is_empty(srcpath))
613                         {
614                                 /* Assume we can ignore it */
615                                 pfree(srcpath);
616                                 continue;
617                         }
618
619                         if (srctablespace == src_deftablespace)
620                                 dsttablespace = dst_deftablespace;
621                         else
622                                 dsttablespace = srctablespace;
623
624                         dstpath = GetDatabasePath(dboid, dsttablespace);
625
626                         /*
627                          * Copy this subdirectory to the new location
628                          *
629                          * We don't need to copy subdirectories
630                          */
631                         copydir(srcpath, dstpath, false);
632
633                         /* Record the filesystem change in XLOG */
634                         {
635                                 xl_dbase_create_rec xlrec;
636
637                                 xlrec.db_id = dboid;
638                                 xlrec.tablespace_id = dsttablespace;
639                                 xlrec.src_db_id = src_dboid;
640                                 xlrec.src_tablespace_id = srctablespace;
641
642                                 XLogBeginInsert();
643                                 XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
644
645                                 (void) XLogInsert(RM_DBASE_ID,
646                                                                   XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
647                         }
648                 }
649                 heap_endscan(scan);
650                 heap_close(rel, AccessShareLock);
651
652                 /*
653                  * We force a checkpoint before committing.  This effectively means
654                  * that committed XLOG_DBASE_CREATE operations will never need to be
655                  * replayed (at least not in ordinary crash recovery; we still have to
656                  * make the XLOG entry for the benefit of PITR operations). This
657                  * avoids two nasty scenarios:
658                  *
659                  * #1: When PITR is off, we don't XLOG the contents of newly created
660                  * indexes; therefore the drop-and-recreate-whole-directory behavior
661                  * of DBASE_CREATE replay would lose such indexes.
662                  *
663                  * #2: Since we have to recopy the source database during DBASE_CREATE
664                  * replay, we run the risk of copying changes in it that were
665                  * committed after the original CREATE DATABASE command but before the
666                  * system crash that led to the replay.  This is at least unexpected
667                  * and at worst could lead to inconsistencies, eg duplicate table
668                  * names.
669                  *
670                  * (Both of these were real bugs in releases 8.0 through 8.0.3.)
671                  *
672                  * In PITR replay, the first of these isn't an issue, and the second
673                  * is only a risk if the CREATE DATABASE and subsequent template
674                  * database change both occur while a base backup is being taken.
675                  * There doesn't seem to be much we can do about that except document
676                  * it as a limitation.
677                  *
678                  * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
679                  * we can avoid this.
680                  */
681                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
682
683                 /*
684                  * Close pg_database, but keep lock till commit.
685                  */
686                 heap_close(pg_database_rel, NoLock);
687
688                 /*
689                  * Force synchronous commit, thus minimizing the window between
690                  * creation of the database files and commital of the transaction. If
691                  * we crash before committing, we'll have a DB that's taking up disk
692                  * space but is not in pg_database, which is not good.
693                  */
694                 ForceSyncCommit();
695         }
696         PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
697                                                                 PointerGetDatum(&fparms));
698
699         return dboid;
700 }
701
702 /*
703  * Check whether chosen encoding matches chosen locale settings.  This
704  * restriction is necessary because libc's locale-specific code usually
705  * fails when presented with data in an encoding it's not expecting. We
706  * allow mismatch in four cases:
707  *
708  * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
709  * which works with any encoding.
710  *
711  * 2. locale encoding = -1, which means that we couldn't determine the
712  * locale's encoding and have to trust the user to get it right.
713  *
714  * 3. selected encoding is UTF8 and platform is win32. This is because
715  * UTF8 is a pseudo codepage that is supported in all locales since it's
716  * converted to UTF16 before being used.
717  *
718  * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
719  * is risky but we have historically allowed it --- notably, the
720  * regression tests require it.
721  *
722  * Note: if you change this policy, fix initdb to match.
723  */
724 void
725 check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
726 {
727         int                     ctype_encoding = pg_get_encoding_from_locale(ctype, true);
728         int                     collate_encoding = pg_get_encoding_from_locale(collate, true);
729
730         if (!(ctype_encoding == encoding ||
731                   ctype_encoding == PG_SQL_ASCII ||
732                   ctype_encoding == -1 ||
733 #ifdef WIN32
734                   encoding == PG_UTF8 ||
735 #endif
736                   (encoding == PG_SQL_ASCII && superuser())))
737                 ereport(ERROR,
738                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
739                                  errmsg("encoding \"%s\" does not match locale \"%s\"",
740                                                 pg_encoding_to_char(encoding),
741                                                 ctype),
742                    errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
743                                          pg_encoding_to_char(ctype_encoding))));
744
745         if (!(collate_encoding == encoding ||
746                   collate_encoding == PG_SQL_ASCII ||
747                   collate_encoding == -1 ||
748 #ifdef WIN32
749                   encoding == PG_UTF8 ||
750 #endif
751                   (encoding == PG_SQL_ASCII && superuser())))
752                 ereport(ERROR,
753                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
754                                  errmsg("encoding \"%s\" does not match locale \"%s\"",
755                                                 pg_encoding_to_char(encoding),
756                                                 collate),
757                  errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
758                                    pg_encoding_to_char(collate_encoding))));
759 }
760
761 /* Error cleanup callback for createdb */
762 static void
763 createdb_failure_callback(int code, Datum arg)
764 {
765         createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
766
767         /*
768          * Release lock on source database before doing recursive remove. This is
769          * not essential but it seems desirable to release the lock as soon as
770          * possible.
771          */
772         UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
773
774         /* Throw away any successfully copied subdirectories */
775         remove_dbtablespaces(fparms->dest_dboid);
776 }
777
778
779 /*
780  * DROP DATABASE
781  */
782 void
783 dropdb(const char *dbname, bool missing_ok)
784 {
785         Oid                     db_id;
786         bool            db_istemplate;
787         Relation        pgdbrel;
788         HeapTuple       tup;
789         int                     notherbackends;
790         int                     npreparedxacts;
791         int                     nslots,
792                                 nslots_active;
793
794         /*
795          * Look up the target database's OID, and get exclusive lock on it. We
796          * need this to ensure that no new backend starts up in the target
797          * database while we are deleting it (see postinit.c), and that no one is
798          * using it as a CREATE DATABASE template or trying to delete it for
799          * themselves.
800          */
801         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
802
803         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
804                                    &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
805         {
806                 if (!missing_ok)
807                 {
808                         ereport(ERROR,
809                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
810                                          errmsg("database \"%s\" does not exist", dbname)));
811                 }
812                 else
813                 {
814                         /* Close pg_database, release the lock, since we changed nothing */
815                         heap_close(pgdbrel, RowExclusiveLock);
816                         ereport(NOTICE,
817                                         (errmsg("database \"%s\" does not exist, skipping",
818                                                         dbname)));
819                         return;
820                 }
821         }
822
823         /*
824          * Permission checks
825          */
826         if (!pg_database_ownercheck(db_id, GetUserId()))
827                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
828                                            dbname);
829
830         /* DROP hook for the database being removed */
831         InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
832
833         /*
834          * Disallow dropping a DB that is marked istemplate.  This is just to
835          * prevent people from accidentally dropping template0 or template1; they
836          * can do so if they're really determined ...
837          */
838         if (db_istemplate)
839                 ereport(ERROR,
840                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
841                                  errmsg("cannot drop a template database")));
842
843         /* Obviously can't drop my own database */
844         if (db_id == MyDatabaseId)
845                 ereport(ERROR,
846                                 (errcode(ERRCODE_OBJECT_IN_USE),
847                                  errmsg("cannot drop the currently open database")));
848
849         /*
850          * Check whether there are, possibly unconnected, logical slots that refer
851          * to the to-be-dropped database. The database lock we are holding
852          * prevents the creation of new slots using the database.
853          */
854         if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active))
855                 ereport(ERROR,
856                                 (errcode(ERRCODE_OBJECT_IN_USE),
857                           errmsg("database \"%s\" is used by a logical replication slot",
858                                          dbname),
859                                  errdetail_plural("There is %d slot, %d of them active.",
860                                                                   "There are %d slots, %d of them active.",
861                                                                   nslots,
862                                                                   nslots, nslots_active)));
863
864         /*
865          * Check for other backends in the target database.  (Because we hold the
866          * database lock, no new ones can start after this.)
867          *
868          * As in CREATE DATABASE, check this after other error conditions.
869          */
870         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
871                 ereport(ERROR,
872                                 (errcode(ERRCODE_OBJECT_IN_USE),
873                                  errmsg("database \"%s\" is being accessed by other users",
874                                                 dbname),
875                                  errdetail_busy_db(notherbackends, npreparedxacts)));
876
877         /*
878          * Remove the database's tuple from pg_database.
879          */
880         tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
881         if (!HeapTupleIsValid(tup))
882                 elog(ERROR, "cache lookup failed for database %u", db_id);
883
884         simple_heap_delete(pgdbrel, &tup->t_self);
885
886         ReleaseSysCache(tup);
887
888         /*
889          * Delete any comments or security labels associated with the database.
890          */
891         DeleteSharedComments(db_id, DatabaseRelationId);
892         DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
893
894         /*
895          * Remove settings associated with this database
896          */
897         DropSetting(db_id, InvalidOid);
898
899         /*
900          * Remove shared dependency references for the database.
901          */
902         dropDatabaseDependencies(db_id);
903
904         /*
905          * Drop pages for this database that are in the shared buffer cache. This
906          * is important to ensure that no remaining backend tries to write out a
907          * dirty buffer to the dead database later...
908          */
909         DropDatabaseBuffers(db_id);
910
911         /*
912          * Tell the stats collector to forget it immediately, too.
913          */
914         pgstat_drop_database(db_id);
915
916         /*
917          * Tell checkpointer to forget any pending fsync and unlink requests for
918          * files in the database; else the fsyncs will fail at next checkpoint, or
919          * worse, it will delete files that belong to a newly created database
920          * with the same OID.
921          */
922         ForgetDatabaseFsyncRequests(db_id);
923
924         /*
925          * Force a checkpoint to make sure the checkpointer has received the
926          * message sent by ForgetDatabaseFsyncRequests. On Windows, this also
927          * ensures that background procs don't hold any open files, which would
928          * cause rmdir() to fail.
929          */
930         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
931
932         /*
933          * Remove all tablespace subdirs belonging to the database.
934          */
935         remove_dbtablespaces(db_id);
936
937         /*
938          * Close pg_database, but keep lock till commit.
939          */
940         heap_close(pgdbrel, NoLock);
941
942         /*
943          * Force synchronous commit, thus minimizing the window between removal of
944          * the database files and commital of the transaction. If we crash before
945          * committing, we'll have a DB that's gone on disk but still there
946          * according to pg_database, which is not good.
947          */
948         ForceSyncCommit();
949 }
950
951
952 /*
953  * Rename database
954  */
955 ObjectAddress
956 RenameDatabase(const char *oldname, const char *newname)
957 {
958         Oid                     db_id;
959         HeapTuple       newtup;
960         Relation        rel;
961         int                     notherbackends;
962         int                     npreparedxacts;
963         ObjectAddress address;
964
965         /*
966          * Look up the target database's OID, and get exclusive lock on it. We
967          * need this for the same reasons as DROP DATABASE.
968          */
969         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
970
971         if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
972                                          NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
973                 ereport(ERROR,
974                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
975                                  errmsg("database \"%s\" does not exist", oldname)));
976
977         /* must be owner */
978         if (!pg_database_ownercheck(db_id, GetUserId()))
979                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
980                                            oldname);
981
982         /* must have createdb rights */
983         if (!have_createdb_privilege())
984                 ereport(ERROR,
985                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
986                                  errmsg("permission denied to rename database")));
987
988         /*
989          * Make sure the new name doesn't exist.  See notes for same error in
990          * CREATE DATABASE.
991          */
992         if (OidIsValid(get_database_oid(newname, true)))
993                 ereport(ERROR,
994                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
995                                  errmsg("database \"%s\" already exists", newname)));
996
997         /*
998          * XXX Client applications probably store the current database somewhere,
999          * so renaming it could cause confusion.  On the other hand, there may not
1000          * be an actual problem besides a little confusion, so think about this
1001          * and decide.
1002          */
1003         if (db_id == MyDatabaseId)
1004                 ereport(ERROR,
1005                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1006                                  errmsg("current database cannot be renamed")));
1007
1008         /*
1009          * Make sure the database does not have active sessions.  This is the same
1010          * concern as above, but applied to other sessions.
1011          *
1012          * As in CREATE DATABASE, check this after other error conditions.
1013          */
1014         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1015                 ereport(ERROR,
1016                                 (errcode(ERRCODE_OBJECT_IN_USE),
1017                                  errmsg("database \"%s\" is being accessed by other users",
1018                                                 oldname),
1019                                  errdetail_busy_db(notherbackends, npreparedxacts)));
1020
1021         /* rename */
1022         newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1023         if (!HeapTupleIsValid(newtup))
1024                 elog(ERROR, "cache lookup failed for database %u", db_id);
1025         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1026         simple_heap_update(rel, &newtup->t_self, newtup);
1027         CatalogUpdateIndexes(rel, newtup);
1028
1029         InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1030
1031         ObjectAddressSet(address, DatabaseRelationId, db_id);
1032
1033         /*
1034          * Close pg_database, but keep lock till commit.
1035          */
1036         heap_close(rel, NoLock);
1037
1038         return address;
1039 }
1040
1041
1042 /*
1043  * ALTER DATABASE SET TABLESPACE
1044  */
1045 static void
1046 movedb(const char *dbname, const char *tblspcname)
1047 {
1048         Oid                     db_id;
1049         Relation        pgdbrel;
1050         int                     notherbackends;
1051         int                     npreparedxacts;
1052         HeapTuple       oldtuple,
1053                                 newtuple;
1054         Oid                     src_tblspcoid,
1055                                 dst_tblspcoid;
1056         Datum           new_record[Natts_pg_database];
1057         bool            new_record_nulls[Natts_pg_database];
1058         bool            new_record_repl[Natts_pg_database];
1059         ScanKeyData scankey;
1060         SysScanDesc sysscan;
1061         AclResult       aclresult;
1062         char       *src_dbpath;
1063         char       *dst_dbpath;
1064         DIR                *dstdir;
1065         struct dirent *xlde;
1066         movedb_failure_params fparms;
1067
1068         /*
1069          * Look up the target database's OID, and get exclusive lock on it. We
1070          * need this to ensure that no new backend starts up in the database while
1071          * we are moving it, and that no one is using it as a CREATE DATABASE
1072          * template or trying to delete it.
1073          */
1074         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
1075
1076         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1077                                    NULL, NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
1078                 ereport(ERROR,
1079                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1080                                  errmsg("database \"%s\" does not exist", dbname)));
1081
1082         /*
1083          * We actually need a session lock, so that the lock will persist across
1084          * the commit/restart below.  (We could almost get away with letting the
1085          * lock be released at commit, except that someone could try to move
1086          * relations of the DB back into the old directory while we rmtree() it.)
1087          */
1088         LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1089                                                            AccessExclusiveLock);
1090
1091         /*
1092          * Permission checks
1093          */
1094         if (!pg_database_ownercheck(db_id, GetUserId()))
1095                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1096                                            dbname);
1097
1098         /*
1099          * Obviously can't move the tables of my own database
1100          */
1101         if (db_id == MyDatabaseId)
1102                 ereport(ERROR,
1103                                 (errcode(ERRCODE_OBJECT_IN_USE),
1104                                  errmsg("cannot change the tablespace of the currently open database")));
1105
1106         /*
1107          * Get tablespace's oid
1108          */
1109         dst_tblspcoid = get_tablespace_oid(tblspcname, false);
1110
1111         /*
1112          * Permission checks
1113          */
1114         aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
1115                                                                            ACL_CREATE);
1116         if (aclresult != ACLCHECK_OK)
1117                 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
1118                                            tblspcname);
1119
1120         /*
1121          * pg_global must never be the default tablespace
1122          */
1123         if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1124                 ereport(ERROR,
1125                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1126                                  errmsg("pg_global cannot be used as default tablespace")));
1127
1128         /*
1129          * No-op if same tablespace
1130          */
1131         if (src_tblspcoid == dst_tblspcoid)
1132         {
1133                 heap_close(pgdbrel, NoLock);
1134                 UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1135                                                                          AccessExclusiveLock);
1136                 return;
1137         }
1138
1139         /*
1140          * Check for other backends in the target database.  (Because we hold the
1141          * database lock, no new ones can start after this.)
1142          *
1143          * As in CREATE DATABASE, check this after other error conditions.
1144          */
1145         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1146                 ereport(ERROR,
1147                                 (errcode(ERRCODE_OBJECT_IN_USE),
1148                                  errmsg("database \"%s\" is being accessed by other users",
1149                                                 dbname),
1150                                  errdetail_busy_db(notherbackends, npreparedxacts)));
1151
1152         /*
1153          * Get old and new database paths
1154          */
1155         src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
1156         dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1157
1158         /*
1159          * Force a checkpoint before proceeding. This will force all dirty
1160          * buffers, including those of unlogged tables, out to disk, to ensure
1161          * source database is up-to-date on disk for the copy.
1162          * FlushDatabaseBuffers() would suffice for that, but we also want to
1163          * process any pending unlink requests. Otherwise, the check for existing
1164          * files in the target directory might fail unnecessarily, not to mention
1165          * that the copy might fail due to source files getting deleted under it.
1166          * On Windows, this also ensures that background procs don't hold any open
1167          * files, which would cause rmdir() to fail.
1168          */
1169         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
1170                                           | CHECKPOINT_FLUSH_ALL);
1171
1172         /*
1173          * Now drop all buffers holding data of the target database; they should
1174          * no longer be dirty so DropDatabaseBuffers is safe.
1175          *
1176          * It might seem that we could just let these buffers age out of shared
1177          * buffers naturally, since they should not get referenced anymore.  The
1178          * problem with that is that if the user later moves the database back to
1179          * its original tablespace, any still-surviving buffers would appear to
1180          * contain valid data again --- but they'd be missing any changes made in
1181          * the database while it was in the new tablespace.  In any case, freeing
1182          * buffers that should never be used again seems worth the cycles.
1183          *
1184          * Note: it'd be sufficient to get rid of buffers matching db_id and
1185          * src_tblspcoid, but bufmgr.c presently provides no API for that.
1186          */
1187         DropDatabaseBuffers(db_id);
1188
1189         /*
1190          * Check for existence of files in the target directory, i.e., objects of
1191          * this database that are already in the target tablespace.  We can't
1192          * allow the move in such a case, because we would need to change those
1193          * relations' pg_class.reltablespace entries to zero, and we don't have
1194          * access to the DB's pg_class to do so.
1195          */
1196         dstdir = AllocateDir(dst_dbpath);
1197         if (dstdir != NULL)
1198         {
1199                 while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
1200                 {
1201                         if (strcmp(xlde->d_name, ".") == 0 ||
1202                                 strcmp(xlde->d_name, "..") == 0)
1203                                 continue;
1204
1205                         ereport(ERROR,
1206                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1207                                          errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
1208                                                         dbname, tblspcname),
1209                                          errhint("You must move them back to the database's default tablespace before using this command.")));
1210                 }
1211
1212                 FreeDir(dstdir);
1213
1214                 /*
1215                  * The directory exists but is empty. We must remove it before using
1216                  * the copydir function.
1217                  */
1218                 if (rmdir(dst_dbpath) != 0)
1219                         elog(ERROR, "could not remove directory \"%s\": %m",
1220                                  dst_dbpath);
1221         }
1222
1223         /*
1224          * Use an ENSURE block to make sure we remove the debris if the copy fails
1225          * (eg, due to out-of-disk-space).  This is not a 100% solution, because
1226          * of the possibility of failure during transaction commit, but it should
1227          * handle most scenarios.
1228          */
1229         fparms.dest_dboid = db_id;
1230         fparms.dest_tsoid = dst_tblspcoid;
1231         PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1232                                                         PointerGetDatum(&fparms));
1233         {
1234                 /*
1235                  * Copy files from the old tablespace to the new one
1236                  */
1237                 copydir(src_dbpath, dst_dbpath, false);
1238
1239                 /*
1240                  * Record the filesystem change in XLOG
1241                  */
1242                 {
1243                         xl_dbase_create_rec xlrec;
1244
1245                         xlrec.db_id = db_id;
1246                         xlrec.tablespace_id = dst_tblspcoid;
1247                         xlrec.src_db_id = db_id;
1248                         xlrec.src_tablespace_id = src_tblspcoid;
1249
1250                         XLogBeginInsert();
1251                         XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
1252
1253                         (void) XLogInsert(RM_DBASE_ID,
1254                                                           XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
1255                 }
1256
1257                 /*
1258                  * Update the database's pg_database tuple
1259                  */
1260                 ScanKeyInit(&scankey,
1261                                         Anum_pg_database_datname,
1262                                         BTEqualStrategyNumber, F_NAMEEQ,
1263                                         CStringGetDatum(dbname));
1264                 sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1265                                                                          NULL, 1, &scankey);
1266                 oldtuple = systable_getnext(sysscan);
1267                 if (!HeapTupleIsValid(oldtuple))                /* shouldn't happen... */
1268                         ereport(ERROR,
1269                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
1270                                          errmsg("database \"%s\" does not exist", dbname)));
1271
1272                 MemSet(new_record, 0, sizeof(new_record));
1273                 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1274                 MemSet(new_record_repl, false, sizeof(new_record_repl));
1275
1276                 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
1277                 new_record_repl[Anum_pg_database_dattablespace - 1] = true;
1278
1279                 newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
1280                                                                          new_record,
1281                                                                          new_record_nulls, new_record_repl);
1282                 simple_heap_update(pgdbrel, &oldtuple->t_self, newtuple);
1283
1284                 /* Update indexes */
1285                 CatalogUpdateIndexes(pgdbrel, newtuple);
1286
1287                 InvokeObjectPostAlterHook(DatabaseRelationId,
1288                                                                   HeapTupleGetOid(newtuple), 0);
1289
1290                 systable_endscan(sysscan);
1291
1292                 /*
1293                  * Force another checkpoint here.  As in CREATE DATABASE, this is to
1294                  * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
1295                  * operation, which would cause us to lose any unlogged operations
1296                  * done in the new DB tablespace before the next checkpoint.
1297                  */
1298                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1299
1300                 /*
1301                  * Force synchronous commit, thus minimizing the window between
1302                  * copying the database files and commital of the transaction. If we
1303                  * crash before committing, we'll leave an orphaned set of files on
1304                  * disk, which is not fatal but not good either.
1305                  */
1306                 ForceSyncCommit();
1307
1308                 /*
1309                  * Close pg_database, but keep lock till commit.
1310                  */
1311                 heap_close(pgdbrel, NoLock);
1312         }
1313         PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1314                                                                 PointerGetDatum(&fparms));
1315
1316         /*
1317          * Commit the transaction so that the pg_database update is committed. If
1318          * we crash while removing files, the database won't be corrupt, we'll
1319          * just leave some orphaned files in the old directory.
1320          *
1321          * (This is OK because we know we aren't inside a transaction block.)
1322          *
1323          * XXX would it be safe/better to do this inside the ensure block?      Not
1324          * convinced it's a good idea; consider elog just after the transaction
1325          * really commits.
1326          */
1327         PopActiveSnapshot();
1328         CommitTransactionCommand();
1329
1330         /* Start new transaction for the remaining work; don't need a snapshot */
1331         StartTransactionCommand();
1332
1333         /*
1334          * Remove files from the old tablespace
1335          */
1336         if (!rmtree(src_dbpath, true))
1337                 ereport(WARNING,
1338                                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1339                                                 src_dbpath)));
1340
1341         /*
1342          * Record the filesystem change in XLOG
1343          */
1344         {
1345                 xl_dbase_drop_rec xlrec;
1346
1347                 xlrec.db_id = db_id;
1348                 xlrec.tablespace_id = src_tblspcoid;
1349
1350                 XLogBeginInsert();
1351                 XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1352
1353                 (void) XLogInsert(RM_DBASE_ID,
1354                                                   XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1355         }
1356
1357         /* Now it's safe to release the database lock */
1358         UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1359                                                                  AccessExclusiveLock);
1360 }
1361
1362 /* Error cleanup callback for movedb */
1363 static void
1364 movedb_failure_callback(int code, Datum arg)
1365 {
1366         movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
1367         char       *dstpath;
1368
1369         /* Get rid of anything we managed to copy to the target directory */
1370         dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
1371
1372         (void) rmtree(dstpath, true);
1373 }
1374
1375
1376 /*
1377  * ALTER DATABASE name ...
1378  */
1379 Oid
1380 AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
1381 {
1382         Relation        rel;
1383         Oid                     dboid;
1384         HeapTuple       tuple,
1385                                 newtuple;
1386         ScanKeyData scankey;
1387         SysScanDesc scan;
1388         ListCell   *option;
1389         bool            dbistemplate = false;
1390         bool            dballowconnections = true;
1391         int                     dbconnlimit = -1;
1392         DefElem    *distemplate = NULL;
1393         DefElem    *dallowconnections = NULL;
1394         DefElem    *dconnlimit = NULL;
1395         DefElem    *dtablespace = NULL;
1396         Datum           new_record[Natts_pg_database];
1397         bool            new_record_nulls[Natts_pg_database];
1398         bool            new_record_repl[Natts_pg_database];
1399
1400         /* Extract options from the statement node tree */
1401         foreach(option, stmt->options)
1402         {
1403                 DefElem    *defel = (DefElem *) lfirst(option);
1404
1405                 if (strcmp(defel->defname, "is_template") == 0)
1406                 {
1407                         if (distemplate)
1408                                 ereport(ERROR,
1409                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1410                                                  errmsg("conflicting or redundant options"),
1411                                                  parser_errposition(pstate, defel->location)));
1412                         distemplate = defel;
1413                 }
1414                 else if (strcmp(defel->defname, "allow_connections") == 0)
1415                 {
1416                         if (dallowconnections)
1417                                 ereport(ERROR,
1418                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1419                                                  errmsg("conflicting or redundant options"),
1420                                                  parser_errposition(pstate, defel->location)));
1421                         dallowconnections = defel;
1422                 }
1423                 else if (strcmp(defel->defname, "connection_limit") == 0)
1424                 {
1425                         if (dconnlimit)
1426                                 ereport(ERROR,
1427                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1428                                                  errmsg("conflicting or redundant options"),
1429                                                  parser_errposition(pstate, defel->location)));
1430                         dconnlimit = defel;
1431                 }
1432                 else if (strcmp(defel->defname, "tablespace") == 0)
1433                 {
1434                         if (dtablespace)
1435                                 ereport(ERROR,
1436                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1437                                                  errmsg("conflicting or redundant options"),
1438                                                  parser_errposition(pstate, defel->location)));
1439                         dtablespace = defel;
1440                 }
1441                 else
1442                         ereport(ERROR,
1443                                         (errcode(ERRCODE_SYNTAX_ERROR),
1444                                          errmsg("option \"%s\" not recognized", defel->defname),
1445                                          parser_errposition(pstate, defel->location)));
1446         }
1447
1448         if (dtablespace)
1449         {
1450                 /*
1451                  * While the SET TABLESPACE syntax doesn't allow any other options,
1452                  * somebody could write "WITH TABLESPACE ...".  Forbid any other
1453                  * options from being specified in that case.
1454                  */
1455                 if (list_length(stmt->options) != 1)
1456                         ereport(ERROR,
1457                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1458                            errmsg("option \"%s\" cannot be specified with other options",
1459                                           dtablespace->defname),
1460                                          parser_errposition(pstate, dtablespace->location)));
1461                 /* this case isn't allowed within a transaction block */
1462                 PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
1463                 movedb(stmt->dbname, defGetString(dtablespace));
1464                 return InvalidOid;
1465         }
1466
1467         if (distemplate && distemplate->arg)
1468                 dbistemplate = defGetBoolean(distemplate);
1469         if (dallowconnections && dallowconnections->arg)
1470                 dballowconnections = defGetBoolean(dallowconnections);
1471         if (dconnlimit && dconnlimit->arg)
1472         {
1473                 dbconnlimit = defGetInt32(dconnlimit);
1474                 if (dbconnlimit < -1)
1475                         ereport(ERROR,
1476                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1477                                          errmsg("invalid connection limit: %d", dbconnlimit)));
1478         }
1479
1480         /*
1481          * Get the old tuple.  We don't need a lock on the database per se,
1482          * because we're not going to do anything that would mess up incoming
1483          * connections.
1484          */
1485         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1486         ScanKeyInit(&scankey,
1487                                 Anum_pg_database_datname,
1488                                 BTEqualStrategyNumber, F_NAMEEQ,
1489                                 CStringGetDatum(stmt->dbname));
1490         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1491                                                           NULL, 1, &scankey);
1492         tuple = systable_getnext(scan);
1493         if (!HeapTupleIsValid(tuple))
1494                 ereport(ERROR,
1495                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1496                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
1497
1498         dboid = HeapTupleGetOid(tuple);
1499
1500         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1501                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1502                                            stmt->dbname);
1503
1504         /*
1505          * In order to avoid getting locked out and having to go through
1506          * standalone mode, we refuse to disallow connections to the database
1507          * we're currently connected to.  Lockout can still happen with concurrent
1508          * sessions but the likeliness of that is not high enough to worry about.
1509          */
1510         if (!dballowconnections && dboid == MyDatabaseId)
1511                 ereport(ERROR,
1512                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1513                                  errmsg("cannot disallow connections for current database")));
1514
1515         /*
1516          * Build an updated tuple, perusing the information just obtained
1517          */
1518         MemSet(new_record, 0, sizeof(new_record));
1519         MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1520         MemSet(new_record_repl, false, sizeof(new_record_repl));
1521
1522         if (distemplate)
1523         {
1524                 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1525                 new_record_repl[Anum_pg_database_datistemplate - 1] = true;
1526         }
1527         if (dallowconnections)
1528         {
1529                 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1530                 new_record_repl[Anum_pg_database_datallowconn - 1] = true;
1531         }
1532         if (dconnlimit)
1533         {
1534                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1535                 new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
1536         }
1537
1538         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
1539                                                                  new_record_nulls, new_record_repl);
1540         simple_heap_update(rel, &tuple->t_self, newtuple);
1541
1542         /* Update indexes */
1543         CatalogUpdateIndexes(rel, newtuple);
1544
1545         InvokeObjectPostAlterHook(DatabaseRelationId,
1546                                                           HeapTupleGetOid(newtuple), 0);
1547
1548         systable_endscan(scan);
1549
1550         /* Close pg_database, but keep lock till commit */
1551         heap_close(rel, NoLock);
1552
1553         return dboid;
1554 }
1555
1556
1557 /*
1558  * ALTER DATABASE name SET ...
1559  */
1560 Oid
1561 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1562 {
1563         Oid                     datid = get_database_oid(stmt->dbname, false);
1564
1565         /*
1566          * Obtain a lock on the database and make sure it didn't go away in the
1567          * meantime.
1568          */
1569         shdepLockAndCheckObject(DatabaseRelationId, datid);
1570
1571         if (!pg_database_ownercheck(datid, GetUserId()))
1572                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1573                                            stmt->dbname);
1574
1575         AlterSetting(datid, InvalidOid, stmt->setstmt);
1576
1577         UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
1578
1579         return datid;
1580 }
1581
1582
1583 /*
1584  * ALTER DATABASE name OWNER TO newowner
1585  */
1586 ObjectAddress
1587 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1588 {
1589         Oid                     db_id;
1590         HeapTuple       tuple;
1591         Relation        rel;
1592         ScanKeyData scankey;
1593         SysScanDesc scan;
1594         Form_pg_database datForm;
1595         ObjectAddress address;
1596
1597         /*
1598          * Get the old tuple.  We don't need a lock on the database per se,
1599          * because we're not going to do anything that would mess up incoming
1600          * connections.
1601          */
1602         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1603         ScanKeyInit(&scankey,
1604                                 Anum_pg_database_datname,
1605                                 BTEqualStrategyNumber, F_NAMEEQ,
1606                                 CStringGetDatum(dbname));
1607         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1608                                                           NULL, 1, &scankey);
1609         tuple = systable_getnext(scan);
1610         if (!HeapTupleIsValid(tuple))
1611                 ereport(ERROR,
1612                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1613                                  errmsg("database \"%s\" does not exist", dbname)));
1614
1615         db_id = HeapTupleGetOid(tuple);
1616         datForm = (Form_pg_database) GETSTRUCT(tuple);
1617
1618         /*
1619          * If the new owner is the same as the existing owner, consider the
1620          * command to have succeeded.  This is to be consistent with other
1621          * objects.
1622          */
1623         if (datForm->datdba != newOwnerId)
1624         {
1625                 Datum           repl_val[Natts_pg_database];
1626                 bool            repl_null[Natts_pg_database];
1627                 bool            repl_repl[Natts_pg_database];
1628                 Acl                *newAcl;
1629                 Datum           aclDatum;
1630                 bool            isNull;
1631                 HeapTuple       newtuple;
1632
1633                 /* Otherwise, must be owner of the existing object */
1634                 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1635                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1636                                                    dbname);
1637
1638                 /* Must be able to become new owner */
1639                 check_is_member_of_role(GetUserId(), newOwnerId);
1640
1641                 /*
1642                  * must have createdb rights
1643                  *
1644                  * NOTE: This is different from other alter-owner checks in that the
1645                  * current user is checked for createdb privileges instead of the
1646                  * destination owner.  This is consistent with the CREATE case for
1647                  * databases.  Because superusers will always have this right, we need
1648                  * no special case for them.
1649                  */
1650                 if (!have_createdb_privilege())
1651                         ereport(ERROR,
1652                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1653                                    errmsg("permission denied to change owner of database")));
1654
1655                 memset(repl_null, false, sizeof(repl_null));
1656                 memset(repl_repl, false, sizeof(repl_repl));
1657
1658                 repl_repl[Anum_pg_database_datdba - 1] = true;
1659                 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1660
1661                 /*
1662                  * Determine the modified ACL for the new owner.  This is only
1663                  * necessary when the ACL is non-null.
1664                  */
1665                 aclDatum = heap_getattr(tuple,
1666                                                                 Anum_pg_database_datacl,
1667                                                                 RelationGetDescr(rel),
1668                                                                 &isNull);
1669                 if (!isNull)
1670                 {
1671                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
1672                                                                  datForm->datdba, newOwnerId);
1673                         repl_repl[Anum_pg_database_datacl - 1] = true;
1674                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1675                 }
1676
1677                 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1678                 simple_heap_update(rel, &newtuple->t_self, newtuple);
1679                 CatalogUpdateIndexes(rel, newtuple);
1680
1681                 heap_freetuple(newtuple);
1682
1683                 /* Update owner dependency reference */
1684                 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1685                                                                 newOwnerId);
1686         }
1687
1688         InvokeObjectPostAlterHook(DatabaseRelationId, HeapTupleGetOid(tuple), 0);
1689
1690         ObjectAddressSet(address, DatabaseRelationId, db_id);
1691
1692         systable_endscan(scan);
1693
1694         /* Close pg_database, but keep lock till commit */
1695         heap_close(rel, NoLock);
1696
1697         return address;
1698 }
1699
1700
1701 /*
1702  * Helper functions
1703  */
1704
1705 /*
1706  * Look up info about the database named "name".  If the database exists,
1707  * obtain the specified lock type on it, fill in any of the remaining
1708  * parameters that aren't NULL, and return TRUE.  If no such database,
1709  * return FALSE.
1710  */
1711 static bool
1712 get_db_info(const char *name, LOCKMODE lockmode,
1713                         Oid *dbIdP, Oid *ownerIdP,
1714                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1715                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1716                         MultiXactId *dbMinMultiP,
1717                         Oid *dbTablespace, char **dbCollate, char **dbCtype)
1718 {
1719         bool            result = false;
1720         Relation        relation;
1721
1722         AssertArg(name);
1723
1724         /* Caller may wish to grab a better lock on pg_database beforehand... */
1725         relation = heap_open(DatabaseRelationId, AccessShareLock);
1726
1727         /*
1728          * Loop covers the rare case where the database is renamed before we can
1729          * lock it.  We try again just in case we can find a new one of the same
1730          * name.
1731          */
1732         for (;;)
1733         {
1734                 ScanKeyData scanKey;
1735                 SysScanDesc scan;
1736                 HeapTuple       tuple;
1737                 Oid                     dbOid;
1738
1739                 /*
1740                  * there's no syscache for database-indexed-by-name, so must do it the
1741                  * hard way
1742                  */
1743                 ScanKeyInit(&scanKey,
1744                                         Anum_pg_database_datname,
1745                                         BTEqualStrategyNumber, F_NAMEEQ,
1746                                         CStringGetDatum(name));
1747
1748                 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1749                                                                   NULL, 1, &scanKey);
1750
1751                 tuple = systable_getnext(scan);
1752
1753                 if (!HeapTupleIsValid(tuple))
1754                 {
1755                         /* definitely no database of that name */
1756                         systable_endscan(scan);
1757                         break;
1758                 }
1759
1760                 dbOid = HeapTupleGetOid(tuple);
1761
1762                 systable_endscan(scan);
1763
1764                 /*
1765                  * Now that we have a database OID, we can try to lock the DB.
1766                  */
1767                 if (lockmode != NoLock)
1768                         LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1769
1770                 /*
1771                  * And now, re-fetch the tuple by OID.  If it's still there and still
1772                  * the same name, we win; else, drop the lock and loop back to try
1773                  * again.
1774                  */
1775                 tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
1776                 if (HeapTupleIsValid(tuple))
1777                 {
1778                         Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1779
1780                         if (strcmp(name, NameStr(dbform->datname)) == 0)
1781                         {
1782                                 /* oid of the database */
1783                                 if (dbIdP)
1784                                         *dbIdP = dbOid;
1785                                 /* oid of the owner */
1786                                 if (ownerIdP)
1787                                         *ownerIdP = dbform->datdba;
1788                                 /* character encoding */
1789                                 if (encodingP)
1790                                         *encodingP = dbform->encoding;
1791                                 /* allowed as template? */
1792                                 if (dbIsTemplateP)
1793                                         *dbIsTemplateP = dbform->datistemplate;
1794                                 /* allowing connections? */
1795                                 if (dbAllowConnP)
1796                                         *dbAllowConnP = dbform->datallowconn;
1797                                 /* last system OID used in database */
1798                                 if (dbLastSysOidP)
1799                                         *dbLastSysOidP = dbform->datlastsysoid;
1800                                 /* limit of frozen XIDs */
1801                                 if (dbFrozenXidP)
1802                                         *dbFrozenXidP = dbform->datfrozenxid;
1803                                 /* minimum MultixactId */
1804                                 if (dbMinMultiP)
1805                                         *dbMinMultiP = dbform->datminmxid;
1806                                 /* default tablespace for this database */
1807                                 if (dbTablespace)
1808                                         *dbTablespace = dbform->dattablespace;
1809                                 /* default locale settings for this database */
1810                                 if (dbCollate)
1811                                         *dbCollate = pstrdup(NameStr(dbform->datcollate));
1812                                 if (dbCtype)
1813                                         *dbCtype = pstrdup(NameStr(dbform->datctype));
1814                                 ReleaseSysCache(tuple);
1815                                 result = true;
1816                                 break;
1817                         }
1818                         /* can only get here if it was just renamed */
1819                         ReleaseSysCache(tuple);
1820                 }
1821
1822                 if (lockmode != NoLock)
1823                         UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1824         }
1825
1826         heap_close(relation, AccessShareLock);
1827
1828         return result;
1829 }
1830
1831 /* Check if current user has createdb privileges */
1832 static bool
1833 have_createdb_privilege(void)
1834 {
1835         bool            result = false;
1836         HeapTuple       utup;
1837
1838         /* Superusers can always do everything */
1839         if (superuser())
1840                 return true;
1841
1842         utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
1843         if (HeapTupleIsValid(utup))
1844         {
1845                 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1846                 ReleaseSysCache(utup);
1847         }
1848         return result;
1849 }
1850
1851 /*
1852  * Remove tablespace directories
1853  *
1854  * We don't know what tablespaces db_id is using, so iterate through all
1855  * tablespaces removing <tablespace>/db_id
1856  */
1857 static void
1858 remove_dbtablespaces(Oid db_id)
1859 {
1860         Relation        rel;
1861         HeapScanDesc scan;
1862         HeapTuple       tuple;
1863
1864         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1865         scan = heap_beginscan_catalog(rel, 0, NULL);
1866         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1867         {
1868                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1869                 char       *dstpath;
1870                 struct stat st;
1871
1872                 /* Don't mess with the global tablespace */
1873                 if (dsttablespace == GLOBALTABLESPACE_OID)
1874                         continue;
1875
1876                 dstpath = GetDatabasePath(db_id, dsttablespace);
1877
1878                 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1879                 {
1880                         /* Assume we can ignore it */
1881                         pfree(dstpath);
1882                         continue;
1883                 }
1884
1885                 if (!rmtree(dstpath, true))
1886                         ereport(WARNING,
1887                                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
1888                                                         dstpath)));
1889
1890                 /* Record the filesystem change in XLOG */
1891                 {
1892                         xl_dbase_drop_rec xlrec;
1893
1894                         xlrec.db_id = db_id;
1895                         xlrec.tablespace_id = dsttablespace;
1896
1897                         XLogBeginInsert();
1898                         XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1899
1900                         (void) XLogInsert(RM_DBASE_ID,
1901                                                           XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1902                 }
1903
1904                 pfree(dstpath);
1905         }
1906
1907         heap_endscan(scan);
1908         heap_close(rel, AccessShareLock);
1909 }
1910
1911 /*
1912  * Check for existing files that conflict with a proposed new DB OID;
1913  * return TRUE if there are any
1914  *
1915  * If there were a subdirectory in any tablespace matching the proposed new
1916  * OID, we'd get a create failure due to the duplicate name ... and then we'd
1917  * try to remove that already-existing subdirectory during the cleanup in
1918  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
1919  * instead we make this extra check before settling on the OID of the new
1920  * database.  This exactly parallels what GetNewRelFileNode() does for table
1921  * relfilenode values.
1922  */
1923 static bool
1924 check_db_file_conflict(Oid db_id)
1925 {
1926         bool            result = false;
1927         Relation        rel;
1928         HeapScanDesc scan;
1929         HeapTuple       tuple;
1930
1931         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1932         scan = heap_beginscan_catalog(rel, 0, NULL);
1933         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1934         {
1935                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1936                 char       *dstpath;
1937                 struct stat st;
1938
1939                 /* Don't mess with the global tablespace */
1940                 if (dsttablespace == GLOBALTABLESPACE_OID)
1941                         continue;
1942
1943                 dstpath = GetDatabasePath(db_id, dsttablespace);
1944
1945                 if (lstat(dstpath, &st) == 0)
1946                 {
1947                         /* Found a conflicting file (or directory, whatever) */
1948                         pfree(dstpath);
1949                         result = true;
1950                         break;
1951                 }
1952
1953                 pfree(dstpath);
1954         }
1955
1956         heap_endscan(scan);
1957         heap_close(rel, AccessShareLock);
1958
1959         return result;
1960 }
1961
1962 /*
1963  * Issue a suitable errdetail message for a busy database
1964  */
1965 static int
1966 errdetail_busy_db(int notherbackends, int npreparedxacts)
1967 {
1968         if (notherbackends > 0 && npreparedxacts > 0)
1969
1970                 /*
1971                  * We don't deal with singular versus plural here, since gettext
1972                  * doesn't support multiple plurals in one string.
1973                  */
1974                 errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
1975                                   notherbackends, npreparedxacts);
1976         else if (notherbackends > 0)
1977                 errdetail_plural("There is %d other session using the database.",
1978                                                  "There are %d other sessions using the database.",
1979                                                  notherbackends,
1980                                                  notherbackends);
1981         else
1982                 errdetail_plural("There is %d prepared transaction using the database.",
1983                                         "There are %d prepared transactions using the database.",
1984                                                  npreparedxacts,
1985                                                  npreparedxacts);
1986         return 0;                                       /* just to keep ereport macro happy */
1987 }
1988
1989 /*
1990  * get_database_oid - given a database name, look up the OID
1991  *
1992  * If missing_ok is false, throw an error if database name not found.  If
1993  * true, just return InvalidOid.
1994  */
1995 Oid
1996 get_database_oid(const char *dbname, bool missing_ok)
1997 {
1998         Relation        pg_database;
1999         ScanKeyData entry[1];
2000         SysScanDesc scan;
2001         HeapTuple       dbtuple;
2002         Oid                     oid;
2003
2004         /*
2005          * There's no syscache for pg_database indexed by name, so we must look
2006          * the hard way.
2007          */
2008         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
2009         ScanKeyInit(&entry[0],
2010                                 Anum_pg_database_datname,
2011                                 BTEqualStrategyNumber, F_NAMEEQ,
2012                                 CStringGetDatum(dbname));
2013         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
2014                                                           NULL, 1, entry);
2015
2016         dbtuple = systable_getnext(scan);
2017
2018         /* We assume that there can be at most one matching tuple */
2019         if (HeapTupleIsValid(dbtuple))
2020                 oid = HeapTupleGetOid(dbtuple);
2021         else
2022                 oid = InvalidOid;
2023
2024         systable_endscan(scan);
2025         heap_close(pg_database, AccessShareLock);
2026
2027         if (!OidIsValid(oid) && !missing_ok)
2028                 ereport(ERROR,
2029                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
2030                                  errmsg("database \"%s\" does not exist",
2031                                                 dbname)));
2032
2033         return oid;
2034 }
2035
2036
2037 /*
2038  * get_database_name - given a database OID, look up the name
2039  *
2040  * Returns a palloc'd string, or NULL if no such database.
2041  */
2042 char *
2043 get_database_name(Oid dbid)
2044 {
2045         HeapTuple       dbtuple;
2046         char       *result;
2047
2048         dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2049         if (HeapTupleIsValid(dbtuple))
2050         {
2051                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
2052                 ReleaseSysCache(dbtuple);
2053         }
2054         else
2055                 result = NULL;
2056
2057         return result;
2058 }
2059
2060 /*
2061  * DATABASE resource manager's routines
2062  */
2063 void
2064 dbase_redo(XLogReaderState *record)
2065 {
2066         uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
2067
2068         /* Backup blocks are not used in dbase records */
2069         Assert(!XLogRecHasAnyBlockRefs(record));
2070
2071         if (info == XLOG_DBASE_CREATE)
2072         {
2073                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
2074                 char       *src_path;
2075                 char       *dst_path;
2076                 struct stat st;
2077
2078                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
2079                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2080
2081                 /*
2082                  * Our theory for replaying a CREATE is to forcibly drop the target
2083                  * subdirectory if present, then re-copy the source data. This may be
2084                  * more work than needed, but it is simple to implement.
2085                  */
2086                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
2087                 {
2088                         if (!rmtree(dst_path, true))
2089                                 /* If this failed, copydir() below is going to error. */
2090                                 ereport(WARNING,
2091                                                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
2092                                                                 dst_path)));
2093                 }
2094
2095                 /*
2096                  * Force dirty buffers out to disk, to ensure source database is
2097                  * up-to-date for the copy.
2098                  */
2099                 FlushDatabaseBuffers(xlrec->src_db_id);
2100
2101                 /*
2102                  * Copy this subdirectory to the new location
2103                  *
2104                  * We don't need to copy subdirectories
2105                  */
2106                 copydir(src_path, dst_path, false);
2107         }
2108         else if (info == XLOG_DBASE_DROP)
2109         {
2110                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
2111                 char       *dst_path;
2112
2113                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2114
2115                 if (InHotStandby)
2116                 {
2117                         /*
2118                          * Lock database while we resolve conflicts to ensure that
2119                          * InitPostgres() cannot fully re-execute concurrently. This
2120                          * avoids backends re-connecting automatically to same database,
2121                          * which can happen in some cases.
2122                          */
2123                         LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2124                         ResolveRecoveryConflictWithDatabase(xlrec->db_id);
2125                 }
2126
2127                 /* Drop pages for this database that are in the shared buffer cache */
2128                 DropDatabaseBuffers(xlrec->db_id);
2129
2130                 /* Also, clean out any fsync requests that might be pending in md.c */
2131                 ForgetDatabaseFsyncRequests(xlrec->db_id);
2132
2133                 /* Clean out the xlog relcache too */
2134                 XLogDropDatabase(xlrec->db_id);
2135
2136                 /* And remove the physical files */
2137                 if (!rmtree(dst_path, true))
2138                         ereport(WARNING,
2139                                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
2140                                                         dst_path)));
2141
2142                 if (InHotStandby)
2143                 {
2144                         /*
2145                          * Release locks prior to commit. XXX There is a race condition
2146                          * here that may allow backends to reconnect, but the window for
2147                          * this is small because the gap between here and commit is mostly
2148                          * fairly small and it is unlikely that people will be dropping
2149                          * databases that we are trying to connect to anyway.
2150                          */
2151                         UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2152                 }
2153         }
2154         else
2155                 elog(PANIC, "dbase_redo: unknown op code %u", info);
2156 }