]> granicus.if.org Git - postgresql/blob - src/backend/commands/tablespace.c
Install a hopefully-temporary workaround for Snow Leopard readdir() bug.
[postgresql] / src / backend / commands / tablespace.c
1 /*-------------------------------------------------------------------------
2  *
3  * tablespace.c
4  *        Commands to manipulate table spaces
5  *
6  * Tablespaces in PostgreSQL are designed to allow users to determine
7  * where the data file(s) for a given database object reside on the file
8  * system.
9  *
10  * A tablespace represents a directory on the file system. At tablespace
11  * creation time, the directory must be empty. To simplify things and
12  * remove the possibility of having file name conflicts, we isolate
13  * files within a tablespace into database-specific subdirectories.
14  *
15  * To support file access via the information given in RelFileNode, we
16  * maintain a symbolic-link map in $PGDATA/pg_tblspc. The symlinks are
17  * named by tablespace OIDs and point to the actual tablespace directories.
18  * Thus the full path to an arbitrary file is
19  *                      $PGDATA/pg_tblspc/spcoid/dboid/relfilenode
20  *
21  * There are two tablespaces created at initdb time: pg_global (for shared
22  * tables) and pg_default (for everything else).  For backwards compatibility
23  * and to remain functional on platforms without symlinks, these tablespaces
24  * are accessed specially: they are respectively
25  *                      $PGDATA/global/relfilenode
26  *                      $PGDATA/base/dboid/relfilenode
27  *
28  * To allow CREATE DATABASE to give a new database a default tablespace
29  * that's different from the template database's default, we make the
30  * provision that a zero in pg_class.reltablespace means the database's
31  * default tablespace.  Without this, CREATE DATABASE would have to go in
32  * and munge the system catalogs of the new database.
33  *
34  *
35  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
36  * Portions Copyright (c) 1994, Regents of the University of California
37  *
38  *
39  * IDENTIFICATION
40  *        $PostgreSQL: pgsql/src/backend/commands/tablespace.c,v 1.62 2009/09/12 15:51:52 tgl Exp $
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45
46 #include <unistd.h>
47 #include <dirent.h>
48 #include <sys/types.h>
49 #include <sys/stat.h>
50
51 #include "access/heapam.h"
52 #include "access/sysattr.h"
53 #include "access/xact.h"
54 #include "catalog/catalog.h"
55 #include "catalog/dependency.h"
56 #include "catalog/indexing.h"
57 #include "catalog/pg_tablespace.h"
58 #include "commands/comment.h"
59 #include "commands/tablespace.h"
60 #include "miscadmin.h"
61 #include "postmaster/bgwriter.h"
62 #include "storage/fd.h"
63 #include "utils/acl.h"
64 #include "utils/builtins.h"
65 #include "utils/fmgroids.h"
66 #include "utils/guc.h"
67 #include "utils/lsyscache.h"
68 #include "utils/memutils.h"
69 #include "utils/rel.h"
70 #include "utils/tqual.h"
71
72
73 /* GUC variables; both are defined here and start out as NULL */
74 char       *default_tablespace = NULL;
75 char       *temp_tablespaces = NULL;
76
77 /* Prototypes for the file-local (static) helpers defined further down */
78 static bool remove_tablespace_directories(Oid tablespaceoid, bool redo);
79 static void set_short_version(const char *path);
80
81
82 /*
83  * Each database using a table space is isolated into its own name space
84  * by a subdirectory named for the database OID.  On first creation of an
85  * object in the tablespace, create the subdirectory.  If the subdirectory
86  * already exists, just fall through quietly.
87  *
88  * isRedo indicates that we are creating an object during WAL replay.
89  * In this case we will cope with the possibility of the tablespace
90  * directory not being there either --- this could happen if we are
91  * replaying an operation on a table in a subsequently-dropped tablespace.
92  * We handle this by making a directory in the place where the tablespace
93  * symlink would normally be.  This isn't an exact replay of course, but
94  * it's the best we can do given the available information.
95  *
96  * If tablespaces are not supported, you might think this could be a no-op,
97  * but you'd be wrong: we still need it in case we have to re-create a
98  * database subdirectory (of $PGDATA/base) during WAL replay.
99  */
100 void
101 TablespaceCreateDbspace(Oid spcNode, Oid dbNode, bool isRedo)
102 {
103         struct stat st;
104         char       *dir;
105
106         /*
107          * The global tablespace doesn't have per-database subdirectories, so
108          * nothing to do for it.
109          */
110         if (spcNode == GLOBALTABLESPACE_OID)
111                 return;
112
113         Assert(OidIsValid(spcNode));
114         Assert(OidIsValid(dbNode));
115
116         dir = GetDatabasePath(dbNode, spcNode);
117
118         if (stat(dir, &st) < 0)
119         {
120                 if (errno == ENOENT)
121                 {
122                         /*
123                          * Acquire TablespaceCreateLock to ensure that no DROP TABLESPACE
124                          * or TablespaceCreateDbspace is running concurrently.
125                          */
126                         LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
127
128                         /*
129                          * Recheck to see if someone created the directory while we were
130                          * waiting for lock.
131                          */
132                         if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode))
133                         {
134                                 /* need not do anything */
135                         }
136                         else
137                         {
138                                 /* OK, go for it */
139                                 if (mkdir(dir, S_IRWXU) < 0)
140                                 {
141                                         char       *parentdir;
142
143                                         if (errno != ENOENT || !isRedo)
144                                                 ereport(ERROR,
145                                                                 (errcode_for_file_access(),
146                                                           errmsg("could not create directory \"%s\": %m",
147                                                                          dir)));
148                                         /* Try to make parent directory too */
149                                         parentdir = pstrdup(dir);
150                                         get_parent_directory(parentdir);
151                                         if (mkdir(parentdir, S_IRWXU) < 0)
152                                                 ereport(ERROR,
153                                                                 (errcode_for_file_access(),
154                                                           errmsg("could not create directory \"%s\": %m",
155                                                                          parentdir)));
156                                         pfree(parentdir);
157                                         if (mkdir(dir, S_IRWXU) < 0)
158                                                 ereport(ERROR,
159                                                                 (errcode_for_file_access(),
160                                                           errmsg("could not create directory \"%s\": %m",
161                                                                          dir)));
162                                 }
163                         }
164
165                         LWLockRelease(TablespaceCreateLock);
166                 }
167                 else
168                 {
169                         ereport(ERROR,
170                                         (errcode_for_file_access(),
171                                          errmsg("could not stat directory \"%s\": %m", dir)));
172                 }
173         }
174         else
175         {
176                 /* be paranoid */
177                 if (!S_ISDIR(st.st_mode))
178                         ereport(ERROR,
179                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
180                                          errmsg("\"%s\" exists but is not a directory",
181                                                         dir)));
182         }
183
184         pfree(dir);
185 }
186
187 /*
188  * Create a table space
189  *
190  * Only superusers can create a tablespace. This seems a reasonable restriction
191  * since we're determining the system layout and, anyway, we probably have
192  * root if we're doing this kind of activity
193  */
194 void
195 CreateTableSpace(CreateTableSpaceStmt *stmt)
196 {
197 #ifdef HAVE_SYMLINK
198         Relation        rel;
199         Datum           values[Natts_pg_tablespace];
200         bool            nulls[Natts_pg_tablespace];
201         HeapTuple       tuple;
202         Oid                     tablespaceoid;
203         char       *location;
204         char       *linkloc;
205         Oid                     ownerId;
206
207         /* Must be super user */
208         if (!superuser())
209                 ereport(ERROR,
210                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
211                                  errmsg("permission denied to create tablespace \"%s\"",
212                                                 stmt->tablespacename),
213                                  errhint("Must be superuser to create a tablespace.")));
214
215         /* However, the eventual owner of the tablespace need not be */
216         if (stmt->owner)
217                 ownerId = get_roleid_checked(stmt->owner);
218         else
219                 ownerId = GetUserId();
220
221         /* Unix-ify the offered path, and strip any trailing slashes */
222         location = pstrdup(stmt->location);
223         canonicalize_path(location);
224
225         /* disallow quotes, else CREATE DATABASE would be at risk */
226         if (strchr(location, '\''))
227                 ereport(ERROR,
228                                 (errcode(ERRCODE_INVALID_NAME),
229                                  errmsg("tablespace location cannot contain single quotes")));
230
231         /*
232          * Allowing relative paths seems risky
233          *
234          * this also helps us ensure that location is not empty or whitespace
235          */
236         if (!is_absolute_path(location))
237                 ereport(ERROR,
238                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
239                                  errmsg("tablespace location must be an absolute path")));
240
241         /*
242          * Check that location isn't too long. Remember that we're going to append
243          * '/<dboid>/<relid>.<nnn>'  (XXX but do we ever form the whole path
244          * explicitly?  This may be overly conservative.)
245          */
246         if (strlen(location) >= (MAXPGPATH - 1 - 10 - 1 - 10 - 1 - 10))
247                 ereport(ERROR,
248                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
249                                  errmsg("tablespace location \"%s\" is too long",
250                                                 location)));
251
252         /*
253          * Disallow creation of tablespaces named "pg_xxx"; we reserve this
254          * namespace for system purposes.
255          */
256         if (!allowSystemTableMods && IsReservedName(stmt->tablespacename))
257                 ereport(ERROR,
258                                 (errcode(ERRCODE_RESERVED_NAME),
259                                  errmsg("unacceptable tablespace name \"%s\"",
260                                                 stmt->tablespacename),
261                 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
262
263         /*
264          * Check that there is no other tablespace by this name.  (The unique
265          * index would catch this anyway, but might as well give a friendlier
266          * message.)
267          */
268         if (OidIsValid(get_tablespace_oid(stmt->tablespacename)))
269                 ereport(ERROR,
270                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
271                                  errmsg("tablespace \"%s\" already exists",
272                                                 stmt->tablespacename)));
273
274         /*
275          * Insert tuple into pg_tablespace.  The purpose of doing this first is to
276          * lock the proposed tablename against other would-be creators. The
277          * insertion will roll back if we find problems below.
278          */
279         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
280
281         MemSet(nulls, false, sizeof(nulls));
282
283         values[Anum_pg_tablespace_spcname - 1] =
284                 DirectFunctionCall1(namein, CStringGetDatum(stmt->tablespacename));
285         values[Anum_pg_tablespace_spcowner - 1] =
286                 ObjectIdGetDatum(ownerId);
287         values[Anum_pg_tablespace_spclocation - 1] =
288                 CStringGetTextDatum(location);
289         nulls[Anum_pg_tablespace_spcacl - 1] = true;
290
291         tuple = heap_form_tuple(rel->rd_att, values, nulls);
292
293         tablespaceoid = simple_heap_insert(rel, tuple);
294
295         CatalogUpdateIndexes(rel, tuple);
296
297         heap_freetuple(tuple);
298
299         /* Record dependency on owner */
300         recordDependencyOnOwner(TableSpaceRelationId, tablespaceoid, ownerId);
301
302         /*
303          * Attempt to coerce target directory to safe permissions.      If this fails,
304          * it doesn't exist or has the wrong owner.
305          */
306         if (chmod(location, 0700) != 0)
307                 ereport(ERROR,
308                                 (errcode_for_file_access(),
309                                  errmsg("could not set permissions on directory \"%s\": %m",
310                                                 location)));
311
312         /*
313          * Check the target directory is empty.
314          */
315         if (!directory_is_empty(location))
316                 ereport(ERROR,
317                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
318                                  errmsg("directory \"%s\" is not empty",
319                                                 location)));
320
321         /*
322          * Create the PG_VERSION file in the target directory.  This has several
323          * purposes: to make sure we can write in the directory, to prevent
324          * someone from creating another tablespace pointing at the same directory
325          * (the emptiness check above will fail), and to label tablespace
326          * directories by PG version.
327          */
328         set_short_version(location);
329
330         /*
331          * All seems well, create the symlink
332          */
333         linkloc = (char *) palloc(10 + 10 + 1);
334         sprintf(linkloc, "pg_tblspc/%u", tablespaceoid);
335
336         if (symlink(location, linkloc) < 0)
337                 ereport(ERROR,
338                                 (errcode_for_file_access(),
339                                  errmsg("could not create symbolic link \"%s\": %m",
340                                                 linkloc)));
341
342         /* Record the filesystem change in XLOG */
343         {
344                 xl_tblspc_create_rec xlrec;
345                 XLogRecData rdata[2];
346
347                 xlrec.ts_id = tablespaceoid;
348                 rdata[0].data = (char *) &xlrec;
349                 rdata[0].len = offsetof(xl_tblspc_create_rec, ts_path);
350                 rdata[0].buffer = InvalidBuffer;
351                 rdata[0].next = &(rdata[1]);
352
353                 rdata[1].data = (char *) location;
354                 rdata[1].len = strlen(location) + 1;
355                 rdata[1].buffer = InvalidBuffer;
356                 rdata[1].next = NULL;
357
358                 (void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_CREATE, rdata);
359         }
360
361         /*
362          * Force synchronous commit, to minimize the window between creating the
363          * symlink on-disk and marking the transaction committed.  It's not great
364          * that there is any window at all, but definitely we don't want to make
365          * it larger than necessary.
366          */
367         ForceSyncCommit();
368
369         pfree(linkloc);
370         pfree(location);
371
372         /* We keep the lock on pg_tablespace until commit */
373         heap_close(rel, NoLock);
374 #else                                                   /* !HAVE_SYMLINK */
375         ereport(ERROR,
376                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
377                          errmsg("tablespaces are not supported on this platform")));
378 #endif   /* HAVE_SYMLINK */
379 }
380
381 /*
382  * Drop a table space
383  *
384  * Be careful to check that the tablespace is empty.
385  */
386 void
387 DropTableSpace(DropTableSpaceStmt *stmt)
388 {
389 #ifdef HAVE_SYMLINK
390         char       *tablespacename = stmt->tablespacename;
391         HeapScanDesc scandesc;
392         Relation        rel;
393         HeapTuple       tuple;
394         ScanKeyData entry[1];
395         Oid                     tablespaceoid;
396
397         /*
398          * Find the target tuple
399          */
400         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
401
402         ScanKeyInit(&entry[0],
403                                 Anum_pg_tablespace_spcname,
404                                 BTEqualStrategyNumber, F_NAMEEQ,
405                                 CStringGetDatum(tablespacename));
406         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
407         tuple = heap_getnext(scandesc, ForwardScanDirection);
408
409         if (!HeapTupleIsValid(tuple))
410         {
411                 if (!stmt->missing_ok)
412                 {
413                         ereport(ERROR,
414                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
415                                          errmsg("tablespace \"%s\" does not exist",
416                                                         tablespacename)));
417                 }
418                 else
419                 {
420                         ereport(NOTICE,
421                                         (errmsg("tablespace \"%s\" does not exist, skipping",
422                                                         tablespacename)));
423                         /* XXX I assume I need one or both of these next two calls */
424                         heap_endscan(scandesc);
425                         heap_close(rel, NoLock);
426                 }
427                 return;
428         }
429
430         tablespaceoid = HeapTupleGetOid(tuple);
431
432         /* Must be tablespace owner */
433         if (!pg_tablespace_ownercheck(tablespaceoid, GetUserId()))
434                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
435                                            tablespacename);
436
437         /* Disallow drop of the standard tablespaces, even by superuser */
438         if (tablespaceoid == GLOBALTABLESPACE_OID ||
439                 tablespaceoid == DEFAULTTABLESPACE_OID)
440                 aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE,
441                                            tablespacename);
442
443         /*
444          * Remove the pg_tablespace tuple (this will roll back if we fail below)
445          */
446         simple_heap_delete(rel, &tuple->t_self);
447
448         heap_endscan(scandesc);
449
450         /*
451          * Remove any comments on this tablespace.
452          */
453         DeleteSharedComments(tablespaceoid, TableSpaceRelationId);
454
455         /*
456          * Remove dependency on owner.
457          */
458         deleteSharedDependencyRecordsFor(TableSpaceRelationId, tablespaceoid, 0);
459
460         /*
461          * Acquire TablespaceCreateLock to ensure that no TablespaceCreateDbspace
462          * is running concurrently.
463          */
464         LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
465
466         /*
467          * Try to remove the physical infrastructure.
468          */
469         if (!remove_tablespace_directories(tablespaceoid, false))
470         {
471                 /*
472                  * Not all files deleted?  However, there can be lingering empty files
473                  * in the directories, left behind by for example DROP TABLE, that
474                  * have been scheduled for deletion at next checkpoint (see comments
475                  * in mdunlink() for details).  We could just delete them immediately,
476                  * but we can't tell them apart from important data files that we
477                  * mustn't delete.  So instead, we force a checkpoint which will clean
478                  * out any lingering files, and try again.
479                  */
480                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
481                 if (!remove_tablespace_directories(tablespaceoid, false))
482                 {
483                         /* Still not empty, the files must be important then */
484                         ereport(ERROR,
485                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
486                                          errmsg("tablespace \"%s\" is not empty",
487                                                         tablespacename)));
488                 }
489         }
490
491         /* Record the filesystem change in XLOG */
492         {
493                 xl_tblspc_drop_rec xlrec;
494                 XLogRecData rdata[1];
495
496                 xlrec.ts_id = tablespaceoid;
497                 rdata[0].data = (char *) &xlrec;
498                 rdata[0].len = sizeof(xl_tblspc_drop_rec);
499                 rdata[0].buffer = InvalidBuffer;
500                 rdata[0].next = NULL;
501
502                 (void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP, rdata);
503         }
504
505         /*
506          * Note: because we checked that the tablespace was empty, there should be
507          * no need to worry about flushing shared buffers or free space map
508          * entries for relations in the tablespace.
509          */
510
511         /*
512          * Force synchronous commit, to minimize the window between removing the
513          * files on-disk and marking the transaction committed.  It's not great
514          * that there is any window at all, but definitely we don't want to make
515          * it larger than necessary.
516          */
517         ForceSyncCommit();
518
519         /*
520          * Allow TablespaceCreateDbspace again.
521          */
522         LWLockRelease(TablespaceCreateLock);
523
524         /* We keep the lock on pg_tablespace until commit */
525         heap_close(rel, NoLock);
526 #else                                                   /* !HAVE_SYMLINK */
527         ereport(ERROR,
528                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
529                          errmsg("tablespaces are not supported on this platform")));
530 #endif   /* HAVE_SYMLINK */
531 }
532
533 /*
534  * remove_tablespace_directories: attempt to remove filesystem infrastructure
535  *
536  * Returns TRUE if successful, FALSE if some subdirectory is not empty
537  *
538  * redo indicates we are redoing a drop from XLOG; okay if nothing there
539  */
540 static bool
541 remove_tablespace_directories(Oid tablespaceoid, bool redo)
542 {
543         char       *location;
544         DIR                *dirdesc;
545         struct dirent *de;
546         char       *subfile;
547         struct stat st;
548
549         location = (char *) palloc(10 + 10 + 1);
550         sprintf(location, "pg_tblspc/%u", tablespaceoid);
551
552         /*
553          * Check if the tablespace still contains any files.  We try to rmdir each
554          * per-database directory we find in it.  rmdir failure implies there are
555          * still files in that subdirectory, so give up.  (We do not have to worry
556          * about undoing any already completed rmdirs, since the next attempt to
557          * use the tablespace from that database will simply recreate the
558          * subdirectory via TablespaceCreateDbspace.)
559          *
560          * Since we hold TablespaceCreateLock, no one else should be creating any
561          * fresh subdirectories in parallel. It is possible that new files are
562          * being created within subdirectories, though, so the rmdir call could
563          * fail.  Worst consequence is a less friendly error message.
564          *
565          * If redo is true then ENOENT is a likely outcome here, and we allow it
566          * to pass without comment.  In normal operation we still allow it, but
567          * with a warning.      This is because even though ProcessUtility disallows
568          * DROP TABLESPACE in a transaction block, it's possible that a previous
569          * DROP failed and rolled back after removing the tablespace directories
570          * and symlink.  We want to allow a new DROP attempt to succeed at
571          * removing the catalog entries, so we should not give a hard error here.
572          */
573         dirdesc = AllocateDir(location);
574         if (dirdesc == NULL)
575         {
576                 if (errno == ENOENT)
577                 {
578                         if (!redo)
579                                 ereport(WARNING,
580                                                 (errcode_for_file_access(),
581                                                  errmsg("could not open directory \"%s\": %m",
582                                                                 location)));
583                         pfree(location);
584                         return true;
585                 }
586                 /* else let ReadDir report the error */
587         }
588
589         while ((de = ReadDir(dirdesc, location)) != NULL)
590         {
591                 /* Note we ignore PG_VERSION for the nonce */
592                 if (strcmp(de->d_name, ".") == 0 ||
593                         strcmp(de->d_name, "..") == 0 ||
594                         strcmp(de->d_name, "PG_VERSION") == 0)
595                         continue;
596
597                 subfile = palloc(strlen(location) + 1 + strlen(de->d_name) + 1);
598                 sprintf(subfile, "%s/%s", location, de->d_name);
599
600                 /* This check is just to deliver a friendlier error message */
601                 if (!directory_is_empty(subfile))
602                 {
603                         FreeDir(dirdesc);
604                         return false;
605                 }
606
607                 /* Do the real deed */
608                 if (rmdir(subfile) < 0)
609                         ereport(ERROR,
610                                         (errcode_for_file_access(),
611                                          errmsg("could not remove directory \"%s\": %m",
612                                                         subfile)));
613
614                 pfree(subfile);
615
616                 /*
617                  * OS X 10.6 and 10.6.1 have a bug in readdir() that causes the
618                  * next call to fail after deleting the current element.  Hopefully
619                  * that will be fixed real soon, but for the moment we have this
620                  * ugly kluge to restart the directory scan.
621                  */
622 #ifdef __darwin__
623                 FreeDir(dirdesc);
624                 dirdesc = AllocateDir(location);
625 #endif
626         }
627
628         FreeDir(dirdesc);
629
630         /*
631          * Okay, try to unlink PG_VERSION (we allow it to not be there, even in
632          * non-REDO case, for robustness).
633          */
634         subfile = palloc(strlen(location) + 11 + 1);
635         sprintf(subfile, "%s/PG_VERSION", location);
636
637         if (unlink(subfile) < 0)
638         {
639                 if (errno != ENOENT)
640                         ereport(ERROR,
641                                         (errcode_for_file_access(),
642                                          errmsg("could not remove file \"%s\": %m",
643                                                         subfile)));
644         }
645
646         pfree(subfile);
647
648         /*
649          * Okay, try to remove the symlink.  We must however deal with the
650          * possibility that it's a directory instead of a symlink --- this could
651          * happen during WAL replay (see TablespaceCreateDbspace), and it is also
652          * the normal case on Windows.
653          */
654         if (lstat(location, &st) == 0 && S_ISDIR(st.st_mode))
655         {
656                 if (rmdir(location) < 0)
657                         ereport(ERROR,
658                                         (errcode_for_file_access(),
659                                          errmsg("could not remove directory \"%s\": %m",
660                                                         location)));
661         }
662         else
663         {
664                 if (unlink(location) < 0)
665                         ereport(ERROR,
666                                         (errcode_for_file_access(),
667                                          errmsg("could not remove symbolic link \"%s\": %m",
668                                                         location)));
669         }
670
671         pfree(location);
672
673         return true;
674 }
675
676 /*
677  * write out the PG_VERSION file in the specified directory
678  */
679 static void
680 set_short_version(const char *path)
681 {
682         char       *short_version;
683         bool            gotdot = false;
684         int                     end;
685         char       *fullname;
686         FILE       *version_file;
687
688         /* Construct short version string (should match initdb.c) */
689         short_version = pstrdup(PG_VERSION);
690
691         for (end = 0; short_version[end] != '\0'; end++)
692         {
693                 if (short_version[end] == '.')
694                 {
695                         Assert(end != 0);
696                         if (gotdot)
697                                 break;
698                         else
699                                 gotdot = true;
700                 }
701                 else if (short_version[end] < '0' || short_version[end] > '9')
702                 {
703                         /* gone past digits and dots */
704                         break;
705                 }
706         }
707         Assert(end > 0 && short_version[end - 1] != '.' && gotdot);
708         short_version[end] = '\0';
709
710         /* Now write the file */
711         fullname = palloc(strlen(path) + 11 + 1);
712         sprintf(fullname, "%s/PG_VERSION", path);
713         version_file = AllocateFile(fullname, PG_BINARY_W);
714         if (version_file == NULL)
715                 ereport(ERROR,
716                                 (errcode_for_file_access(),
717                                  errmsg("could not write to file \"%s\": %m",
718                                                 fullname)));
719         fprintf(version_file, "%s\n", short_version);
720         if (FreeFile(version_file))
721                 ereport(ERROR,
722                                 (errcode_for_file_access(),
723                                  errmsg("could not write to file \"%s\": %m",
724                                                 fullname)));
725
726         pfree(fullname);
727         pfree(short_version);
728 }
729
/*
 * directory_is_empty -- report whether a directory has no real entries
 *
 * Returns true when the directory at "path" contains nothing other than
 * "." and "..", false as soon as any other entry is seen.
 *
 * (This arguably belongs in some more general-purpose file.)
 */
