]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Create a syscache for pg_database-indexed-by-oid, and make use of it
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands take ExclusiveLock on
7  * pg_database to ensure that no two proceed in parallel.  We must use
8  * at least this level of locking to ensure that no two backends try to
9  * write the flat-file copy of pg_database at once.  We avoid using
10  * AccessExclusiveLock since there's no need to lock out ordinary readers
11  * of pg_database.
12  *
13  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
14  * Portions Copyright (c) 1994, Regents of the University of California
15  *
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.180 2006/05/03 22:45:26 tgl Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23
24 #include <fcntl.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27
28 #include "access/genam.h"
29 #include "access/heapam.h"
30 #include "catalog/catalog.h"
31 #include "catalog/dependency.h"
32 #include "catalog/indexing.h"
33 #include "catalog/pg_authid.h"
34 #include "catalog/pg_database.h"
35 #include "catalog/pg_tablespace.h"
36 #include "commands/comment.h"
37 #include "commands/dbcommands.h"
38 #include "commands/tablespace.h"
39 #include "mb/pg_wchar.h"
40 #include "miscadmin.h"
41 #include "postmaster/bgwriter.h"
42 #include "storage/fd.h"
43 #include "storage/freespace.h"
44 #include "storage/procarray.h"
45 #include "utils/acl.h"
46 #include "utils/array.h"
47 #include "utils/builtins.h"
48 #include "utils/flatfiles.h"
49 #include "utils/fmgroids.h"
50 #include "utils/guc.h"
51 #include "utils/lsyscache.h"
52 #include "utils/syscache.h"
53
54
55 /* non-export function prototypes */
56 static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
57                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
58                         Oid *dbLastSysOidP,
59                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
60                         Oid *dbTablespace);
61 static bool have_createdb_privilege(void);
62 static void remove_dbtablespaces(Oid db_id);
63
64
65 /*
66  * CREATE DATABASE
67  */
68 void
69 createdb(const CreatedbStmt *stmt)
70 {
71         HeapScanDesc scan;
72         Relation        rel;
73         Oid                     src_dboid;
74         Oid                     src_owner;
75         int                     src_encoding;
76         bool            src_istemplate;
77         bool            src_allowconn;
78         Oid                     src_lastsysoid;
79         TransactionId src_vacuumxid;
80         TransactionId src_frozenxid;
81         Oid                     src_deftablespace;
82         volatile Oid dst_deftablespace;
83         volatile Relation pg_database_rel;
84         HeapTuple       tuple;
85         TupleDesc       pg_database_dsc;
86         Datum           new_record[Natts_pg_database];
87         char            new_record_nulls[Natts_pg_database];
88         Oid                     dboid;
89         volatile Oid datdba;
90         ListCell   *option;
91         DefElem    *dtablespacename = NULL;
92         DefElem    *downer = NULL;
93         DefElem    *dtemplate = NULL;
94         DefElem    *dencoding = NULL;
95         DefElem    *dconnlimit = NULL;
96         char       *dbname = stmt->dbname;
97         char       *dbowner = NULL;
98         const char *dbtemplate = NULL;
99         volatile int encoding = -1;
100         volatile int dbconnlimit = -1;
101
102         /* don't call this in a transaction block */
103         PreventTransactionChain((void *) stmt, "CREATE DATABASE");
104
105         /* Extract options from the statement node tree */
106         foreach(option, stmt->options)
107         {
108                 DefElem    *defel = (DefElem *) lfirst(option);
109
110                 if (strcmp(defel->defname, "tablespace") == 0)
111                 {
112                         if (dtablespacename)
113                                 ereport(ERROR,
114                                                 (errcode(ERRCODE_SYNTAX_ERROR),
115                                                  errmsg("conflicting or redundant options")));
116                         dtablespacename = defel;
117                 }
118                 else if (strcmp(defel->defname, "owner") == 0)
119                 {
120                         if (downer)
121                                 ereport(ERROR,
122                                                 (errcode(ERRCODE_SYNTAX_ERROR),
123                                                  errmsg("conflicting or redundant options")));
124                         downer = defel;
125                 }
126                 else if (strcmp(defel->defname, "template") == 0)
127                 {
128                         if (dtemplate)
129                                 ereport(ERROR,
130                                                 (errcode(ERRCODE_SYNTAX_ERROR),
131                                                  errmsg("conflicting or redundant options")));
132                         dtemplate = defel;
133                 }
134                 else if (strcmp(defel->defname, "encoding") == 0)
135                 {
136                         if (dencoding)
137                                 ereport(ERROR,
138                                                 (errcode(ERRCODE_SYNTAX_ERROR),
139                                                  errmsg("conflicting or redundant options")));
140                         dencoding = defel;
141                 }
142                 else if (strcmp(defel->defname, "connectionlimit") == 0)
143                 {
144                         if (dconnlimit)
145                                 ereport(ERROR,
146                                                 (errcode(ERRCODE_SYNTAX_ERROR),
147                                                  errmsg("conflicting or redundant options")));
148                         dconnlimit = defel;
149                 }
150                 else if (strcmp(defel->defname, "location") == 0)
151                 {
152                         ereport(WARNING,
153                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
154                                          errmsg("LOCATION is not supported anymore"),
155                                          errhint("Consider using tablespaces instead.")));
156                 }
157                 else
158                         elog(ERROR, "option \"%s\" not recognized",
159                                  defel->defname);
160         }
161
162         if (downer && downer->arg)
163                 dbowner = strVal(downer->arg);
164         if (dtemplate && dtemplate->arg)
165                 dbtemplate = strVal(dtemplate->arg);
166         if (dencoding && dencoding->arg)
167         {
168                 const char *encoding_name;
169
170                 if (IsA(dencoding->arg, Integer))
171                 {
172                         encoding = intVal(dencoding->arg);
173                         encoding_name = pg_encoding_to_char(encoding);
174                         if (strcmp(encoding_name, "") == 0 ||
175                                 pg_valid_server_encoding(encoding_name) < 0)
176                                 ereport(ERROR,
177                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
178                                                  errmsg("%d is not a valid encoding code",
179                                                                 encoding)));
180                 }
181                 else if (IsA(dencoding->arg, String))
182                 {
183                         encoding_name = strVal(dencoding->arg);
184                         if (pg_valid_server_encoding(encoding_name) < 0)
185                                 ereport(ERROR,
186                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
187                                                  errmsg("%s is not a valid encoding name",
188                                                                 encoding_name)));
189                         encoding = pg_char_to_encoding(encoding_name);
190                 }
191                 else
192                         elog(ERROR, "unrecognized node type: %d",
193                                  nodeTag(dencoding->arg));
194         }
195         if (dconnlimit && dconnlimit->arg)
196                 dbconnlimit = intVal(dconnlimit->arg);
197
198         /* obtain OID of proposed owner */
199         if (dbowner)
200                 datdba = get_roleid_checked(dbowner);
201         else
202                 datdba = GetUserId();
203
204         /*
205          * To create a database, must have createdb privilege and must be able to
206          * become the target role (this does not imply that the target role itself
207          * must have createdb privilege).  The latter provision guards against
208          * "giveaway" attacks.  Note that a superuser will always have both of
209          * these privileges a fortiori.
210          */
211         if (!have_createdb_privilege())
212                 ereport(ERROR,
213                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
214                                  errmsg("permission denied to create database")));
215
216         check_is_member_of_role(GetUserId(), datdba);
217
218         /*
219          * Check for db name conflict.  There is a race condition here, since
220          * another backend could create the same DB name before we commit.
221          * However, holding an exclusive lock on pg_database for the whole time we
222          * are copying the source database doesn't seem like a good idea, so
223          * accept possibility of race to create.  We will check again after we
224          * grab the exclusive lock.
225          */
226         if (get_db_info(dbname, NULL, NULL, NULL,
227                                         NULL, NULL, NULL, NULL, NULL, NULL))
228                 ereport(ERROR,
229                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
230                                  errmsg("database \"%s\" already exists", dbname)));
231
232         /*
233          * Lookup database (template) to be cloned.
234          */
235         if (!dbtemplate)
236                 dbtemplate = "template1";               /* Default template database name */
237
238         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
239                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
240                                          &src_vacuumxid, &src_frozenxid, &src_deftablespace))
241                 ereport(ERROR,
242                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
243                          errmsg("template database \"%s\" does not exist", dbtemplate)));
244
245         /*
246          * Permission check: to copy a DB that's not marked datistemplate, you
247          * must be superuser or the owner thereof.
248          */
249         if (!src_istemplate)
250         {
251                 if (!pg_database_ownercheck(src_dboid, GetUserId()))
252                         ereport(ERROR,
253                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
254                                          errmsg("permission denied to copy database \"%s\"",
255                                                         dbtemplate)));
256         }
257
258         /*
259          * The source DB can't have any active backends, except this one
260          * (exception is to allow CREATE DB while connected to template1).
261          * Otherwise we might copy inconsistent data.  This check is not
262          * bulletproof, since someone might connect while we are copying...
263          */
264         if (DatabaseHasActiveBackends(src_dboid, true))
265                 ereport(ERROR,
266                                 (errcode(ERRCODE_OBJECT_IN_USE),
267                         errmsg("source database \"%s\" is being accessed by other users",
268                                    dbtemplate)));
269
270         /* If encoding is defaulted, use source's encoding */
271         if (encoding < 0)
272                 encoding = src_encoding;
273
274         /* Some encodings are client only */
275         if (!PG_VALID_BE_ENCODING(encoding))
276                 ereport(ERROR,
277                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
278                                  errmsg("invalid server encoding %d", encoding)));
279
280         /* Resolve default tablespace for new database */
281         if (dtablespacename && dtablespacename->arg)
282         {
283                 char       *tablespacename;
284                 AclResult       aclresult;
285
286                 tablespacename = strVal(dtablespacename->arg);
287                 dst_deftablespace = get_tablespace_oid(tablespacename);
288                 if (!OidIsValid(dst_deftablespace))
289                         ereport(ERROR,
290                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
291                                          errmsg("tablespace \"%s\" does not exist",
292                                                         tablespacename)));
293                 /* check permissions */
294                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
295                                                                                    ACL_CREATE);
296                 if (aclresult != ACLCHECK_OK)
297                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
298                                                    tablespacename);
299
300                 /*
301                  * If we are trying to change the default tablespace of the template,
302                  * we require that the template not have any files in the new default
303                  * tablespace.  This is necessary because otherwise the copied
304                  * database would contain pg_class rows that refer to its default
305                  * tablespace both explicitly (by OID) and implicitly (as zero), which
306                  * would cause problems.  For example another CREATE DATABASE using
307                  * the copied database as template, and trying to change its default
308                  * tablespace again, would yield outright incorrect results (it would
309                  * improperly move tables to the new default tablespace that should
310                  * stay in the same tablespace).
311                  */
312                 if (dst_deftablespace != src_deftablespace)
313                 {
314                         char       *srcpath;
315                         struct stat st;
316
317                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
318
319                         if (stat(srcpath, &st) == 0 &&
320                                 S_ISDIR(st.st_mode) &&
321                                 !directory_is_empty(srcpath))
322                                 ereport(ERROR,
323                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
324                                                  errmsg("cannot assign new default tablespace \"%s\"",
325                                                                 tablespacename),
326                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
327                                                                    dbtemplate)));
328                         pfree(srcpath);
329                 }
330         }
331         else
332         {
333                 /* Use template database's default tablespace */
334                 dst_deftablespace = src_deftablespace;
335                 /* Note there is no additional permission check in this path */
336         }
337
338         /*
339          * Normally we mark the new database with the same datvacuumxid and
340          * datfrozenxid as the source.  However, if the source is not allowing
341          * connections then we assume it is fully frozen, and we can set the
342          * current transaction ID as the xid limits.  This avoids immediately
343          * starting to generate warnings after cloning template0.
344          */
345         if (!src_allowconn)
346                 src_vacuumxid = src_frozenxid = GetCurrentTransactionId();
347
348         /*
349          * Preassign OID for pg_database tuple, so that we can compute db path. We
350          * have to open pg_database to do this, but we don't want to take
351          * ExclusiveLock yet, so just do it and close again.
352          */
353         pg_database_rel = heap_open(DatabaseRelationId, AccessShareLock);
354         dboid = GetNewOid(pg_database_rel);
355         heap_close(pg_database_rel, AccessShareLock);
356         pg_database_rel = NULL;
357
358         /*
359          * Force dirty buffers out to disk, to ensure source database is
360          * up-to-date for the copy.  (We really only need to flush buffers for the
361          * source database, but bufmgr.c provides no API for that.)
362          */
363         BufferSync();
364
365         /*
366          * Once we start copying subdirectories, we need to be able to clean 'em
367          * up if we fail.  Establish a TRY block to make sure this happens. (This
368          * is not a 100% solution, because of the possibility of failure during
369          * transaction commit after we leave this routine, but it should handle
370          * most scenarios.)
371          */
372         PG_TRY();
373         {
374                 /*
375                  * Iterate through all tablespaces of the template database, and copy
376                  * each one to the new database.
377                  */
378                 rel = heap_open(TableSpaceRelationId, AccessShareLock);
379                 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
380                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
381                 {
382                         Oid                     srctablespace = HeapTupleGetOid(tuple);
383                         Oid                     dsttablespace;
384                         char       *srcpath;
385                         char       *dstpath;
386                         struct stat st;
387
388                         /* No need to copy global tablespace */
389                         if (srctablespace == GLOBALTABLESPACE_OID)
390                                 continue;
391
392                         srcpath = GetDatabasePath(src_dboid, srctablespace);
393
394                         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
395                                 directory_is_empty(srcpath))
396                         {
397                                 /* Assume we can ignore it */
398                                 pfree(srcpath);
399                                 continue;
400                         }
401
402                         if (srctablespace == src_deftablespace)
403                                 dsttablespace = dst_deftablespace;
404                         else
405                                 dsttablespace = srctablespace;
406
407                         dstpath = GetDatabasePath(dboid, dsttablespace);
408
409                         /*
410                          * Copy this subdirectory to the new location
411                          *
412                          * We don't need to copy subdirectories
413                          */
414                         copydir(srcpath, dstpath, false);
415
416                         /* Record the filesystem change in XLOG */
417                         {
418                                 xl_dbase_create_rec xlrec;
419                                 XLogRecData rdata[1];
420
421                                 xlrec.db_id = dboid;
422                                 xlrec.tablespace_id = dsttablespace;
423                                 xlrec.src_db_id = src_dboid;
424                                 xlrec.src_tablespace_id = srctablespace;
425
426                                 rdata[0].data = (char *) &xlrec;
427                                 rdata[0].len = sizeof(xl_dbase_create_rec);
428                                 rdata[0].buffer = InvalidBuffer;
429                                 rdata[0].next = NULL;
430
431                                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
432                         }
433                 }
434                 heap_endscan(scan);
435                 heap_close(rel, AccessShareLock);
436
437                 /*
438                  * Now OK to grab exclusive lock on pg_database.
439                  */
440                 pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock);
441
442                 /* Check to see if someone else created same DB name meanwhile. */
443                 if (get_db_info(dbname, NULL, NULL, NULL,
444                                                 NULL, NULL, NULL, NULL, NULL, NULL))
445                         ereport(ERROR,
446                                         (errcode(ERRCODE_DUPLICATE_DATABASE),
447                                          errmsg("database \"%s\" already exists", dbname)));
448
449                 /*
450                  * Insert a new tuple into pg_database
451                  */
452                 pg_database_dsc = RelationGetDescr(pg_database_rel);
453
454                 /* Form tuple */
455                 MemSet(new_record, 0, sizeof(new_record));
456                 MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
457
458                 new_record[Anum_pg_database_datname - 1] =
459                         DirectFunctionCall1(namein, CStringGetDatum(dbname));
460                 new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
461                 new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
462                 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
463                 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
464                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
465                 new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
466                 new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
467                 new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
468                 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
469
470                 /*
471                  * We deliberately set datconfig and datacl to defaults (NULL), rather
472                  * than copying them from the template database.  Copying datacl would
473                  * be a bad idea when the owner is not the same as the template's
474                  * owner. It's more debatable whether datconfig should be copied.
475                  */
476                 new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
477                 new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
478
479                 tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
480
481                 HeapTupleSetOid(tuple, dboid);  /* override heap_insert's OID
482                                                                                  * selection */
483
484                 simple_heap_insert(pg_database_rel, tuple);
485
486                 /* Update indexes */
487                 CatalogUpdateIndexes(pg_database_rel, tuple);
488
489                 /* Register owner dependency */
490                 recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
491
492                 /* Create pg_shdepend entries for objects within database */
493                 copyTemplateDependencies(src_dboid, dboid);
494
495                 /*
496                  * We force a checkpoint before committing.  This effectively means
497                  * that committed XLOG_DBASE_CREATE operations will never need to be
498                  * replayed (at least not in ordinary crash recovery; we still have to
499                  * make the XLOG entry for the benefit of PITR operations). This
500                  * avoids two nasty scenarios:
501                  *
502                  * #1: When PITR is off, we don't XLOG the contents of newly created
503                  * indexes; therefore the drop-and-recreate-whole-directory behavior
504                  * of DBASE_CREATE replay would lose such indexes.
505                  *
506                  * #2: Since we have to recopy the source database during DBASE_CREATE
507                  * replay, we run the risk of copying changes in it that were
508                  * committed after the original CREATE DATABASE command but before the
509                  * system crash that led to the replay.  This is at least unexpected
510                  * and at worst could lead to inconsistencies, eg duplicate table
511                  * names.
512                  *
513                  * (Both of these were real bugs in releases 8.0 through 8.0.3.)
514                  *
515                  * In PITR replay, the first of these isn't an issue, and the second
516                  * is only a risk if the CREATE DATABASE and subsequent template
517                  * database change both occur while a base backup is being taken.
518                  * There doesn't seem to be much we can do about that except document
519                  * it as a limitation.
520                  *
521                  * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
522                  * we can avoid this.
523                  */
524                 RequestCheckpoint(true, false);
525
526                 /*
527                  * Set flag to update flat database file at commit.
528                  */
529                 database_file_update_needed();
530         }
531         PG_CATCH();
532         {
533                 /* Don't hold pg_database lock while doing recursive remove */
534                 if (pg_database_rel != NULL)
535                         heap_close(pg_database_rel, ExclusiveLock);
536
537                 /* Throw away any successfully copied subdirectories */
538                 remove_dbtablespaces(dboid);
539
540                 PG_RE_THROW();
541         }
542         PG_END_TRY();
543
544         /* Close pg_database, but keep exclusive lock till commit */
545         /* This has to be outside the PG_TRY */
546         heap_close(pg_database_rel, NoLock);
547 }
548
549
550 /*
551  * DROP DATABASE
552  */
553 void
554 dropdb(const char *dbname, bool missing_ok)
555 {
556         Oid                     db_id;
557         bool            db_istemplate;
558         Relation        pgdbrel;
559         HeapTuple       tup;
560
561         PreventTransactionChain((void *) dbname, "DROP DATABASE");
562
563         AssertArg(dbname);
564
565         if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0)
566                 ereport(ERROR,
567                                 (errcode(ERRCODE_OBJECT_IN_USE),
568                                  errmsg("cannot drop the currently open database")));
569
570         /*
571          * Obtain exclusive lock on pg_database.  We need this to ensure that no
572          * new backend starts up in the target database while we are deleting it.
573          * (Actually, a new backend might still manage to start up, because it
574          * isn't able to lock pg_database while starting.  But it will detect its
575          * error in ReverifyMyDatabase and shut down before any serious damage is
576          * done.  See postinit.c.)
577          *
578          * An ExclusiveLock, rather than AccessExclusiveLock, is sufficient since
579          * ReverifyMyDatabase takes RowShareLock.  This allows ordinary readers of
580          * pg_database to proceed in parallel.
581          */
582         pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock);
583
584         if (!get_db_info(dbname, &db_id, NULL, NULL,
585                                          &db_istemplate, NULL, NULL, NULL, NULL, NULL))
586         {
587                 if (!missing_ok)
588                 {
589                         ereport(ERROR,
590                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
591                                          errmsg("database \"%s\" does not exist", dbname)));
592                 }
593                 else
594                 {
595
596                         /* Close pg_database, release the lock, since we changed nothing */
597                         heap_close(pgdbrel, ExclusiveLock);
598                         ereport(NOTICE,
599                                         (errmsg("database \"%s\" does not exist, skipping",
600                                                         dbname)));
601
602                         return;
603                 }
604         }
605
606         if (!pg_database_ownercheck(db_id, GetUserId()))
607                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
608                                            dbname);
609
610         /*
611          * Disallow dropping a DB that is marked istemplate.  This is just to
612          * prevent people from accidentally dropping template0 or template1; they
613          * can do so if they're really determined ...
614          */
615         if (db_istemplate)
616                 ereport(ERROR,
617                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
618                                  errmsg("cannot drop a template database")));
619
620         /*
621          * Check for active backends in the target database.
622          */
623         if (DatabaseHasActiveBackends(db_id, false))
624                 ereport(ERROR,
625                                 (errcode(ERRCODE_OBJECT_IN_USE),
626                                  errmsg("database \"%s\" is being accessed by other users",
627                                                 dbname)));
628
629         /*
630          * Remove the database's tuple from pg_database.
631          */
632         tup = SearchSysCache(DATABASEOID,
633                                                  ObjectIdGetDatum(db_id),
634                                                  0, 0, 0);
635         if (!HeapTupleIsValid(tup))
636                 elog(ERROR, "cache lookup failed for database %u", db_id);
637
638         simple_heap_delete(pgdbrel, &tup->t_self);
639
640         ReleaseSysCache(tup);
641
642         /*
643          * Delete any comments associated with the database
644          *
645          */
646         DeleteSharedComments(db_id, DatabaseRelationId);
647
648         /*
649          * Remove shared dependency references for the database.
650          */
651         dropDatabaseDependencies(db_id);
652
653         /*
654          * Drop pages for this database that are in the shared buffer cache. This
655          * is important to ensure that no remaining backend tries to write out a
656          * dirty buffer to the dead database later...
657          */
658         DropDatabaseBuffers(db_id);
659
660         /*
661          * Also, clean out any entries in the shared free space map.
662          */
663         FreeSpaceMapForgetDatabase(db_id);
664
665         /*
666          * On Windows, force a checkpoint so that the bgwriter doesn't hold any
667          * open files, which would cause rmdir() to fail.
668          */
669 #ifdef WIN32
670         RequestCheckpoint(true, false);
671 #endif
672
673         /*
674          * Remove all tablespace subdirs belonging to the database.
675          */
676         remove_dbtablespaces(db_id);
677
678         /* Close pg_database, but keep exclusive lock till commit */
679         heap_close(pgdbrel, NoLock);
680
681         /*
682          * Set flag to update flat database file at commit.
683          */
684         database_file_update_needed();
685 }
686
687
688 /*
689  * Rename database
690  */
691 void
692 RenameDatabase(const char *oldname, const char *newname)
693 {
694         HeapTuple       tup,
695                                 newtup;
696         Relation        rel;
697         SysScanDesc scan,
698                                 scan2;
699         ScanKeyData key,
700                                 key2;
701
702         /*
703          * Obtain ExclusiveLock so that no new session gets started while the
704          * rename is in progress.
705          */
706         rel = heap_open(DatabaseRelationId, ExclusiveLock);
707
708         ScanKeyInit(&key,
709                                 Anum_pg_database_datname,
710                                 BTEqualStrategyNumber, F_NAMEEQ,
711                                 NameGetDatum(oldname));
712         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
713                                                           SnapshotNow, 1, &key);
714
715         tup = systable_getnext(scan);
716         if (!HeapTupleIsValid(tup))
717                 ereport(ERROR,
718                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
719                                  errmsg("database \"%s\" does not exist", oldname)));
720
721         /*
722          * XXX Client applications probably store the current database somewhere,
723          * so renaming it could cause confusion.  On the other hand, there may not
724          * be an actual problem besides a little confusion, so think about this
725          * and decide.
726          */
727         if (HeapTupleGetOid(tup) == MyDatabaseId)
728                 ereport(ERROR,
729                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
730                                  errmsg("current database may not be renamed")));
731
732         /*
733          * Make sure the database does not have active sessions.  Might not be
734          * necessary, but it's consistent with other database operations.
735          */
736         if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false))
737                 ereport(ERROR,
738                                 (errcode(ERRCODE_OBJECT_IN_USE),
739                                  errmsg("database \"%s\" is being accessed by other users",
740                                                 oldname)));
741
742         /* make sure the new name doesn't exist */
743         ScanKeyInit(&key2,
744                                 Anum_pg_database_datname,
745                                 BTEqualStrategyNumber, F_NAMEEQ,
746                                 NameGetDatum(newname));
747         scan2 = systable_beginscan(rel, DatabaseNameIndexId, true,
748                                                            SnapshotNow, 1, &key2);
749         if (HeapTupleIsValid(systable_getnext(scan2)))
750                 ereport(ERROR,
751                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
752                                  errmsg("database \"%s\" already exists", newname)));
753         systable_endscan(scan2);
754
755         /* must be owner */
756         if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId()))
757                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
758                                            oldname);
759
760         /* must have createdb rights */
761         if (!have_createdb_privilege())
762                 ereport(ERROR,
763                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
764                                  errmsg("permission denied to rename database")));
765
766         /* rename */
767         newtup = heap_copytuple(tup);
768         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
769         simple_heap_update(rel, &newtup->t_self, newtup);
770         CatalogUpdateIndexes(rel, newtup);
771
772         systable_endscan(scan);
773
774         /* Close pg_database, but keep exclusive lock till commit */
775         heap_close(rel, NoLock);
776
777         /*
778          * Set flag to update flat database file at commit.
779          */
780         database_file_update_needed();
781 }
782
783
784 /*
785  * ALTER DATABASE name ...
786  */
787 void
788 AlterDatabase(AlterDatabaseStmt *stmt)
789 {
790         Relation        rel;
791         HeapTuple       tuple,
792                                 newtuple;
793         ScanKeyData scankey;
794         SysScanDesc scan;
795         ListCell   *option;
796         int                     connlimit = -1;
797         DefElem    *dconnlimit = NULL;
798         Datum           new_record[Natts_pg_database];
799         char            new_record_nulls[Natts_pg_database];
800         char            new_record_repl[Natts_pg_database];
801
802         /* Extract options from the statement node tree */
803         foreach(option, stmt->options)
804         {
805                 DefElem    *defel = (DefElem *) lfirst(option);
806
807                 if (strcmp(defel->defname, "connectionlimit") == 0)
808                 {
809                         if (dconnlimit)
810                                 ereport(ERROR,
811                                                 (errcode(ERRCODE_SYNTAX_ERROR),
812                                                  errmsg("conflicting or redundant options")));
813                         dconnlimit = defel;
814                 }
815                 else
816                         elog(ERROR, "option \"%s\" not recognized",
817                                  defel->defname);
818         }
819
820         if (dconnlimit)
821                 connlimit = intVal(dconnlimit->arg);
822
823         /*
824          * We don't need ExclusiveLock since we aren't updating the flat file.
825          */
826         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
827         ScanKeyInit(&scankey,
828                                 Anum_pg_database_datname,
829                                 BTEqualStrategyNumber, F_NAMEEQ,
830                                 NameGetDatum(stmt->dbname));
831         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
832                                                           SnapshotNow, 1, &scankey);
833         tuple = systable_getnext(scan);
834         if (!HeapTupleIsValid(tuple))
835                 ereport(ERROR,
836                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
837                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
838
839         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
840                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
841                                            stmt->dbname);
842
843         /*
844          * Build an updated tuple, perusing the information just obtained
845          */
846         MemSet(new_record, 0, sizeof(new_record));
847         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
848         MemSet(new_record_repl, ' ', sizeof(new_record_repl));
849
850         if (dconnlimit)
851         {
852                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
853                 new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r';
854         }
855
856         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record,
857                                                                 new_record_nulls, new_record_repl);
858         simple_heap_update(rel, &tuple->t_self, newtuple);
859
860         /* Update indexes */
861         CatalogUpdateIndexes(rel, newtuple);
862
863         systable_endscan(scan);
864
865         /* Close pg_database, but keep lock till commit */
866         heap_close(rel, NoLock);
867
868         /*
869          * We don't bother updating the flat file since the existing options for
870          * ALTER DATABASE don't affect it.
871          */
872 }
873
874
875 /*
876  * ALTER DATABASE name SET ...
877  */
878 void
879 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
880 {
881         char       *valuestr;
882         HeapTuple       tuple,
883                                 newtuple;
884         Relation        rel;
885         ScanKeyData scankey;
886         SysScanDesc scan;
887         Datum           repl_val[Natts_pg_database];
888         char            repl_null[Natts_pg_database];
889         char            repl_repl[Natts_pg_database];
890
891         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
892
893         /*
894          * We don't need ExclusiveLock since we aren't updating the flat file.
895          */
896         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
897         ScanKeyInit(&scankey,
898                                 Anum_pg_database_datname,
899                                 BTEqualStrategyNumber, F_NAMEEQ,
900                                 NameGetDatum(stmt->dbname));
901         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
902                                                           SnapshotNow, 1, &scankey);
903         tuple = systable_getnext(scan);
904         if (!HeapTupleIsValid(tuple))
905                 ereport(ERROR,
906                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
907                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
908
909         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
910                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
911                                            stmt->dbname);
912
913         MemSet(repl_repl, ' ', sizeof(repl_repl));
914         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
915
916         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
917         {
918                 /* RESET ALL */
919                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
920                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
921         }
922         else
923         {
924                 Datum           datum;
925                 bool            isnull;
926                 ArrayType  *a;
927
928                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
929
930                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
931                                                          RelationGetDescr(rel), &isnull);
932
933                 a = isnull ? NULL : DatumGetArrayTypeP(datum);
934
935                 if (valuestr)
936                         a = GUCArrayAdd(a, stmt->variable, valuestr);
937                 else
938                         a = GUCArrayDelete(a, stmt->variable);
939
940                 if (a)
941                         repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
942                 else
943                         repl_null[Anum_pg_database_datconfig - 1] = 'n';
944         }
945
946         newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
947         simple_heap_update(rel, &tuple->t_self, newtuple);
948
949         /* Update indexes */
950         CatalogUpdateIndexes(rel, newtuple);
951
952         systable_endscan(scan);
953
954         /* Close pg_database, but keep lock till commit */
955         heap_close(rel, NoLock);
956
957         /*
958          * We don't bother updating the flat file since ALTER DATABASE SET doesn't
959          * affect it.
960          */
961 }
962
963
964 /*
965  * ALTER DATABASE name OWNER TO newowner
966  */
967 void
968 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
969 {
970         HeapTuple       tuple;
971         Relation        rel;
972         ScanKeyData scankey;
973         SysScanDesc scan;
974         Form_pg_database datForm;
975
976         /*
977          * We don't need ExclusiveLock since we aren't updating the flat file.
978          */
979         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
980         ScanKeyInit(&scankey,
981                                 Anum_pg_database_datname,
982                                 BTEqualStrategyNumber, F_NAMEEQ,
983                                 NameGetDatum(dbname));
984         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
985                                                           SnapshotNow, 1, &scankey);
986         tuple = systable_getnext(scan);
987         if (!HeapTupleIsValid(tuple))
988                 ereport(ERROR,
989                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
990                                  errmsg("database \"%s\" does not exist", dbname)));
991
992         datForm = (Form_pg_database) GETSTRUCT(tuple);
993
994         /*
995          * If the new owner is the same as the existing owner, consider the
996          * command to have succeeded.  This is to be consistent with other
997          * objects.
998          */
999         if (datForm->datdba != newOwnerId)
1000         {
1001                 Datum           repl_val[Natts_pg_database];
1002                 char            repl_null[Natts_pg_database];
1003                 char            repl_repl[Natts_pg_database];
1004                 Acl                *newAcl;
1005                 Datum           aclDatum;
1006                 bool            isNull;
1007                 HeapTuple       newtuple;
1008
1009                 /* Otherwise, must be owner of the existing object */
1010                 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1011                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1012                                                    dbname);
1013
1014                 /* Must be able to become new owner */
1015                 check_is_member_of_role(GetUserId(), newOwnerId);
1016
1017                 /*
1018                  * must have createdb rights
1019                  *
1020                  * NOTE: This is different from other alter-owner checks in that the
1021                  * current user is checked for createdb privileges instead of the
1022                  * destination owner.  This is consistent with the CREATE case for
1023                  * databases.  Because superusers will always have this right, we need
1024                  * no special case for them.
1025                  */
1026                 if (!have_createdb_privilege())
1027                         ereport(ERROR,
1028                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1029                                    errmsg("permission denied to change owner of database")));
1030
1031                 memset(repl_null, ' ', sizeof(repl_null));
1032                 memset(repl_repl, ' ', sizeof(repl_repl));
1033
1034                 repl_repl[Anum_pg_database_datdba - 1] = 'r';
1035                 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1036
1037                 /*
1038                  * Determine the modified ACL for the new owner.  This is only
1039                  * necessary when the ACL is non-null.
1040                  */
1041                 aclDatum = heap_getattr(tuple,
1042                                                                 Anum_pg_database_datacl,
1043                                                                 RelationGetDescr(rel),
1044                                                                 &isNull);
1045                 if (!isNull)
1046                 {
1047                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
1048                                                                  datForm->datdba, newOwnerId);
1049                         repl_repl[Anum_pg_database_datacl - 1] = 'r';
1050                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1051                 }
1052
1053                 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1054                 simple_heap_update(rel, &newtuple->t_self, newtuple);
1055                 CatalogUpdateIndexes(rel, newtuple);
1056
1057                 heap_freetuple(newtuple);
1058
1059                 /* Update owner dependency reference */
1060                 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1061                                                                 newOwnerId);
1062         }
1063
1064         systable_endscan(scan);
1065
1066         /* Close pg_database, but keep lock till commit */
1067         heap_close(rel, NoLock);
1068
1069         /*
1070          * We don't bother updating the flat file since ALTER DATABASE OWNER
1071          * doesn't affect it.
1072          */
1073 }
1074
1075
1076 /*
1077  * Helper functions
1078  */
1079
1080 static bool
1081 get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP,
1082                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1083                         Oid *dbLastSysOidP,
1084                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
1085                         Oid *dbTablespace)
1086 {
1087         Relation        relation;
1088         ScanKeyData scanKey;
1089         SysScanDesc scan;
1090         HeapTuple       tuple;
1091         bool            gottuple;
1092
1093         AssertArg(name);
1094
1095         /* Caller may wish to grab a better lock on pg_database beforehand... */
1096         relation = heap_open(DatabaseRelationId, AccessShareLock);
1097
1098         ScanKeyInit(&scanKey,
1099                                 Anum_pg_database_datname,
1100                                 BTEqualStrategyNumber, F_NAMEEQ,
1101                                 NameGetDatum(name));
1102
1103         scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1104                                                           SnapshotNow, 1, &scanKey);
1105
1106         tuple = systable_getnext(scan);
1107
1108         gottuple = HeapTupleIsValid(tuple);
1109         if (gottuple)
1110         {
1111                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1112
1113                 /* oid of the database */
1114                 if (dbIdP)
1115                         *dbIdP = HeapTupleGetOid(tuple);
1116                 /* oid of the owner */
1117                 if (ownerIdP)
1118                         *ownerIdP = dbform->datdba;
1119                 /* character encoding */
1120                 if (encodingP)
1121                         *encodingP = dbform->encoding;
1122                 /* allowed as template? */
1123                 if (dbIsTemplateP)
1124                         *dbIsTemplateP = dbform->datistemplate;
1125                 /* allowing connections? */
1126                 if (dbAllowConnP)
1127                         *dbAllowConnP = dbform->datallowconn;
1128                 /* last system OID used in database */
1129                 if (dbLastSysOidP)
1130                         *dbLastSysOidP = dbform->datlastsysoid;
1131                 /* limit of vacuumed XIDs */
1132                 if (dbVacuumXidP)
1133                         *dbVacuumXidP = dbform->datvacuumxid;
1134                 /* limit of frozen XIDs */
1135                 if (dbFrozenXidP)
1136                         *dbFrozenXidP = dbform->datfrozenxid;
1137                 /* default tablespace for this database */
1138                 if (dbTablespace)
1139                         *dbTablespace = dbform->dattablespace;
1140         }
1141
1142         systable_endscan(scan);
1143         heap_close(relation, AccessShareLock);
1144
1145         return gottuple;
1146 }
1147
1148 /* Check if current user has createdb privileges */
1149 static bool
1150 have_createdb_privilege(void)
1151 {
1152         bool            result = false;
1153         HeapTuple       utup;
1154
1155         /* Superusers can always do everything */
1156         if (superuser())
1157                 return true;
1158
1159         utup = SearchSysCache(AUTHOID,
1160                                                   ObjectIdGetDatum(GetUserId()),
1161                                                   0, 0, 0);
1162         if (HeapTupleIsValid(utup))
1163         {
1164                 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1165                 ReleaseSysCache(utup);
1166         }
1167         return result;
1168 }
1169
1170 /*
1171  * Remove tablespace directories
1172  *
1173  * We don't know what tablespaces db_id is using, so iterate through all
1174  * tablespaces removing <tablespace>/db_id
1175  */
1176 static void
1177 remove_dbtablespaces(Oid db_id)
1178 {
1179         Relation        rel;
1180         HeapScanDesc scan;
1181         HeapTuple       tuple;
1182
1183         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1184         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1185         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1186         {
1187                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1188                 char       *dstpath;
1189                 struct stat st;
1190
1191                 /* Don't mess with the global tablespace */
1192                 if (dsttablespace == GLOBALTABLESPACE_OID)
1193                         continue;
1194
1195                 dstpath = GetDatabasePath(db_id, dsttablespace);
1196
1197                 if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1198                 {
1199                         /* Assume we can ignore it */
1200                         pfree(dstpath);
1201                         continue;
1202                 }
1203
1204                 if (!rmtree(dstpath, true))
1205                         ereport(WARNING,
1206                                         (errmsg("could not remove database directory \"%s\"",
1207                                                         dstpath)));
1208
1209                 /* Record the filesystem change in XLOG */
1210                 {
1211                         xl_dbase_drop_rec xlrec;
1212                         XLogRecData rdata[1];
1213
1214                         xlrec.db_id = db_id;
1215                         xlrec.tablespace_id = dsttablespace;
1216
1217                         rdata[0].data = (char *) &xlrec;
1218                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1219                         rdata[0].buffer = InvalidBuffer;
1220                         rdata[0].next = NULL;
1221
1222                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1223                 }
1224
1225                 pfree(dstpath);
1226         }
1227
1228         heap_endscan(scan);
1229         heap_close(rel, AccessShareLock);
1230 }
1231
1232
1233 /*
1234  * get_database_oid - given a database name, look up the OID
1235  *
1236  * Returns InvalidOid if database name not found.
1237  *
1238  * This is not actually used in this file, but is exported for use elsewhere.
1239  */
1240 Oid
1241 get_database_oid(const char *dbname)
1242 {
1243         Relation        pg_database;
1244         ScanKeyData entry[1];
1245         SysScanDesc scan;
1246         HeapTuple       dbtuple;
1247         Oid                     oid;
1248
1249         /*
1250          * There's no syscache for pg_database indexed by name,
1251          * so we must look the hard way.
1252          */
1253         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1254         ScanKeyInit(&entry[0],
1255                                 Anum_pg_database_datname,
1256                                 BTEqualStrategyNumber, F_NAMEEQ,
1257                                 CStringGetDatum(dbname));
1258         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1259                                                           SnapshotNow, 1, entry);
1260
1261         dbtuple = systable_getnext(scan);
1262
1263         /* We assume that there can be at most one matching tuple */
1264         if (HeapTupleIsValid(dbtuple))
1265                 oid = HeapTupleGetOid(dbtuple);
1266         else
1267                 oid = InvalidOid;
1268
1269         systable_endscan(scan);
1270         heap_close(pg_database, AccessShareLock);
1271
1272         return oid;
1273 }
1274
1275
1276 /*
1277  * get_database_name - given a database OID, look up the name
1278  *
1279  * Returns a palloc'd string, or NULL if no such database.
1280  *
1281  * This is not actually used in this file, but is exported for use elsewhere.
1282  */
1283 char *
1284 get_database_name(Oid dbid)
1285 {
1286         HeapTuple       dbtuple;
1287         char       *result;
1288
1289         dbtuple = SearchSysCache(DATABASEOID,
1290                                                          ObjectIdGetDatum(dbid),
1291                                                          0, 0, 0);
1292         if (HeapTupleIsValid(dbtuple))
1293         {
1294                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1295                 ReleaseSysCache(dbtuple);
1296         }
1297         else
1298                 result = NULL;
1299
1300         return result;
1301 }
1302
1303 /*
1304  * DATABASE resource manager's routines
1305  */
1306 void
1307 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1308 {
1309         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1310
1311         if (info == XLOG_DBASE_CREATE)
1312         {
1313                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1314                 char       *src_path;
1315                 char       *dst_path;
1316                 struct stat st;
1317
1318                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1319                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1320
1321                 /*
1322                  * Our theory for replaying a CREATE is to forcibly drop the target
1323                  * subdirectory if present, then re-copy the source data. This may be
1324                  * more work than needed, but it is simple to implement.
1325                  */
1326                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1327                 {
1328                         if (!rmtree(dst_path, true))
1329                                 ereport(WARNING,
1330                                                 (errmsg("could not remove database directory \"%s\"",
1331                                                                 dst_path)));
1332                 }
1333
1334                 /*
1335                  * Force dirty buffers out to disk, to ensure source database is
1336                  * up-to-date for the copy.  (We really only need to flush buffers for
1337                  * the source database, but bufmgr.c provides no API for that.)
1338                  */
1339                 BufferSync();
1340
1341                 /*
1342                  * Copy this subdirectory to the new location
1343                  *
1344                  * We don't need to copy subdirectories
1345                  */
1346                 copydir(src_path, dst_path, false);
1347         }
1348         else if (info == XLOG_DBASE_DROP)
1349         {
1350                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1351                 char       *dst_path;
1352
1353                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1354
1355                 /* Drop pages for this database that are in the shared buffer cache */
1356                 DropDatabaseBuffers(xlrec->db_id);
1357
1358                 /* Also, clean out any entries in the shared free space map */
1359                 FreeSpaceMapForgetDatabase(xlrec->db_id);
1360
1361                 /* Clean out the xlog relcache too */
1362                 XLogDropDatabase(xlrec->db_id);
1363
1364                 /* And remove the physical files */
1365                 if (!rmtree(dst_path, true))
1366                         ereport(WARNING,
1367                                         (errmsg("could not remove database directory \"%s\"",
1368                                                         dst_path)));
1369         }
1370         else
1371                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1372 }
1373
1374 void
1375 dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
1376 {
1377         uint8           info = xl_info & ~XLR_INFO_MASK;
1378
1379         if (info == XLOG_DBASE_CREATE)
1380         {
1381                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1382
1383                 appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
1384                                 xlrec->src_db_id, xlrec->src_tablespace_id,
1385                                 xlrec->db_id, xlrec->tablespace_id);
1386         }
1387         else if (info == XLOG_DBASE_DROP)
1388         {
1389                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1390
1391                 appendStringInfo(buf, "drop db: dir %u/%u",
1392                                 xlrec->db_id, xlrec->tablespace_id);
1393         }
1394         else
1395                 appendStringInfo(buf, "UNKNOWN");
1396 }