]> granicus.if.org Git - postgresql/blob - src/backend/commands/dbcommands.c
Improve COPY syntax to use WITH clause, keep backward compatibility.
[postgresql] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  *
7  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.94 2002/06/20 16:00:43 momjian Exp $
13  *
14  *-------------------------------------------------------------------------
15  */
16 #include "postgres.h"
17
18 #include <errno.h>
19 #include <fcntl.h>
20 #include <unistd.h>
21 #include <sys/stat.h>
22 #include <sys/types.h>
23
24 #include "access/heapam.h"
25 #include "catalog/catname.h"
26 #include "catalog/catalog.h"
27 #include "catalog/pg_database.h"
28 #include "catalog/pg_shadow.h"
29 #include "catalog/indexing.h"
30 #include "commands/comment.h"
31 #include "commands/dbcommands.h"
32 #include "miscadmin.h"
33 #include "storage/freespace.h"
34 #include "storage/sinval.h"
35 #include "utils/array.h"
36 #include "utils/builtins.h"
37 #include "utils/fmgroids.h"
38 #include "utils/guc.h"
39 #include "utils/lsyscache.h"
40 #include "utils/syscache.h"
41
42 #ifdef MULTIBYTE
43 #include "mb/pg_wchar.h"                /* encoding check */
44 #endif
45
46
47 /* non-export function prototypes */
48 static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
49                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
50                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
51                         char *dbpath);
52 static bool have_createdb_privilege(void);
53 static char *resolve_alt_dbpath(const char *dbpath, Oid dboid);
54 static bool remove_dbdirs(const char *real_loc, const char *altloc);
55
56 /*
57  * CREATE DATABASE
58  */
59
60 void
61 createdb(const CreatedbStmt *stmt)
62 {
63         char       *nominal_loc;
64         char       *alt_loc;
65         char       *target_dir;
66         char            src_loc[MAXPGPATH];
67         char            buf[2 * MAXPGPATH + 100];
68         Oid                     src_dboid;
69         int4            src_owner;
70         int                     src_encoding;
71         bool            src_istemplate;
72         Oid                     src_lastsysoid;
73         TransactionId src_vacuumxid;
74         TransactionId src_frozenxid;
75         char            src_dbpath[MAXPGPATH];
76         Relation        pg_database_rel;
77         HeapTuple       tuple;
78         TupleDesc       pg_database_dsc;
79         Datum           new_record[Natts_pg_database];
80         char            new_record_nulls[Natts_pg_database];
81         Oid                     dboid;
82         int32           datdba;
83         List       *option;
84         DefElem    *downer = NULL;
85         DefElem    *dpath = NULL;
86         DefElem    *dtemplate = NULL;
87         DefElem    *dencoding = NULL;
88         char       *dbname = stmt->dbname;
89         char       *dbowner = NULL;
90         char       *dbpath = NULL;
91         char       *dbtemplate = NULL;
92         int                 encoding = -1;
93
94         /* Extract options from the statement node tree */
95         foreach(option, stmt->options)
96         {
97                 DefElem    *defel = (DefElem *) lfirst(option);
98
99                 if (strcmp(defel->defname, "owner") == 0)
100                 {
101                         if (downer)
102                                 elog(ERROR, "CREATE DATABASE: conflicting options");
103                         downer = defel;
104                 }
105                 else if (strcmp(defel->defname, "location") == 0)
106                 {
107                         if (dpath)
108                                 elog(ERROR, "CREATE DATABASE: conflicting options");
109                         dpath = defel;
110                 }
111                 else if (strcmp(defel->defname, "template") == 0)
112                 {
113                         if (dtemplate)
114                                 elog(ERROR, "CREATE DATABASE: conflicting options");
115                         dtemplate = defel;
116                 }
117                 else if (strcmp(defel->defname, "encoding") == 0)
118                 {
119                         if (dencoding)
120                                 elog(ERROR, "CREATE DATABASE: conflicting options");
121                         dencoding = defel;
122                 }
123                 else
124                         elog(ERROR, "CREATE DATABASE: option \"%s\" not recognized",
125                                  defel->defname);
126         }
127
128         if (downer)
129                 dbowner = strVal(downer->arg);
130         if (dpath)
131                 dbpath = strVal(dpath->arg);
132         if (dtemplate)
133                 dbtemplate = strVal(dtemplate->arg);
134         if (dencoding)
135                 encoding = intVal(dencoding->arg);
136
137         /* obtain sysid of proposed owner */
138         if (dbowner)
139                 datdba = get_usesysid(dbowner); /* will elog if no such user */
140         else
141                 datdba = GetUserId();
142
143         if (datdba == (int32) GetUserId())
144         {
145                 /* creating database for self: can be superuser or createdb */
146                 if (!superuser() && !have_createdb_privilege())
147                         elog(ERROR, "CREATE DATABASE: permission denied");
148         }
149         else
150         {
151                 /* creating database for someone else: must be superuser */
152                 /* note that the someone else need not have any permissions */
153                 if (!superuser())
154                         elog(ERROR, "CREATE DATABASE: permission denied");
155         }
156
157         /* don't call this in a transaction block */
158         if (IsTransactionBlock())
159                 elog(ERROR, "CREATE DATABASE: may not be called in a transaction block");
160
161         /*
162          * Check for db name conflict.  There is a race condition here, since
163          * another backend could create the same DB name before we commit.
164          * However, holding an exclusive lock on pg_database for the whole
165          * time we are copying the source database doesn't seem like a good
166          * idea, so accept possibility of race to create.  We will check again
167          * after we grab the exclusive lock.
168          */
169         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
170                 elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname);
171
172         /*
173          * Lookup database (template) to be cloned.
174          */
175         if (!dbtemplate)
176                 dbtemplate = "template1";               /* Default template database name */
177
178         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
179                                          &src_istemplate, &src_lastsysoid,
180                                          &src_vacuumxid, &src_frozenxid,
181                                          src_dbpath))
182                 elog(ERROR, "CREATE DATABASE: template \"%s\" does not exist",
183                          dbtemplate);
184
185         /*
186          * Permission check: to copy a DB that's not marked datistemplate, you
187          * must be superuser or the owner thereof.
188          */
189         if (!src_istemplate)
190         {
191                 if (!superuser() && GetUserId() != src_owner )
192                         elog(ERROR, "CREATE DATABASE: permission to copy \"%s\" denied",
193                                  dbtemplate);
194         }
195
196         /*
197          * Determine physical path of source database
198          */
199         alt_loc = resolve_alt_dbpath(src_dbpath, src_dboid);
200         if (!alt_loc)
201                 alt_loc = GetDatabasePath(src_dboid);
202         strcpy(src_loc, alt_loc);
203
204         /*
205          * The source DB can't have any active backends, except this one
206          * (exception is to allow CREATE DB while connected to template1).
207          * Otherwise we might copy inconsistent data.  This check is not
208          * bulletproof, since someone might connect while we are copying...
209          */
210         if (DatabaseHasActiveBackends(src_dboid, true))
211                 elog(ERROR, "CREATE DATABASE: source database \"%s\" is being accessed by other users", dbtemplate);
212
213         /* If encoding is defaulted, use source's encoding */
214         if (encoding < 0)
215                 encoding = src_encoding;
216
217 #ifdef MULTIBYTE
218         /* Some encodings are client only */
219         if (!PG_VALID_BE_ENCODING(encoding))
220                 elog(ERROR, "CREATE DATABASE: invalid backend encoding");
221 #else
222         Assert(encoding == 0);          /* zero is PG_SQL_ASCII */
223 #endif
224
225         /*
226          * Preassign OID for pg_database tuple, so that we can compute db
227          * path.
228          */
229         dboid = newoid();
230
231         /*
232          * Compute nominal location (where we will try to access the
233          * database), and resolve alternate physical location if one is
234          * specified.
235          *
236          * If an alternate location is specified but is the same as the
237          * normal path, just drop the alternate-location spec (this seems
238          * friendlier than erroring out).  We must test this case to avoid
239          * creating a circular symlink below.
240          */
241         nominal_loc = GetDatabasePath(dboid);
242         alt_loc = resolve_alt_dbpath(dbpath, dboid);
243
244         if (alt_loc && strcmp(alt_loc, nominal_loc) == 0)
245         {
246                 alt_loc = NULL;
247                 dbpath = NULL;
248         }
249
250         if (strchr(nominal_loc, '\''))
251                 elog(ERROR, "database path may not contain single quotes");
252         if (alt_loc && strchr(alt_loc, '\''))
253                 elog(ERROR, "database path may not contain single quotes");
254         if (strchr(src_loc, '\''))
255                 elog(ERROR, "database path may not contain single quotes");
256         /* ... otherwise we'd be open to shell exploits below */
257
258         /*
259          * Force dirty buffers out to disk, to ensure source database is
260          * up-to-date for the copy.  (We really only need to flush buffers for
261          * the source database...)
262          */
263         BufferSync();
264
265         /*
266          * Close virtual file descriptors so the kernel has more available for
267          * the mkdir() and system() calls below.
268          */
269         closeAllVfds();
270
271         /*
272          * Check we can create the target directory --- but then remove it
273          * because we rely on cp(1) to create it for real.
274          */
275         target_dir = alt_loc ? alt_loc : nominal_loc;
276
277         if (mkdir(target_dir, S_IRWXU) != 0)
278                 elog(ERROR, "CREATE DATABASE: unable to create database directory '%s': %m",
279                          target_dir);
280         if (rmdir(target_dir) != 0)
281                 elog(ERROR, "CREATE DATABASE: unable to remove temp directory '%s': %m",
282                          target_dir);
283
284         /* Make the symlink, if needed */
285         if (alt_loc)
286         {
287                 if (symlink(alt_loc, nominal_loc) != 0)
288                         elog(ERROR, "CREATE DATABASE: could not link '%s' to '%s': %m",
289                                  nominal_loc, alt_loc);
290         }
291
292         /* Copy the template database to the new location */
293         snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", src_loc, target_dir);
294
295         if (system(buf) != 0)
296         {
297                 if (remove_dbdirs(nominal_loc, alt_loc))
298                         elog(ERROR, "CREATE DATABASE: could not initialize database directory");
299                 else
300                         elog(ERROR, "CREATE DATABASE: could not initialize database directory; delete failed as well");
301         }
302
303         /*
304          * Now OK to grab exclusive lock on pg_database.
305          */
306         pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
307
308         /* Check to see if someone else created same DB name meanwhile. */
309         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
310         {
311                 /* Don't hold lock while doing recursive remove */
312                 heap_close(pg_database_rel, AccessExclusiveLock);
313                 remove_dbdirs(nominal_loc, alt_loc);
314                 elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname);
315         }
316
317         /*
318          * Insert a new tuple into pg_database
319          */
320         pg_database_dsc = RelationGetDescr(pg_database_rel);
321
322         /* Form tuple */
323         MemSet(new_record, 0, sizeof(new_record));
324         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
325
326         new_record[Anum_pg_database_datname - 1] =
327                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
328         new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
329         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
330         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
331         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
332         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
333         new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
334         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
335         /* do not set datpath to null, GetRawDatabaseInfo won't cope */
336         new_record[Anum_pg_database_datpath - 1] =
337                 DirectFunctionCall1(textin, CStringGetDatum(dbpath ? dbpath : ""));
338
339         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
340         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
341
342         tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
343
344         tuple->t_data->t_oid = dboid;           /* override heap_insert's OID
345                                                                                  * selection */
346
347         simple_heap_insert(pg_database_rel, tuple);
348
349         /*
350          * Update indexes
351          */
352         if (RelationGetForm(pg_database_rel)->relhasindex)
353         {
354                 Relation        idescs[Num_pg_database_indices];
355
356                 CatalogOpenIndices(Num_pg_database_indices,
357                                                    Name_pg_database_indices, idescs);
358                 CatalogIndexInsert(idescs, Num_pg_database_indices, pg_database_rel,
359                                                    tuple);
360                 CatalogCloseIndices(Num_pg_database_indices, idescs);
361         }
362
363         /* Close pg_database, but keep lock till commit */
364         heap_close(pg_database_rel, NoLock);
365
366         /*
367          * Force dirty buffers out to disk, so that newly-connecting backends
368          * will see the new database in pg_database right away.  (They'll see
369          * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
370          */
371         BufferSync();
372 }
373
374
375 /*
376  * DROP DATABASE
377  */
378 void
379 dropdb(const char *dbname)
380 {
381         int4            db_owner;
382         bool            db_istemplate;
383         Oid                     db_id;
384         char       *alt_loc;
385         char       *nominal_loc;
386         char            dbpath[MAXPGPATH];
387         Relation        pgdbrel;
388         HeapScanDesc pgdbscan;
389         ScanKeyData key;
390         HeapTuple       tup;
391
392         AssertArg(dbname);
393
394         if (strcmp(dbname, DatabaseName) == 0)
395                 elog(ERROR, "DROP DATABASE: cannot be executed on the currently open database");
396
397         if (IsTransactionBlock())
398                 elog(ERROR, "DROP DATABASE: may not be called in a transaction block");
399
400         /*
401          * Obtain exclusive lock on pg_database.  We need this to ensure that
402          * no new backend starts up in the target database while we are
403          * deleting it.  (Actually, a new backend might still manage to start
404          * up, because it will read pg_database without any locking to
405          * discover the database's OID.  But it will detect its error in
406          * ReverifyMyDatabase and shut down before any serious damage is done.
407          * See postinit.c.)
408          */
409         pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
410
411         if (!get_db_info(dbname, &db_id, &db_owner, NULL,
412                                          &db_istemplate, NULL, NULL, NULL, dbpath))
413                 elog(ERROR, "DROP DATABASE: database \"%s\" does not exist", dbname);
414
415         if (GetUserId() != db_owner && !superuser())
416                 elog(ERROR, "DROP DATABASE: permission denied");
417
418         /*
419          * Disallow dropping a DB that is marked istemplate.  This is just to
420          * prevent people from accidentally dropping template0 or template1;
421          * they can do so if they're really determined ...
422          */
423         if (db_istemplate)
424                 elog(ERROR, "DROP DATABASE: database is marked as a template");
425
426         nominal_loc = GetDatabasePath(db_id);
427         alt_loc = resolve_alt_dbpath(dbpath, db_id);
428
429         /*
430          * Check for active backends in the target database.
431          */
432         if (DatabaseHasActiveBackends(db_id, false))
433                 elog(ERROR, "DROP DATABASE: database \"%s\" is being accessed by other users", dbname);
434
435         /*
436          * Find the database's tuple by OID (should be unique).
437          */
438         ScanKeyEntryInitialize(&key, 0, ObjectIdAttributeNumber,
439                                                    F_OIDEQ, ObjectIdGetDatum(db_id));
440
441         pgdbscan = heap_beginscan(pgdbrel, SnapshotNow, 1, &key);
442
443         tup = heap_getnext(pgdbscan, ForwardScanDirection);
444         if (!HeapTupleIsValid(tup))
445         {
446                 /*
447                  * This error should never come up since the existence of the
448                  * database is checked earlier
449                  */
450                 elog(ERROR, "DROP DATABASE: Database \"%s\" doesn't exist despite earlier reports to the contrary",
451                          dbname);
452         }
453
454         /* Remove the database's tuple from pg_database */
455         simple_heap_delete(pgdbrel, &tup->t_self);
456
457         heap_endscan(pgdbscan);
458
459         /* Delete any comments associated with the database */
460         DeleteComments(db_id, RelationGetRelid(pgdbrel));
461
462         /*
463          * Close pg_database, but keep exclusive lock till commit to ensure
464          * that any new backend scanning pg_database will see the tuple dead.
465          */
466         heap_close(pgdbrel, NoLock);
467
468         /*
469          * Drop pages for this database that are in the shared buffer cache.
470          * This is important to ensure that no remaining backend tries to
471          * write out a dirty buffer to the dead database later...
472          */
473         DropBuffers(db_id);
474
475         /*
476          * Also, clean out any entries in the shared free space map.
477          */
478         FreeSpaceMapForgetDatabase(db_id);
479
480         /*
481          * Remove the database's subdirectory and everything in it.
482          */
483         remove_dbdirs(nominal_loc, alt_loc);
484
485         /*
486          * Force dirty buffers out to disk, so that newly-connecting backends
487          * will see the database tuple marked dead in pg_database right away.
488          * (They'll see an uncommitted deletion, but they don't care; see
489          * GetRawDatabaseInfo.)
490          */
491         BufferSync();
492 }
493
494
495
496 /*
497  * ALTER DATABASE name SET ...
498  */
499 void
500 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
501 {
502         char       *valuestr;
503         HeapTuple       tuple,
504                                 newtuple;
505         Relation        rel;
506         ScanKeyData     scankey;
507         HeapScanDesc scan;
508         Datum           repl_val[Natts_pg_database];
509         char            repl_null[Natts_pg_database];
510         char            repl_repl[Natts_pg_database];
511
512         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
513
514         rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
515         ScanKeyEntryInitialize(&scankey, 0, Anum_pg_database_datname,
516                                                    F_NAMEEQ, NameGetDatum(stmt->dbname));
517         scan = heap_beginscan(rel, SnapshotNow, 1, &scankey);
518         tuple = heap_getnext(scan, ForwardScanDirection);
519         if (!HeapTupleIsValid(tuple))
520                 elog(ERROR, "database \"%s\" does not exist", stmt->dbname);
521
522         if (!(superuser()
523                   || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
524                 elog(ERROR, "permission denied");
525
526         MemSet(repl_repl, ' ', sizeof(repl_repl));
527         repl_repl[Anum_pg_database_datconfig-1] = 'r';
528
529         if (strcmp(stmt->variable, "all")==0 && valuestr == NULL)
530         {
531                 /* RESET ALL */
532                 repl_null[Anum_pg_database_datconfig-1] = 'n';
533                 repl_val[Anum_pg_database_datconfig-1] = (Datum) 0;
534         }
535         else
536         {
537                 Datum datum;
538                 bool isnull;
539                 ArrayType *a;
540
541                 repl_null[Anum_pg_database_datconfig-1] = ' ';
542
543                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
544                                                          RelationGetDescr(rel), &isnull);
545
546                 a = isnull ? ((ArrayType *) NULL) : DatumGetArrayTypeP(datum);
547
548                 if (valuestr)
549                         a = GUCArrayAdd(a, stmt->variable, valuestr);
550                 else
551                         a = GUCArrayDelete(a, stmt->variable);
552
553                 repl_val[Anum_pg_database_datconfig-1] = PointerGetDatum(a);
554         }
555
556         newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
557         simple_heap_update(rel, &tuple->t_self, newtuple);
558
559         /*
560          * Update indexes
561          */
562         if (RelationGetForm(rel)->relhasindex)
563         {
564                 Relation        idescs[Num_pg_database_indices];
565
566                 CatalogOpenIndices(Num_pg_database_indices,
567                                                    Name_pg_database_indices, idescs);
568                 CatalogIndexInsert(idescs, Num_pg_database_indices, rel,
569                                                    newtuple);
570                 CatalogCloseIndices(Num_pg_database_indices, idescs);
571         }
572
573         heap_endscan(scan);
574         heap_close(rel, RowExclusiveLock);
575 }
576
577
578
579 /*
580  * Helper functions
581  */
582
583 static bool
584 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
585                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
586                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
587                         char *dbpath)
588 {
589         Relation        relation;
590         ScanKeyData scanKey;
591         HeapScanDesc scan;
592         HeapTuple       tuple;
593         bool            gottuple;
594
595         AssertArg(name);
596
597         /* Caller may wish to grab a better lock on pg_database beforehand... */
598         relation = heap_openr(DatabaseRelationName, AccessShareLock);
599
600         ScanKeyEntryInitialize(&scanKey, 0, Anum_pg_database_datname,
601                                                    F_NAMEEQ, NameGetDatum(name));
602
603         scan = heap_beginscan(relation, SnapshotNow, 1, &scanKey);
604
605         tuple = heap_getnext(scan, ForwardScanDirection);
606
607         gottuple = HeapTupleIsValid(tuple);
608         if (gottuple)
609         {
610                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
611
612                 /* oid of the database */
613                 if (dbIdP)
614                         *dbIdP = tuple->t_data->t_oid;
615                 /* sysid of the owner */
616                 if (ownerIdP)
617                         *ownerIdP = dbform->datdba;
618                 /* multibyte encoding */
619                 if (encodingP)
620                         *encodingP = dbform->encoding;
621                 /* allowed as template? */
622                 if (dbIsTemplateP)
623                         *dbIsTemplateP = dbform->datistemplate;
624                 /* last system OID used in database */
625                 if (dbLastSysOidP)
626                         *dbLastSysOidP = dbform->datlastsysoid;
627                 /* limit of vacuumed XIDs */
628                 if (dbVacuumXidP)
629                         *dbVacuumXidP = dbform->datvacuumxid;
630                 /* limit of frozen XIDs */
631                 if (dbFrozenXidP)
632                         *dbFrozenXidP = dbform->datfrozenxid;
633                 /* database path (as registered in pg_database) */
634                 if (dbpath)
635                 {
636                         Datum           datum;
637                         bool            isnull;
638
639                         datum = heap_getattr(tuple,
640                                                                  Anum_pg_database_datpath,
641                                                                  RelationGetDescr(relation),
642                                                                  &isnull);
643                         if (!isnull)
644                         {
645                                 text       *pathtext = DatumGetTextP(datum);
646                                 int                     pathlen = VARSIZE(pathtext) - VARHDRSZ;
647
648                                 Assert(pathlen >= 0 && pathlen < MAXPGPATH);
649                                 strncpy(dbpath, VARDATA(pathtext), pathlen);
650                                 *(dbpath + pathlen) = '\0';
651                         }
652                         else
653                                 strcpy(dbpath, "");
654                 }
655         }
656
657         heap_endscan(scan);
658         heap_close(relation, AccessShareLock);
659
660         return gottuple;
661 }
662
663 static bool
664 have_createdb_privilege(void)
665 {
666         HeapTuple       utup;
667         bool            retval;
668
669         utup = SearchSysCache(SHADOWSYSID,
670                                                   ObjectIdGetDatum(GetUserId()),
671                                                   0, 0, 0);
672
673         if (!HeapTupleIsValid(utup))
674                 retval = false;
675         else
676                 retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
677
678         ReleaseSysCache(utup);
679
680         return retval;
681 }
682
683
684 static char *
685 resolve_alt_dbpath(const char *dbpath, Oid dboid)
686 {
687         const char *prefix;
688         char       *ret;
689         size_t          len;
690
691         if (dbpath == NULL || dbpath[0] == '\0')
692                 return NULL;
693
694         if (strchr(dbpath, '/'))
695         {
696                 if (dbpath[0] != '/')
697                         elog(ERROR, "Relative paths are not allowed as database locations");
698 #ifndef ALLOW_ABSOLUTE_DBPATHS
699                 elog(ERROR, "Absolute paths are not allowed as database locations");
700 #endif
701                 prefix = dbpath;
702         }
703         else
704         {
705                 /* must be environment variable */
706                 char       *var = getenv(dbpath);
707
708                 if (!var)
709                         elog(ERROR, "Postmaster environment variable '%s' not set", dbpath);
710                 if (var[0] != '/')
711                         elog(ERROR, "Postmaster environment variable '%s' must be absolute path", dbpath);
712                 prefix = var;
713         }
714
715         len = strlen(prefix) + 6 + sizeof(Oid) * 8 + 1;
716         if (len >= MAXPGPATH - 100)
717                 elog(ERROR, "Alternate path is too long");
718
719         ret = palloc(len);
720         snprintf(ret, len, "%s/base/%u", prefix, dboid);
721
722         return ret;
723 }
724
725
726 static bool
727 remove_dbdirs(const char *nominal_loc, const char *alt_loc)
728 {
729         const char *target_dir;
730         char            buf[MAXPGPATH + 100];
731         bool            success = true;
732
733         target_dir = alt_loc ? alt_loc : nominal_loc;
734
735         /*
736          * Close virtual file descriptors so the kernel has more available for
737          * the system() call below.
738          */
739         closeAllVfds();
740
741         if (alt_loc)
742         {
743                 /* remove symlink */
744                 if (unlink(nominal_loc) != 0)
745                 {
746                         elog(WARNING, "could not remove '%s': %m", nominal_loc);
747                         success = false;
748                 }
749         }
750
751         snprintf(buf, sizeof(buf), "rm -rf '%s'", target_dir);
752
753         if (system(buf) != 0)
754         {
755                 elog(WARNING, "database directory '%s' could not be removed",
756                          target_dir);
757                 success = false;
758         }
759
760         return success;
761 }