bool
directory_is_empty(const char *path)
{
	DIR		   *dir;
	struct dirent *entry;
	bool		empty = true;

	dir = AllocateDir(path);

	/* Stop reading as soon as one entry other than "." or ".." turns up */
	while (empty && (entry = ReadDir(dir, path)) != NULL)
	{
		if (strcmp(entry->d_name, ".") != 0 &&
			strcmp(entry->d_name, "..") != 0)
			empty = false;
	}

	FreeDir(dir);
	return empty;
}
755
756 /*
757  * Rename a tablespace
758  */
759 void
760 RenameTableSpace(const char *oldname, const char *newname)
761 {
762         Relation        rel;
763         ScanKeyData entry[1];
764         HeapScanDesc scan;
765         HeapTuple       tup;
766         HeapTuple       newtuple;
767         Form_pg_tablespace newform;
768
769         /* Search pg_tablespace */
770         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
771
772         ScanKeyInit(&entry[0],
773                                 Anum_pg_tablespace_spcname,
774                                 BTEqualStrategyNumber, F_NAMEEQ,
775                                 CStringGetDatum(oldname));
776         scan = heap_beginscan(rel, SnapshotNow, 1, entry);
777         tup = heap_getnext(scan, ForwardScanDirection);
778         if (!HeapTupleIsValid(tup))
779                 ereport(ERROR,
780                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
781                                  errmsg("tablespace \"%s\" does not exist",
782                                                 oldname)));
783
784         newtuple = heap_copytuple(tup);
785         newform = (Form_pg_tablespace) GETSTRUCT(newtuple);
786
787         heap_endscan(scan);
788
789         /* Must be owner */
790         if (!pg_tablespace_ownercheck(HeapTupleGetOid(newtuple), GetUserId()))
791                 aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE, oldname);
792
793         /* Validate new name */
794         if (!allowSystemTableMods && IsReservedName(newname))
795                 ereport(ERROR,
796                                 (errcode(ERRCODE_RESERVED_NAME),
797                                  errmsg("unacceptable tablespace name \"%s\"", newname),
798                 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
799
800         /* Make sure the new name doesn't exist */
801         ScanKeyInit(&entry[0],
802                                 Anum_pg_tablespace_spcname,
803                                 BTEqualStrategyNumber, F_NAMEEQ,
804                                 CStringGetDatum(newname));
805         scan = heap_beginscan(rel, SnapshotNow, 1, entry);
806         tup = heap_getnext(scan, ForwardScanDirection);
807         if (HeapTupleIsValid(tup))
808                 ereport(ERROR,
809                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
810                                  errmsg("tablespace \"%s\" already exists",
811                                                 newname)));
812
813         heap_endscan(scan);
814
815         /* OK, update the entry */
816         namestrcpy(&(newform->spcname), newname);
817
818         simple_heap_update(rel, &newtuple->t_self, newtuple);
819         CatalogUpdateIndexes(rel, newtuple);
820
821         heap_close(rel, NoLock);
822 }
823
824 /*
825  * Change tablespace owner
826  */
827 void
828 AlterTableSpaceOwner(const char *name, Oid newOwnerId)
829 {
830         Relation        rel;
831         ScanKeyData entry[1];
832         HeapScanDesc scandesc;
833         Form_pg_tablespace spcForm;
834         HeapTuple       tup;
835
836         /* Search pg_tablespace */
837         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
838
839         ScanKeyInit(&entry[0],
840                                 Anum_pg_tablespace_spcname,
841                                 BTEqualStrategyNumber, F_NAMEEQ,
842                                 CStringGetDatum(name));
843         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
844         tup = heap_getnext(scandesc, ForwardScanDirection);
845         if (!HeapTupleIsValid(tup))
846                 ereport(ERROR,
847                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
848                                  errmsg("tablespace \"%s\" does not exist", name)));
849
850         spcForm = (Form_pg_tablespace) GETSTRUCT(tup);
851
852         /*
853          * If the new owner is the same as the existing owner, consider the
854          * command to have succeeded.  This is for dump restoration purposes.
855          */
856         if (spcForm->spcowner != newOwnerId)
857         {
858                 Datum           repl_val[Natts_pg_tablespace];
859                 bool            repl_null[Natts_pg_tablespace];
860                 bool            repl_repl[Natts_pg_tablespace];
861                 Acl                *newAcl;
862                 Datum           aclDatum;
863                 bool            isNull;
864                 HeapTuple       newtuple;
865
866                 /* Otherwise, must be owner of the existing object */
867                 if (!pg_tablespace_ownercheck(HeapTupleGetOid(tup), GetUserId()))
868                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
869                                                    name);
870
871                 /* Must be able to become new owner */
872                 check_is_member_of_role(GetUserId(), newOwnerId);
873
874                 /*
875                  * Normally we would also check for create permissions here, but there
876                  * are none for tablespaces so we follow what rename tablespace does
877                  * and omit the create permissions check.
878                  *
879                  * NOTE: Only superusers may create tablespaces to begin with and so
880                  * initially only a superuser would be able to change its ownership
881                  * anyway.
882                  */
883
884                 memset(repl_null, false, sizeof(repl_null));
885                 memset(repl_repl, false, sizeof(repl_repl));
886
887                 repl_repl[Anum_pg_tablespace_spcowner - 1] = true;
888                 repl_val[Anum_pg_tablespace_spcowner - 1] = ObjectIdGetDatum(newOwnerId);
889
890                 /*
891                  * Determine the modified ACL for the new owner.  This is only
892                  * necessary when the ACL is non-null.
893                  */
894                 aclDatum = heap_getattr(tup,
895                                                                 Anum_pg_tablespace_spcacl,
896                                                                 RelationGetDescr(rel),
897                                                                 &isNull);
898                 if (!isNull)
899                 {
900                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
901                                                                  spcForm->spcowner, newOwnerId);
902                         repl_repl[Anum_pg_tablespace_spcacl - 1] = true;
903                         repl_val[Anum_pg_tablespace_spcacl - 1] = PointerGetDatum(newAcl);
904                 }
905
906                 newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
907
908                 simple_heap_update(rel, &newtuple->t_self, newtuple);
909                 CatalogUpdateIndexes(rel, newtuple);
910
911                 heap_freetuple(newtuple);
912
913                 /* Update owner dependency reference */
914                 changeDependencyOnOwner(TableSpaceRelationId, HeapTupleGetOid(tup),
915                                                                 newOwnerId);
916         }
917
918         heap_endscan(scandesc);
919         heap_close(rel, NoLock);
920 }
921
922
923 /*
924  * Routines for handling the GUC variable 'default_tablespace'.
925  */
926
927 /* assign_hook: validate new default_tablespace, do extra actions as needed */
928 const char *
929 assign_default_tablespace(const char *newval, bool doit, GucSource source)
930 {
931         /*
932          * If we aren't inside a transaction, we cannot do database access so
933          * cannot verify the name.      Must accept the value on faith.
934          */
935         if (IsTransactionState())
936         {
937                 if (newval[0] != '\0' &&
938                         !OidIsValid(get_tablespace_oid(newval)))
939                 {
940                         ereport(GUC_complaint_elevel(source),
941                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
942                                          errmsg("tablespace \"%s\" does not exist",
943                                                         newval)));
944                         return NULL;
945                 }
946         }
947
948         return newval;
949 }
950
951 /*
952  * GetDefaultTablespace -- get the OID of the current default tablespace
953  *
954  * Regular objects and temporary objects have different default tablespaces,
955  * hence the forTemp parameter must be specified.
956  *
957  * May return InvalidOid to indicate "use the database's default tablespace".
958  *
959  * Note that caller is expected to check appropriate permissions for any
960  * result other than InvalidOid.
961  *
962  * This exists to hide (and possibly optimize the use of) the
963  * default_tablespace GUC variable.
964  */
965 Oid
966 GetDefaultTablespace(bool forTemp)
967 {
968         Oid                     result;
969
970         /* The temp-table case is handled elsewhere */
971         if (forTemp)
972         {
973                 PrepareTempTablespaces();
974                 return GetNextTempTableSpace();
975         }
976
977         /* Fast path for default_tablespace == "" */
978         if (default_tablespace == NULL || default_tablespace[0] == '\0')
979                 return InvalidOid;
980
981         /*
982          * It is tempting to cache this lookup for more speed, but then we would
983          * fail to detect the case where the tablespace was dropped since the GUC
984          * variable was set.  Note also that we don't complain if the value fails
985          * to refer to an existing tablespace; we just silently return InvalidOid,
986          * causing the new object to be created in the database's tablespace.
987          */
988         result = get_tablespace_oid(default_tablespace);
989
990         /*
991          * Allow explicit specification of database's default tablespace in
992          * default_tablespace without triggering permissions checks.
993          */
994         if (result == MyDatabaseTableSpace)
995                 result = InvalidOid;
996         return result;
997 }
998
999
1000 /*
1001  * Routines for handling the GUC variable 'temp_tablespaces'.
1002  */
1003
1004 /* assign_hook: validate new temp_tablespaces, do extra actions as needed */
1005 const char *
1006 assign_temp_tablespaces(const char *newval, bool doit, GucSource source)
1007 {
1008         char       *rawname;
1009         List       *namelist;
1010
1011         /* Need a modifiable copy of string */
1012         rawname = pstrdup(newval);
1013
1014         /* Parse string into list of identifiers */
1015         if (!SplitIdentifierString(rawname, ',', &namelist))
1016         {
1017                 /* syntax error in name list */
1018                 pfree(rawname);
1019                 list_free(namelist);
1020                 return NULL;
1021         }
1022
1023         /*
1024          * If we aren't inside a transaction, we cannot do database access so
1025          * cannot verify the individual names.  Must accept the list on faith.
1026          * Fortunately, there's then also no need to pass the data to fd.c.
1027          */
1028         if (IsTransactionState())
1029         {
1030                 /*
1031                  * If we error out below, or if we are called multiple times in one
1032                  * transaction, we'll leak a bit of TopTransactionContext memory.
1033                  * Doesn't seem worth worrying about.
1034                  */
1035                 Oid                *tblSpcs;
1036                 int                     numSpcs;
1037                 ListCell   *l;
1038
1039                 tblSpcs = (Oid *) MemoryContextAlloc(TopTransactionContext,
1040                                                                                 list_length(namelist) * sizeof(Oid));
1041                 numSpcs = 0;
1042                 foreach(l, namelist)
1043                 {
1044                         char       *curname = (char *) lfirst(l);
1045                         Oid                     curoid;
1046                         AclResult       aclresult;
1047
1048                         /* Allow an empty string (signifying database default) */
1049                         if (curname[0] == '\0')
1050                         {
1051                                 tblSpcs[numSpcs++] = InvalidOid;
1052                                 continue;
1053                         }
1054
1055                         /* Else verify that name is a valid tablespace name */
1056                         curoid = get_tablespace_oid(curname);
1057                         if (curoid == InvalidOid)
1058                         {
1059                                 /*
1060                                  * In an interactive SET command, we ereport for bad info.
1061                                  * Otherwise, silently ignore any bad list elements.
1062                                  */
1063                                 if (source >= PGC_S_INTERACTIVE)
1064                                         ereport(ERROR,
1065                                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
1066                                                          errmsg("tablespace \"%s\" does not exist",
1067                                                                         curname)));
1068                                 continue;
1069                         }
1070
1071                         /*
1072                          * Allow explicit specification of database's default tablespace
1073                          * in temp_tablespaces without triggering permissions checks.
1074                          */
1075                         if (curoid == MyDatabaseTableSpace)
1076                         {
1077                                 tblSpcs[numSpcs++] = InvalidOid;
1078                                 continue;
1079                         }
1080
1081                         /* Check permissions similarly */
1082                         aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1083                                                                                            ACL_CREATE);
1084                         if (aclresult != ACLCHECK_OK)
1085                         {
1086                                 if (source >= PGC_S_INTERACTIVE)
1087                                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE, curname);
1088                                 continue;
1089                         }
1090
1091                         tblSpcs[numSpcs++] = curoid;
1092                 }
1093
1094                 /* If actively "doing it", give the new list to fd.c */
1095                 if (doit)
1096                         SetTempTablespaces(tblSpcs, numSpcs);
1097                 else
1098                         pfree(tblSpcs);
1099         }
1100
1101         pfree(rawname);
1102         list_free(namelist);
1103
1104         return newval;
1105 }
1106
1107 /*
1108  * PrepareTempTablespaces -- prepare to use temp tablespaces
1109  *
1110  * If we have not already done so in the current transaction, parse the
1111  * temp_tablespaces GUC variable and tell fd.c which tablespace(s) to use
1112  * for temp files.
1113  */
1114 void
1115 PrepareTempTablespaces(void)
1116 {
1117         char       *rawname;
1118         List       *namelist;
1119         Oid                *tblSpcs;
1120         int                     numSpcs;
1121         ListCell   *l;
1122
1123         /* No work if already done in current transaction */
1124         if (TempTablespacesAreSet())
1125                 return;
1126
1127         /*
1128          * Can't do catalog access unless within a transaction.  This is just a
1129          * safety check in case this function is called by low-level code that
1130          * could conceivably execute outside a transaction.  Note that in such a
1131          * scenario, fd.c will fall back to using the current database's default
1132          * tablespace, which should always be OK.
1133          */
1134         if (!IsTransactionState())
1135                 return;
1136
1137         /* Need a modifiable copy of string */
1138         rawname = pstrdup(temp_tablespaces);
1139
1140         /* Parse string into list of identifiers */
1141         if (!SplitIdentifierString(rawname, ',', &namelist))
1142         {
1143                 /* syntax error in name list */
1144                 SetTempTablespaces(NULL, 0);
1145                 pfree(rawname);
1146                 list_free(namelist);
1147                 return;
1148         }
1149
1150         /* Store tablespace OIDs in an array in TopTransactionContext */
1151         tblSpcs = (Oid *) MemoryContextAlloc(TopTransactionContext,
1152                                                                                  list_length(namelist) * sizeof(Oid));
1153         numSpcs = 0;
1154         foreach(l, namelist)
1155         {
1156                 char       *curname = (char *) lfirst(l);
1157                 Oid                     curoid;
1158                 AclResult       aclresult;
1159
1160                 /* Allow an empty string (signifying database default) */
1161                 if (curname[0] == '\0')
1162                 {
1163                         tblSpcs[numSpcs++] = InvalidOid;
1164                         continue;
1165                 }
1166
1167                 /* Else verify that name is a valid tablespace name */
1168                 curoid = get_tablespace_oid(curname);
1169                 if (curoid == InvalidOid)
1170                 {
1171                         /* Silently ignore any bad list elements */
1172                         continue;
1173                 }
1174
1175                 /*
1176                  * Allow explicit specification of database's default tablespace in
1177                  * temp_tablespaces without triggering permissions checks.
1178                  */
1179                 if (curoid == MyDatabaseTableSpace)
1180                 {
1181                         tblSpcs[numSpcs++] = InvalidOid;
1182                         continue;
1183                 }
1184
1185                 /* Check permissions similarly */
1186                 aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1187                                                                                    ACL_CREATE);
1188                 if (aclresult != ACLCHECK_OK)
1189                         continue;
1190
1191                 tblSpcs[numSpcs++] = curoid;
1192         }
1193
1194         SetTempTablespaces(tblSpcs, numSpcs);
1195
1196         pfree(rawname);
1197         list_free(namelist);
1198 }
1199
1200
1201 /*
1202  * get_tablespace_oid - given a tablespace name, look up the OID
1203  *
1204  * Returns InvalidOid if tablespace name not found.
1205  */
1206 Oid
1207 get_tablespace_oid(const char *tablespacename)
1208 {
1209         Oid                     result;
1210         Relation        rel;
1211         HeapScanDesc scandesc;
1212         HeapTuple       tuple;
1213         ScanKeyData entry[1];
1214
1215         /*
1216          * Search pg_tablespace.  We use a heapscan here even though there is an
1217          * index on name, on the theory that pg_tablespace will usually have just
1218          * a few entries and so an indexed lookup is a waste of effort.
1219          */
1220         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1221
1222         ScanKeyInit(&entry[0],
1223                                 Anum_pg_tablespace_spcname,
1224                                 BTEqualStrategyNumber, F_NAMEEQ,
1225                                 CStringGetDatum(tablespacename));
1226         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
1227         tuple = heap_getnext(scandesc, ForwardScanDirection);
1228
1229         /* We assume that there can be at most one matching tuple */
1230         if (HeapTupleIsValid(tuple))
1231                 result = HeapTupleGetOid(tuple);
1232         else
1233                 result = InvalidOid;
1234
1235         heap_endscan(scandesc);
1236         heap_close(rel, AccessShareLock);
1237
1238         return result;
1239 }
1240
1241 /*
1242  * get_tablespace_name - given a tablespace OID, look up the name
1243  *
1244  * Returns a palloc'd string, or NULL if no such tablespace.
1245  */
1246 char *
1247 get_tablespace_name(Oid spc_oid)
1248 {
1249         char       *result;
1250         Relation        rel;
1251         HeapScanDesc scandesc;
1252         HeapTuple       tuple;
1253         ScanKeyData entry[1];
1254
1255         /*
1256          * Search pg_tablespace.  We use a heapscan here even though there is an
1257          * index on oid, on the theory that pg_tablespace will usually have just a
1258          * few entries and so an indexed lookup is a waste of effort.
1259          */
1260         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1261
1262         ScanKeyInit(&entry[0],
1263                                 ObjectIdAttributeNumber,
1264                                 BTEqualStrategyNumber, F_OIDEQ,
1265                                 ObjectIdGetDatum(spc_oid));
1266         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
1267         tuple = heap_getnext(scandesc, ForwardScanDirection);
1268
1269         /* We assume that there can be at most one matching tuple */
1270         if (HeapTupleIsValid(tuple))
1271                 result = pstrdup(NameStr(((Form_pg_tablespace) GETSTRUCT(tuple))->spcname));
1272         else
1273                 result = NULL;
1274
1275         heap_endscan(scandesc);
1276         heap_close(rel, AccessShareLock);
1277
1278         return result;
1279 }
1280
1281
1282 /*
1283  * TABLESPACE resource manager's routines
1284  */
1285 void
1286 tblspc_redo(XLogRecPtr lsn, XLogRecord *record)
1287 {
1288         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1289
1290         /* Backup blocks are not used in tblspc records */
1291         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
1292
1293         if (info == XLOG_TBLSPC_CREATE)
1294         {
1295                 xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) XLogRecGetData(record);
1296                 char       *location = xlrec->ts_path;
1297                 char       *linkloc;
1298
1299                 /*
1300                  * Attempt to coerce target directory to safe permissions.      If this
1301                  * fails, it doesn't exist or has the wrong owner.
1302                  */
1303                 if (chmod(location, 0700) != 0)
1304                         ereport(ERROR,
1305                                         (errcode_for_file_access(),
1306                                   errmsg("could not set permissions on directory \"%s\": %m",
1307                                                  location)));
1308
1309                 /* Create or re-create the PG_VERSION file in the target directory */
1310                 set_short_version(location);
1311
1312                 /* Create the symlink if not already present */
1313                 linkloc = (char *) palloc(10 + 10 + 1);
1314                 sprintf(linkloc, "pg_tblspc/%u", xlrec->ts_id);
1315
1316                 if (symlink(location, linkloc) < 0)
1317                 {
1318                         if (errno != EEXIST)
1319                                 ereport(ERROR,
1320                                                 (errcode_for_file_access(),
1321                                                  errmsg("could not create symbolic link \"%s\": %m",
1322                                                                 linkloc)));
1323                 }
1324
1325                 pfree(linkloc);
1326         }
1327         else if (info == XLOG_TBLSPC_DROP)
1328         {
1329                 xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) XLogRecGetData(record);
1330
1331                 if (!remove_tablespace_directories(xlrec->ts_id, true))
1332                         ereport(ERROR,
1333                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1334                                          errmsg("tablespace %u is not empty",
1335                                                         xlrec->ts_id)));
1336         }
1337         else
1338                 elog(PANIC, "tblspc_redo: unknown op code %u", info);
1339 }
1340
1341 void
1342 tblspc_desc(StringInfo buf, uint8 xl_info, char *rec)
1343 {
1344         uint8           info = xl_info & ~XLR_INFO_MASK;
1345
1346         if (info == XLOG_TBLSPC_CREATE)
1347         {
1348                 xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) rec;
1349
1350                 appendStringInfo(buf, "create ts: %u \"%s\"",
1351                                                  xlrec->ts_id, xlrec->ts_path);
1352         }
1353         else if (info == XLOG_TBLSPC_DROP)
1354         {
1355                 xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) rec;
1356
1357                 appendStringInfo(buf, "drop ts: %u", xlrec->ts_id);
1358         }
1359         else
1360                 appendStringInfo(buf, "UNKNOWN");
1361 }