]> granicus.if.org Git - postgresql/blob - src/backend/commands/tablespace.c
Clarify tablespace.c::TablespaceCreateDbspace() comments.
[postgresql] / src / backend / commands / tablespace.c
1 /*-------------------------------------------------------------------------
2  *
3  * tablespace.c
4  *        Commands to manipulate table spaces
5  *
6  * Tablespaces in PostgreSQL are designed to allow users to determine
7  * where the data file(s) for a given database object reside on the file
8  * system.
9  *
10  * A tablespace represents a directory on the file system. At tablespace
11  * creation time, the directory must be empty. To simplify things and
12  * remove the possibility of having file name conflicts, we isolate
13  * files within a tablespace into database-specific subdirectories.
14  *
15  * To support file access via the information given in RelFileNode, we
16  * maintain a symbolic-link map in $PGDATA/pg_tblspc. The symlinks are
17  * named by tablespace OIDs and point to the actual tablespace directories.
18  * Thus the full path to an arbitrary file is
19  *                      $PGDATA/pg_tblspc/spcoid/dboid/relfilenode
20  *
21  * There are two tablespaces created at initdb time: pg_global (for shared
22  * tables) and pg_default (for everything else).  For backwards compatibility
23  * and to remain functional on platforms without symlinks, these tablespaces
24  * are accessed specially: they are respectively
25  *                      $PGDATA/global/relfilenode
26  *                      $PGDATA/base/dboid/relfilenode
27  *
28  * To allow CREATE DATABASE to give a new database a default tablespace
29  * that's different from the template database's default, we make the
30  * provision that a zero in pg_class.reltablespace means the database's
31  * default tablespace.  Without this, CREATE DATABASE would have to go in
32  * and munge the system catalogs of the new database.
33  *
34  *
35  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
36  * Portions Copyright (c) 1994, Regents of the University of California
37  *
38  *
39  * IDENTIFICATION
40  *        $PostgreSQL: pgsql/src/backend/commands/tablespace.c,v 1.69 2010/01/07 04:05:39 momjian Exp $
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45
46 #include <unistd.h>
47 #include <dirent.h>
48 #include <sys/types.h>
49 #include <sys/stat.h>
50
51 #include "access/heapam.h"
52 #include "access/reloptions.h"
53 #include "access/sysattr.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "catalog/catalog.h"
57 #include "catalog/dependency.h"
58 #include "catalog/indexing.h"
59 #include "catalog/pg_tablespace.h"
60 #include "commands/comment.h"
61 #include "commands/defrem.h"
62 #include "commands/tablespace.h"
63 #include "miscadmin.h"
64 #include "postmaster/bgwriter.h"
65 #include "storage/fd.h"
66 #include "storage/procarray.h"
67 #include "storage/standby.h"
68 #include "utils/acl.h"
69 #include "utils/builtins.h"
70 #include "utils/fmgroids.h"
71 #include "utils/guc.h"
72 #include "utils/lsyscache.h"
73 #include "utils/memutils.h"
74 #include "utils/rel.h"
75 #include "utils/syscache.h"
76 #include "utils/tqual.h"
77
78
79 /* GUC variables */
80 char       *default_tablespace = NULL;
81 char       *temp_tablespaces = NULL;
82
83
84 static bool remove_tablespace_directories(Oid tablespaceoid, bool redo);
85 static void write_version_file(const char *path);
86
87
88 /*
89  * Each database using a table space is isolated into its own name space
90  * by a subdirectory named for the database OID.  On first creation of an
91  * object in the tablespace, create the subdirectory.  If the subdirectory
92  * already exists, fall through quietly.
93  *
94  * isRedo indicates that we are creating an object during WAL replay.
95  * In this case we will cope with the possibility of the tablespace
96  * directory not being there either --- this could happen if we are
97  * replaying an operation on a table in a subsequently-dropped tablespace.
98  * We handle this by making a directory in the place where the tablespace
99  * symlink would normally be.  This isn't an exact replay of course, but
100  * it's the best we can do given the available information.
101  *
102  * If tablespaces are not supported, you might think this could be a no-op,
103  * but you'd be wrong: we still need it in case we have to re-create a
104  * database subdirectory (of $PGDATA/base) during WAL replay.
105  */
106 void
107 TablespaceCreateDbspace(Oid spcNode, Oid dbNode, bool isRedo)
108 {
109         struct stat st;
110         char       *dir;
111
112         /*
113          * The global tablespace doesn't have per-database subdirectories, so
114          * nothing to do for it.
115          */
116         if (spcNode == GLOBALTABLESPACE_OID)
117                 return;
118
119         Assert(OidIsValid(spcNode));
120         Assert(OidIsValid(dbNode));
121
122         dir = GetDatabasePath(dbNode, spcNode);
123
124         if (stat(dir, &st) < 0)
125         {
126                 if (errno == ENOENT)
127                 {
128                         /*
129                          * Acquire TablespaceCreateLock to ensure that no DROP TABLESPACE
130                          * or TablespaceCreateDbspace is running concurrently.
131                          */
132                         LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
133
134                         /*
135                          * Recheck to see if someone created the directory while we were
136                          * waiting for lock.
137                          */
138                         if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode))
139                         {
140                                 /* Directory was created. */
141                         }
142                         else
143                         {
144                                 /* Directory creation failed? */
145                                 if (mkdir(dir, S_IRWXU) < 0)
146                                 {
147                                         char       *parentdir;
148
149                                         /* Failure other than not exists? */
150                                         if (errno != ENOENT || !isRedo)
151                                                 ereport(ERROR,
152                                                                 (errcode_for_file_access(),
153                                                           errmsg("could not create directory \"%s\": %m",
154                                                                          dir)));
155                                         /* Parent directory must be missing */
156                                         parentdir = pstrdup(dir);
157                                         get_parent_directory(parentdir);
158                                         /* Can't create parent either? */
159                                         if (mkdir(parentdir, S_IRWXU) < 0)
160                                                 ereport(ERROR,
161                                                                 (errcode_for_file_access(),
162                                                           errmsg("could not create directory \"%s\": %m",
163                                                                          parentdir)));
164                                         pfree(parentdir);
165                                         /* Create database directory */
166                                         if (mkdir(dir, S_IRWXU) < 0)
167                                                 ereport(ERROR,
168                                                                 (errcode_for_file_access(),
169                                                           errmsg("could not create directory \"%s\": %m",
170                                                                          dir)));
171                                 }
172                         }
173
174                         LWLockRelease(TablespaceCreateLock);
175                 }
176                 else
177                 {
178                         ereport(ERROR,
179                                         (errcode_for_file_access(),
180                                          errmsg("could not stat directory \"%s\": %m", dir)));
181                 }
182         }
183         else
184         {
185                 /* Is it not a directory? */
186                 if (!S_ISDIR(st.st_mode))
187                         ereport(ERROR,
188                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
189                                          errmsg("\"%s\" exists but is not a directory",
190                                                         dir)));
191         }
192
193         pfree(dir);
194 }
195
196 /*
197  * Create a table space
198  *
199  * Only superusers can create a tablespace. This seems a reasonable restriction
200  * since we're determining the system layout and, anyway, we probably have
201  * root if we're doing this kind of activity
202  */
203 void
204 CreateTableSpace(CreateTableSpaceStmt *stmt)
205 {
206 #ifdef HAVE_SYMLINK
207         Relation        rel;
208         Datum           values[Natts_pg_tablespace];
209         bool            nulls[Natts_pg_tablespace];
210         HeapTuple       tuple;
211         Oid                     tablespaceoid;
212         char       *location;
213         char       *linkloc;
214         Oid                     ownerId;
215
216         /* Must be super user */
217         if (!superuser())
218                 ereport(ERROR,
219                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
220                                  errmsg("permission denied to create tablespace \"%s\"",
221                                                 stmt->tablespacename),
222                                  errhint("Must be superuser to create a tablespace.")));
223
224         /* However, the eventual owner of the tablespace need not be */
225         if (stmt->owner)
226                 ownerId = get_roleid_checked(stmt->owner);
227         else
228                 ownerId = GetUserId();
229
230         /* Unix-ify the offered path, and strip any trailing slashes */
231         location = pstrdup(stmt->location);
232         canonicalize_path(location);
233
234         /* disallow quotes, else CREATE DATABASE would be at risk */
235         if (strchr(location, '\''))
236                 ereport(ERROR,
237                                 (errcode(ERRCODE_INVALID_NAME),
238                                  errmsg("tablespace location cannot contain single quotes")));
239
240         /*
241          * Allowing relative paths seems risky
242          *
243          * this also helps us ensure that location is not empty or whitespace
244          */
245         if (!is_absolute_path(location))
246                 ereport(ERROR,
247                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
248                                  errmsg("tablespace location must be an absolute path")));
249
250         /*
251          * Check that location isn't too long. Remember that we're going to append
252          * '/<dboid>/<relid>.<nnn>'  (XXX but do we ever form the whole path
253          * explicitly?  This may be overly conservative.)
254          */
255         if (strlen(location) >= (MAXPGPATH - 1 - OIDCHARS - 1 - OIDCHARS - 1 - OIDCHARS))
256                 ereport(ERROR,
257                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
258                                  errmsg("tablespace location \"%s\" is too long",
259                                                 location)));
260
261         /*
262          * Disallow creation of tablespaces named "pg_xxx"; we reserve this
263          * namespace for system purposes.
264          */
265         if (!allowSystemTableMods && IsReservedName(stmt->tablespacename))
266                 ereport(ERROR,
267                                 (errcode(ERRCODE_RESERVED_NAME),
268                                  errmsg("unacceptable tablespace name \"%s\"",
269                                                 stmt->tablespacename),
270                 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
271
272         /*
273          * Check that there is no other tablespace by this name.  (The unique
274          * index would catch this anyway, but might as well give a friendlier
275          * message.)
276          */
277         if (OidIsValid(get_tablespace_oid(stmt->tablespacename)))
278                 ereport(ERROR,
279                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
280                                  errmsg("tablespace \"%s\" already exists",
281                                                 stmt->tablespacename)));
282
283         /*
284          * Insert tuple into pg_tablespace.  The purpose of doing this first is to
285          * lock the proposed tablename against other would-be creators. The
286          * insertion will roll back if we find problems below.
287          */
288         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
289
290         MemSet(nulls, false, sizeof(nulls));
291
292         values[Anum_pg_tablespace_spcname - 1] =
293                 DirectFunctionCall1(namein, CStringGetDatum(stmt->tablespacename));
294         values[Anum_pg_tablespace_spcowner - 1] =
295                 ObjectIdGetDatum(ownerId);
296         values[Anum_pg_tablespace_spclocation - 1] =
297                 CStringGetTextDatum(location);
298         nulls[Anum_pg_tablespace_spcacl - 1] = true;
299         nulls[Anum_pg_tablespace_spcoptions - 1] = true;
300
301         tuple = heap_form_tuple(rel->rd_att, values, nulls);
302
303         tablespaceoid = simple_heap_insert(rel, tuple);
304
305         CatalogUpdateIndexes(rel, tuple);
306
307         heap_freetuple(tuple);
308
309         /* Record dependency on owner */
310         recordDependencyOnOwner(TableSpaceRelationId, tablespaceoid, ownerId);
311
312         /*
313          * Attempt to coerce target directory to safe permissions.      If this fails,
314          * it doesn't exist or has the wrong owner.
315          */
316         if (chmod(location, 0700) != 0)
317                 ereport(ERROR,
318                                 (errcode_for_file_access(),
319                                  errmsg("could not set permissions on directory \"%s\": %m",
320                                                 location)));
321
322         /*
323          * Check the target directory is empty.
324          */
325         if (!directory_is_empty(location))
326                 ereport(ERROR,
327                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
328                                  errmsg("directory \"%s\" is not empty",
329                                                 location)));
330
331         /*
332          * Create the PG_VERSION file in the target directory.  This has several
333          * purposes: to make sure we can write in the directory, to prevent
334          * someone from creating another tablespace pointing at the same directory
335          * (the emptiness check above will fail), and to label tablespace
336          * directories by PG version.
337          */
338         write_version_file(location);
339
340         /*
341          * All seems well, create the symlink
342          */
343         linkloc = (char *) palloc(OIDCHARS + OIDCHARS + 1);
344         sprintf(linkloc, "pg_tblspc/%u", tablespaceoid);
345
346         if (symlink(location, linkloc) < 0)
347                 ereport(ERROR,
348                                 (errcode_for_file_access(),
349                                  errmsg("could not create symbolic link \"%s\": %m",
350                                                 linkloc)));
351
352         /* Record the filesystem change in XLOG */
353         {
354                 xl_tblspc_create_rec xlrec;
355                 XLogRecData rdata[2];
356
357                 xlrec.ts_id = tablespaceoid;
358                 rdata[0].data = (char *) &xlrec;
359                 rdata[0].len = offsetof(xl_tblspc_create_rec, ts_path);
360                 rdata[0].buffer = InvalidBuffer;
361                 rdata[0].next = &(rdata[1]);
362
363                 rdata[1].data = (char *) location;
364                 rdata[1].len = strlen(location) + 1;
365                 rdata[1].buffer = InvalidBuffer;
366                 rdata[1].next = NULL;
367
368                 (void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_CREATE, rdata);
369         }
370
371         /*
372          * Force synchronous commit, to minimize the window between creating the
373          * symlink on-disk and marking the transaction committed.  It's not great
374          * that there is any window at all, but definitely we don't want to make
375          * it larger than necessary.
376          */
377         ForceSyncCommit();
378
379         pfree(linkloc);
380         pfree(location);
381
382         /* We keep the lock on pg_tablespace until commit */
383         heap_close(rel, NoLock);
384 #else                                                   /* !HAVE_SYMLINK */
385         ereport(ERROR,
386                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
387                          errmsg("tablespaces are not supported on this platform")));
388 #endif   /* HAVE_SYMLINK */
389 }
390
391 /*
392  * Drop a table space
393  *
394  * Be careful to check that the tablespace is empty.
395  */
396 void
397 DropTableSpace(DropTableSpaceStmt *stmt)
398 {
399 #ifdef HAVE_SYMLINK
400         char       *tablespacename = stmt->tablespacename;
401         HeapScanDesc scandesc;
402         Relation        rel;
403         HeapTuple       tuple;
404         ScanKeyData entry[1];
405         Oid                     tablespaceoid;
406
407         /*
408          * Find the target tuple
409          */
410         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
411
412         ScanKeyInit(&entry[0],
413                                 Anum_pg_tablespace_spcname,
414                                 BTEqualStrategyNumber, F_NAMEEQ,
415                                 CStringGetDatum(tablespacename));
416         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
417         tuple = heap_getnext(scandesc, ForwardScanDirection);
418
419         if (!HeapTupleIsValid(tuple))
420         {
421                 if (!stmt->missing_ok)
422                 {
423                         ereport(ERROR,
424                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
425                                          errmsg("tablespace \"%s\" does not exist",
426                                                         tablespacename)));
427                 }
428                 else
429                 {
430                         ereport(NOTICE,
431                                         (errmsg("tablespace \"%s\" does not exist, skipping",
432                                                         tablespacename)));
433                         /* XXX I assume I need one or both of these next two calls */
434                         heap_endscan(scandesc);
435                         heap_close(rel, NoLock);
436                 }
437                 return;
438         }
439
440         tablespaceoid = HeapTupleGetOid(tuple);
441
442         /* Must be tablespace owner */
443         if (!pg_tablespace_ownercheck(tablespaceoid, GetUserId()))
444                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
445                                            tablespacename);
446
447         /* Disallow drop of the standard tablespaces, even by superuser */
448         if (tablespaceoid == GLOBALTABLESPACE_OID ||
449                 tablespaceoid == DEFAULTTABLESPACE_OID)
450                 aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE,
451                                            tablespacename);
452
453         /*
454          * Remove the pg_tablespace tuple (this will roll back if we fail below)
455          */
456         simple_heap_delete(rel, &tuple->t_self);
457
458         heap_endscan(scandesc);
459
460         /*
461          * Remove any comments on this tablespace.
462          */
463         DeleteSharedComments(tablespaceoid, TableSpaceRelationId);
464
465         /*
466          * Remove dependency on owner.
467          */
468         deleteSharedDependencyRecordsFor(TableSpaceRelationId, tablespaceoid, 0);
469
470         /*
471          * Acquire TablespaceCreateLock to ensure that no TablespaceCreateDbspace
472          * is running concurrently.
473          */
474         LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
475
476         /*
477          * Try to remove the physical infrastructure.
478          */
479         if (!remove_tablespace_directories(tablespaceoid, false))
480         {
481                 /*
482                  * Not all files deleted?  However, there can be lingering empty files
483                  * in the directories, left behind by for example DROP TABLE, that
484                  * have been scheduled for deletion at next checkpoint (see comments
485                  * in mdunlink() for details).  We could just delete them immediately,
486                  * but we can't tell them apart from important data files that we
487                  * mustn't delete.  So instead, we force a checkpoint which will clean
488                  * out any lingering files, and try again.
489                  */
490                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
491                 if (!remove_tablespace_directories(tablespaceoid, false))
492                 {
493                         /* Still not empty, the files must be important then */
494                         ereport(ERROR,
495                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
496                                          errmsg("tablespace \"%s\" is not empty",
497                                                         tablespacename)));
498                 }
499         }
500
501         /* Record the filesystem change in XLOG */
502         {
503                 xl_tblspc_drop_rec xlrec;
504                 XLogRecData rdata[1];
505
506                 xlrec.ts_id = tablespaceoid;
507                 rdata[0].data = (char *) &xlrec;
508                 rdata[0].len = sizeof(xl_tblspc_drop_rec);
509                 rdata[0].buffer = InvalidBuffer;
510                 rdata[0].next = NULL;
511
512                 (void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP, rdata);
513         }
514
515         /*
516          * Note: because we checked that the tablespace was empty, there should be
517          * no need to worry about flushing shared buffers or free space map
518          * entries for relations in the tablespace.
519          */
520
521         /*
522          * Force synchronous commit, to minimize the window between removing the
523          * files on-disk and marking the transaction committed.  It's not great
524          * that there is any window at all, but definitely we don't want to make
525          * it larger than necessary.
526          */
527         ForceSyncCommit();
528
529         /*
530          * Allow TablespaceCreateDbspace again.
531          */
532         LWLockRelease(TablespaceCreateLock);
533
534         /* We keep the lock on pg_tablespace until commit */
535         heap_close(rel, NoLock);
536 #else                                                   /* !HAVE_SYMLINK */
537         ereport(ERROR,
538                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
539                          errmsg("tablespaces are not supported on this platform")));
540 #endif   /* HAVE_SYMLINK */
541 }
542
543 /*
544  * remove_tablespace_directories: attempt to remove filesystem infrastructure
545  *
546  * Returns TRUE if successful, FALSE if some subdirectory is not empty
547  *
548  * redo indicates we are redoing a drop from XLOG; okay if nothing there
549  */
550 static bool
551 remove_tablespace_directories(Oid tablespaceoid, bool redo)
552 {
553         char       *location;
554         DIR                *dirdesc;
555         struct dirent *de;
556         char       *subfile;
557         struct stat st;
558
559         location = (char *) palloc(OIDCHARS + OIDCHARS + 1);
560         sprintf(location, "pg_tblspc/%u", tablespaceoid);
561
562         /*
563          * Check if the tablespace still contains any files.  We try to rmdir each
564          * per-database directory we find in it.  rmdir failure implies there are
565          * still files in that subdirectory, so give up.  (We do not have to worry
566          * about undoing any already completed rmdirs, since the next attempt to
567          * use the tablespace from that database will simply recreate the
568          * subdirectory via TablespaceCreateDbspace.)
569          *
570          * Since we hold TablespaceCreateLock, no one else should be creating any
571          * fresh subdirectories in parallel. It is possible that new files are
572          * being created within subdirectories, though, so the rmdir call could
573          * fail.  Worst consequence is a less friendly error message.
574          *
575          * If redo is true then ENOENT is a likely outcome here, and we allow it
576          * to pass without comment.  In normal operation we still allow it, but
577          * with a warning.      This is because even though ProcessUtility disallows
578          * DROP TABLESPACE in a transaction block, it's possible that a previous
579          * DROP failed and rolled back after removing the tablespace directories
580          * and symlink.  We want to allow a new DROP attempt to succeed at
581          * removing the catalog entries, so we should not give a hard error here.
582          */
583         dirdesc = AllocateDir(location);
584         if (dirdesc == NULL)
585         {
586                 if (errno == ENOENT)
587                 {
588                         if (!redo)
589                                 ereport(WARNING,
590                                                 (errcode_for_file_access(),
591                                                  errmsg("could not open directory \"%s\": %m",
592                                                                 location)));
593                         pfree(location);
594                         return true;
595                 }
596                 /* else let ReadDir report the error */
597         }
598
599         while ((de = ReadDir(dirdesc, location)) != NULL)
600         {
601                 /* Note we ignore PG_VERSION for the nonce */
602                 if (strcmp(de->d_name, ".") == 0 ||
603                         strcmp(de->d_name, "..") == 0 ||
604                         strcmp(de->d_name, "PG_VERSION") == 0)
605                         continue;
606
607                 subfile = palloc(strlen(location) + 1 + strlen(de->d_name) + 1);
608                 sprintf(subfile, "%s/%s", location, de->d_name);
609
610                 /* This check is just to deliver a friendlier error message */
611                 if (!directory_is_empty(subfile))
612                 {
613                         FreeDir(dirdesc);
614                         return false;
615                 }
616
617                 /* Do the real deed */
618                 if (rmdir(subfile) < 0)
619                         ereport(ERROR,
620                                         (errcode_for_file_access(),
621                                          errmsg("could not remove directory \"%s\": %m",
622                                                         subfile)));
623
624                 pfree(subfile);
625         }
626
627         FreeDir(dirdesc);
628
629         /*
630          * Okay, try to unlink PG_VERSION (we allow it to not be there, even in
631          * non-REDO case, for robustness).
632          */
633         subfile = palloc(strlen(location) + 11 + 1);
634         sprintf(subfile, "%s/PG_VERSION", location);
635
636         if (unlink(subfile) < 0)
637         {
638                 if (errno != ENOENT)
639                         ereport(ERROR,
640                                         (errcode_for_file_access(),
641                                          errmsg("could not remove file \"%s\": %m",
642                                                         subfile)));
643         }
644
645         pfree(subfile);
646
647         /*
648          * Okay, try to remove the symlink.  We must however deal with the
649          * possibility that it's a directory instead of a symlink --- this could
650          * happen during WAL replay (see TablespaceCreateDbspace), and it is also
651          * the normal case on Windows.
652          */
653         if (lstat(location, &st) == 0 && S_ISDIR(st.st_mode))
654         {
655                 if (rmdir(location) < 0)
656                         ereport(ERROR,
657                                         (errcode_for_file_access(),
658                                          errmsg("could not remove directory \"%s\": %m",
659                                                         location)));
660         }
661         else
662         {
663                 if (unlink(location) < 0)
664                         ereport(ERROR,
665                                         (errcode_for_file_access(),
666                                          errmsg("could not remove symbolic link \"%s\": %m",
667                                                         location)));
668         }
669
670         pfree(location);
671
672         return true;
673 }
674
675 /*
676  * write out the PG_VERSION file in the specified directory
677  */
678 static void
679 write_version_file(const char *path)
680 {
681         char       *fullname;
682         FILE       *version_file;
683
684         /* Now write the file */
685         fullname = palloc(strlen(path) + 11 + 1);
686         sprintf(fullname, "%s/PG_VERSION", path);
687
688         if ((version_file = AllocateFile(fullname, PG_BINARY_W)) == NULL)
689                 ereport(ERROR,
690                                 (errcode_for_file_access(),
691                                  errmsg("could not write to file \"%s\": %m",
692                                                 fullname)));
693         fprintf(version_file, "%s\n", PG_MAJORVERSION);
694         if (FreeFile(version_file))
695                 ereport(ERROR,
696                                 (errcode_for_file_access(),
697                                  errmsg("could not write to file \"%s\": %m",
698                                                 fullname)));
699
700         pfree(fullname);
701 }
702
703 /*
704  * Check if a directory is empty.
705  *
706  * This probably belongs somewhere else, but not sure where...
707  */
708 bool
709 directory_is_empty(const char *path)
710 {
711         DIR                *dirdesc;
712         struct dirent *de;
713
714         dirdesc = AllocateDir(path);
715
716         while ((de = ReadDir(dirdesc, path)) != NULL)
717         {
718                 if (strcmp(de->d_name, ".") == 0 ||
719                         strcmp(de->d_name, "..") == 0)
720                         continue;
721                 FreeDir(dirdesc);
722                 return false;
723         }
724
725         FreeDir(dirdesc);
726         return true;
727 }
728
729 /*
730  * Rename a tablespace
731  */
732 void
733 RenameTableSpace(const char *oldname, const char *newname)
734 {
735         Relation        rel;
736         ScanKeyData entry[1];
737         HeapScanDesc scan;
738         HeapTuple       tup;
739         HeapTuple       newtuple;
740         Form_pg_tablespace newform;
741
742         /* Search pg_tablespace */
743         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
744
745         ScanKeyInit(&entry[0],
746                                 Anum_pg_tablespace_spcname,
747                                 BTEqualStrategyNumber, F_NAMEEQ,
748                                 CStringGetDatum(oldname));
749         scan = heap_beginscan(rel, SnapshotNow, 1, entry);
750         tup = heap_getnext(scan, ForwardScanDirection);
751         if (!HeapTupleIsValid(tup))
752                 ereport(ERROR,
753                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
754                                  errmsg("tablespace \"%s\" does not exist",
755                                                 oldname)));
756
757         newtuple = heap_copytuple(tup);
758         newform = (Form_pg_tablespace) GETSTRUCT(newtuple);
759
760         heap_endscan(scan);
761
762         /* Must be owner */
763         if (!pg_tablespace_ownercheck(HeapTupleGetOid(newtuple), GetUserId()))
764                 aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE, oldname);
765
766         /* Validate new name */
767         if (!allowSystemTableMods && IsReservedName(newname))
768                 ereport(ERROR,
769                                 (errcode(ERRCODE_RESERVED_NAME),
770                                  errmsg("unacceptable tablespace name \"%s\"", newname),
771                 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
772
773         /* Make sure the new name doesn't exist */
774         ScanKeyInit(&entry[0],
775                                 Anum_pg_tablespace_spcname,
776                                 BTEqualStrategyNumber, F_NAMEEQ,
777                                 CStringGetDatum(newname));
778         scan = heap_beginscan(rel, SnapshotNow, 1, entry);
779         tup = heap_getnext(scan, ForwardScanDirection);
780         if (HeapTupleIsValid(tup))
781                 ereport(ERROR,
782                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
783                                  errmsg("tablespace \"%s\" already exists",
784                                                 newname)));
785
786         heap_endscan(scan);
787
788         /* OK, update the entry */
789         namestrcpy(&(newform->spcname), newname);
790
791         simple_heap_update(rel, &newtuple->t_self, newtuple);
792         CatalogUpdateIndexes(rel, newtuple);
793
794         heap_close(rel, NoLock);
795 }
796
797 /*
798  * Change tablespace owner
799  */
800 void
801 AlterTableSpaceOwner(const char *name, Oid newOwnerId)
802 {
803         Relation        rel;
804         ScanKeyData entry[1];
805         HeapScanDesc scandesc;
806         Form_pg_tablespace spcForm;
807         HeapTuple       tup;
808
809         /* Search pg_tablespace */
810         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
811
812         ScanKeyInit(&entry[0],
813                                 Anum_pg_tablespace_spcname,
814                                 BTEqualStrategyNumber, F_NAMEEQ,
815                                 CStringGetDatum(name));
816         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
817         tup = heap_getnext(scandesc, ForwardScanDirection);
818         if (!HeapTupleIsValid(tup))
819                 ereport(ERROR,
820                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
821                                  errmsg("tablespace \"%s\" does not exist", name)));
822
823         spcForm = (Form_pg_tablespace) GETSTRUCT(tup);
824
825         /*
826          * If the new owner is the same as the existing owner, consider the
827          * command to have succeeded.  This is for dump restoration purposes.
828          */
829         if (spcForm->spcowner != newOwnerId)
830         {
831                 Datum           repl_val[Natts_pg_tablespace];
832                 bool            repl_null[Natts_pg_tablespace];
833                 bool            repl_repl[Natts_pg_tablespace];
834                 Acl                *newAcl;
835                 Datum           aclDatum;
836                 bool            isNull;
837                 HeapTuple       newtuple;
838
839                 /* Otherwise, must be owner of the existing object */
840                 if (!pg_tablespace_ownercheck(HeapTupleGetOid(tup), GetUserId()))
841                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
842                                                    name);
843
844                 /* Must be able to become new owner */
845                 check_is_member_of_role(GetUserId(), newOwnerId);
846
847                 /*
848                  * Normally we would also check for create permissions here, but there
849                  * are none for tablespaces so we follow what rename tablespace does
850                  * and omit the create permissions check.
851                  *
852                  * NOTE: Only superusers may create tablespaces to begin with and so
853                  * initially only a superuser would be able to change its ownership
854                  * anyway.
855                  */
856
857                 memset(repl_null, false, sizeof(repl_null));
858                 memset(repl_repl, false, sizeof(repl_repl));
859
860                 repl_repl[Anum_pg_tablespace_spcowner - 1] = true;
861                 repl_val[Anum_pg_tablespace_spcowner - 1] = ObjectIdGetDatum(newOwnerId);
862
863                 /*
864                  * Determine the modified ACL for the new owner.  This is only
865                  * necessary when the ACL is non-null.
866                  */
867                 aclDatum = heap_getattr(tup,
868                                                                 Anum_pg_tablespace_spcacl,
869                                                                 RelationGetDescr(rel),
870                                                                 &isNull);
871                 if (!isNull)
872                 {
873                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
874                                                                  spcForm->spcowner, newOwnerId);
875                         repl_repl[Anum_pg_tablespace_spcacl - 1] = true;
876                         repl_val[Anum_pg_tablespace_spcacl - 1] = PointerGetDatum(newAcl);
877                 }
878
879                 newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
880
881                 simple_heap_update(rel, &newtuple->t_self, newtuple);
882                 CatalogUpdateIndexes(rel, newtuple);
883
884                 heap_freetuple(newtuple);
885
886                 /* Update owner dependency reference */
887                 changeDependencyOnOwner(TableSpaceRelationId, HeapTupleGetOid(tup),
888                                                                 newOwnerId);
889         }
890
891         heap_endscan(scandesc);
892         heap_close(rel, NoLock);
893 }
894
895
896 /*
897  * Alter table space options
898  */
899 void
900 AlterTableSpaceOptions(AlterTableSpaceOptionsStmt *stmt)
901 {
902         Relation        rel;
903         ScanKeyData entry[1];
904         HeapScanDesc scandesc;
905         HeapTuple       tup;
906         Datum           datum;
907         Datum           newOptions;
908         Datum           repl_val[Natts_pg_tablespace];
909         bool            isnull;
910         bool            repl_null[Natts_pg_tablespace];
911         bool            repl_repl[Natts_pg_tablespace];
912         HeapTuple       newtuple;
913
914         /* Search pg_tablespace */
915         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
916
917         ScanKeyInit(&entry[0],
918                                 Anum_pg_tablespace_spcname,
919                                 BTEqualStrategyNumber, F_NAMEEQ,
920                                 CStringGetDatum(stmt->tablespacename));
921         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
922         tup = heap_getnext(scandesc, ForwardScanDirection);
923         if (!HeapTupleIsValid(tup))
924                 ereport(ERROR,
925                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
926                                  errmsg("tablespace \"%s\" does not exist",
927                                         stmt->tablespacename)));
928
929         /* Must be owner of the existing object */
930         if (!pg_tablespace_ownercheck(HeapTupleGetOid(tup), GetUserId()))
931                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
932                                            stmt->tablespacename);
933
934         /* Generate new proposed spcoptions (text array) */
935         datum = heap_getattr(tup, Anum_pg_tablespace_spcoptions,
936                                                  RelationGetDescr(rel), &isnull);
937         newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
938                                                                          stmt->options, NULL, NULL, false,
939                                                                          stmt->isReset);
940         (void) tablespace_reloptions(newOptions, true);
941
942         /* Build new tuple. */
943         memset(repl_null, false, sizeof(repl_null));
944         memset(repl_repl, false, sizeof(repl_repl));
945         if (newOptions != (Datum) 0)
946                 repl_val[Anum_pg_tablespace_spcoptions - 1] = newOptions;
947         else
948                 repl_null[Anum_pg_tablespace_spcoptions - 1] = true;
949         repl_repl[Anum_pg_tablespace_spcoptions - 1] = true;
950         newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val,
951                                                                  repl_null, repl_repl);
952
953         /* Update system catalog. */
954         simple_heap_update(rel, &newtuple->t_self, newtuple);
955         CatalogUpdateIndexes(rel, newtuple);
956         heap_freetuple(newtuple);
957
958         /* Conclude heap scan. */
959         heap_endscan(scandesc);
960         heap_close(rel, NoLock);
961 }
962
963 /*
964  * Routines for handling the GUC variable 'default_tablespace'.
965  */
966
967 /* assign_hook: validate new default_tablespace, do extra actions as needed */
968 const char *
969 assign_default_tablespace(const char *newval, bool doit, GucSource source)
970 {
971         /*
972          * If we aren't inside a transaction, we cannot do database access so
973          * cannot verify the name.      Must accept the value on faith.
974          */
975         if (IsTransactionState())
976         {
977                 if (newval[0] != '\0' &&
978                         !OidIsValid(get_tablespace_oid(newval)))
979                 {
980                         ereport(GUC_complaint_elevel(source),
981                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
982                                          errmsg("tablespace \"%s\" does not exist",
983                                                         newval)));
984                         return NULL;
985                 }
986         }
987
988         return newval;
989 }
990
991 /*
992  * GetDefaultTablespace -- get the OID of the current default tablespace
993  *
994  * Regular objects and temporary objects have different default tablespaces,
995  * hence the forTemp parameter must be specified.
996  *
997  * May return InvalidOid to indicate "use the database's default tablespace".
998  *
999  * Note that caller is expected to check appropriate permissions for any
1000  * result other than InvalidOid.
1001  *
1002  * This exists to hide (and possibly optimize the use of) the
1003  * default_tablespace GUC variable.
1004  */
1005 Oid
1006 GetDefaultTablespace(bool forTemp)
1007 {
1008         Oid                     result;
1009
1010         /* The temp-table case is handled elsewhere */
1011         if (forTemp)
1012         {
1013                 PrepareTempTablespaces();
1014                 return GetNextTempTableSpace();
1015         }
1016
1017         /* Fast path for default_tablespace == "" */
1018         if (default_tablespace == NULL || default_tablespace[0] == '\0')
1019                 return InvalidOid;
1020
1021         /*
1022          * It is tempting to cache this lookup for more speed, but then we would
1023          * fail to detect the case where the tablespace was dropped since the GUC
1024          * variable was set.  Note also that we don't complain if the value fails
1025          * to refer to an existing tablespace; we just silently return InvalidOid,
1026          * causing the new object to be created in the database's tablespace.
1027          */
1028         result = get_tablespace_oid(default_tablespace);
1029
1030         /*
1031          * Allow explicit specification of database's default tablespace in
1032          * default_tablespace without triggering permissions checks.
1033          */
1034         if (result == MyDatabaseTableSpace)
1035                 result = InvalidOid;
1036         return result;
1037 }
1038
1039
1040 /*
1041  * Routines for handling the GUC variable 'temp_tablespaces'.
1042  */
1043
1044 /* assign_hook: validate new temp_tablespaces, do extra actions as needed */
1045 const char *
1046 assign_temp_tablespaces(const char *newval, bool doit, GucSource source)
1047 {
1048         char       *rawname;
1049         List       *namelist;
1050
1051         /* Need a modifiable copy of string */
1052         rawname = pstrdup(newval);
1053
1054         /* Parse string into list of identifiers */
1055         if (!SplitIdentifierString(rawname, ',', &namelist))
1056         {
1057                 /* syntax error in name list */
1058                 pfree(rawname);
1059                 list_free(namelist);
1060                 return NULL;
1061         }
1062
1063         /*
1064          * If we aren't inside a transaction, we cannot do database access so
1065          * cannot verify the individual names.  Must accept the list on faith.
1066          * Fortunately, there's then also no need to pass the data to fd.c.
1067          */
1068         if (IsTransactionState())
1069         {
1070                 /*
1071                  * If we error out below, or if we are called multiple times in one
1072                  * transaction, we'll leak a bit of TopTransactionContext memory.
1073                  * Doesn't seem worth worrying about.
1074                  */
1075                 Oid                *tblSpcs;
1076                 int                     numSpcs;
1077                 ListCell   *l;
1078
1079                 tblSpcs = (Oid *) MemoryContextAlloc(TopTransactionContext,
1080                                                                                 list_length(namelist) * sizeof(Oid));
1081                 numSpcs = 0;
1082                 foreach(l, namelist)
1083                 {
1084                         char       *curname = (char *) lfirst(l);
1085                         Oid                     curoid;
1086                         AclResult       aclresult;
1087
1088                         /* Allow an empty string (signifying database default) */
1089                         if (curname[0] == '\0')
1090                         {
1091                                 tblSpcs[numSpcs++] = InvalidOid;
1092                                 continue;
1093                         }
1094
1095                         /* Else verify that name is a valid tablespace name */
1096                         curoid = get_tablespace_oid(curname);
1097                         if (curoid == InvalidOid)
1098                         {
1099                                 /*
1100                                  * In an interactive SET command, we ereport for bad info.
1101                                  * Otherwise, silently ignore any bad list elements.
1102                                  */
1103                                 if (source >= PGC_S_INTERACTIVE)
1104                                         ereport(ERROR,
1105                                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
1106                                                          errmsg("tablespace \"%s\" does not exist",
1107                                                                         curname)));
1108                                 continue;
1109                         }
1110
1111                         /*
1112                          * Allow explicit specification of database's default tablespace
1113                          * in temp_tablespaces without triggering permissions checks.
1114                          */
1115                         if (curoid == MyDatabaseTableSpace)
1116                         {
1117                                 tblSpcs[numSpcs++] = InvalidOid;
1118                                 continue;
1119                         }
1120
1121                         /* Check permissions similarly */
1122                         aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1123                                                                                            ACL_CREATE);
1124                         if (aclresult != ACLCHECK_OK)
1125                         {
1126                                 if (source >= PGC_S_INTERACTIVE)
1127                                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE, curname);
1128                                 continue;
1129                         }
1130
1131                         tblSpcs[numSpcs++] = curoid;
1132                 }
1133
1134                 /* If actively "doing it", give the new list to fd.c */
1135                 if (doit)
1136                         SetTempTablespaces(tblSpcs, numSpcs);
1137                 else
1138                         pfree(tblSpcs);
1139         }
1140
1141         pfree(rawname);
1142         list_free(namelist);
1143
1144         return newval;
1145 }
1146
1147 /*
1148  * PrepareTempTablespaces -- prepare to use temp tablespaces
1149  *
1150  * If we have not already done so in the current transaction, parse the
1151  * temp_tablespaces GUC variable and tell fd.c which tablespace(s) to use
1152  * for temp files.
1153  */
1154 void
1155 PrepareTempTablespaces(void)
1156 {
1157         char       *rawname;
1158         List       *namelist;
1159         Oid                *tblSpcs;
1160         int                     numSpcs;
1161         ListCell   *l;
1162
1163         /* No work if already done in current transaction */
1164         if (TempTablespacesAreSet())
1165                 return;
1166
1167         /*
1168          * Can't do catalog access unless within a transaction.  This is just a
1169          * safety check in case this function is called by low-level code that
1170          * could conceivably execute outside a transaction.  Note that in such a
1171          * scenario, fd.c will fall back to using the current database's default
1172          * tablespace, which should always be OK.
1173          */
1174         if (!IsTransactionState())
1175                 return;
1176
1177         /* Need a modifiable copy of string */
1178         rawname = pstrdup(temp_tablespaces);
1179
1180         /* Parse string into list of identifiers */
1181         if (!SplitIdentifierString(rawname, ',', &namelist))
1182         {
1183                 /* syntax error in name list */
1184                 SetTempTablespaces(NULL, 0);
1185                 pfree(rawname);
1186                 list_free(namelist);
1187                 return;
1188         }
1189
1190         /* Store tablespace OIDs in an array in TopTransactionContext */
1191         tblSpcs = (Oid *) MemoryContextAlloc(TopTransactionContext,
1192                                                                                  list_length(namelist) * sizeof(Oid));
1193         numSpcs = 0;
1194         foreach(l, namelist)
1195         {
1196                 char       *curname = (char *) lfirst(l);
1197                 Oid                     curoid;
1198                 AclResult       aclresult;
1199
1200                 /* Allow an empty string (signifying database default) */
1201                 if (curname[0] == '\0')
1202                 {
1203                         tblSpcs[numSpcs++] = InvalidOid;
1204                         continue;
1205                 }
1206
1207                 /* Else verify that name is a valid tablespace name */
1208                 curoid = get_tablespace_oid(curname);
1209                 if (curoid == InvalidOid)
1210                 {
1211                         /* Silently ignore any bad list elements */
1212                         continue;
1213                 }
1214
1215                 /*
1216                  * Allow explicit specification of database's default tablespace in
1217                  * temp_tablespaces without triggering permissions checks.
1218                  */
1219                 if (curoid == MyDatabaseTableSpace)
1220                 {
1221                         tblSpcs[numSpcs++] = InvalidOid;
1222                         continue;
1223                 }
1224
1225                 /* Check permissions similarly */
1226                 aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1227                                                                                    ACL_CREATE);
1228                 if (aclresult != ACLCHECK_OK)
1229                         continue;
1230
1231                 tblSpcs[numSpcs++] = curoid;
1232         }
1233
1234         SetTempTablespaces(tblSpcs, numSpcs);
1235
1236         pfree(rawname);
1237         list_free(namelist);
1238 }
1239
1240
1241 /*
1242  * get_tablespace_oid - given a tablespace name, look up the OID
1243  *
1244  * Returns InvalidOid if tablespace name not found.
1245  */
1246 Oid
1247 get_tablespace_oid(const char *tablespacename)
1248 {
1249         Oid                     result;
1250         Relation        rel;
1251         HeapScanDesc scandesc;
1252         HeapTuple       tuple;
1253         ScanKeyData entry[1];
1254
1255         /*
1256          * Search pg_tablespace.  We use a heapscan here even though there is an
1257          * index on name, on the theory that pg_tablespace will usually have just
1258          * a few entries and so an indexed lookup is a waste of effort.
1259          */
1260         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1261
1262         ScanKeyInit(&entry[0],
1263                                 Anum_pg_tablespace_spcname,
1264                                 BTEqualStrategyNumber, F_NAMEEQ,
1265                                 CStringGetDatum(tablespacename));
1266         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
1267         tuple = heap_getnext(scandesc, ForwardScanDirection);
1268
1269         /* We assume that there can be at most one matching tuple */
1270         if (HeapTupleIsValid(tuple))
1271                 result = HeapTupleGetOid(tuple);
1272         else
1273                 result = InvalidOid;
1274
1275         heap_endscan(scandesc);
1276         heap_close(rel, AccessShareLock);
1277
1278         return result;
1279 }
1280
1281 /*
1282  * get_tablespace_name - given a tablespace OID, look up the name
1283  *
1284  * Returns a palloc'd string, or NULL if no such tablespace.
1285  */
1286 char *
1287 get_tablespace_name(Oid spc_oid)
1288 {
1289         char       *result;
1290         Relation        rel;
1291         HeapScanDesc scandesc;
1292         HeapTuple       tuple;
1293         ScanKeyData entry[1];
1294
1295         /*
1296          * Search pg_tablespace.  We use a heapscan here even though there is an
1297          * index on oid, on the theory that pg_tablespace will usually have just a
1298          * few entries and so an indexed lookup is a waste of effort.
1299          */
1300         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1301
1302         ScanKeyInit(&entry[0],
1303                                 ObjectIdAttributeNumber,
1304                                 BTEqualStrategyNumber, F_OIDEQ,
1305                                 ObjectIdGetDatum(spc_oid));
1306         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
1307         tuple = heap_getnext(scandesc, ForwardScanDirection);
1308
1309         /* We assume that there can be at most one matching tuple */
1310         if (HeapTupleIsValid(tuple))
1311                 result = pstrdup(NameStr(((Form_pg_tablespace) GETSTRUCT(tuple))->spcname));
1312         else
1313                 result = NULL;
1314
1315         heap_endscan(scandesc);
1316         heap_close(rel, AccessShareLock);
1317
1318         return result;
1319 }
1320
1321
1322 /*
1323  * TABLESPACE resource manager's routines
1324  */
1325 void
1326 tblspc_redo(XLogRecPtr lsn, XLogRecord *record)
1327 {
1328         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1329
1330         /* Backup blocks are not used in tblspc records */
1331         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
1332
1333         if (info == XLOG_TBLSPC_CREATE)
1334         {
1335                 xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) XLogRecGetData(record);
1336                 char       *location = xlrec->ts_path;
1337                 char       *linkloc;
1338
1339                 /*
1340                  * Attempt to coerce target directory to safe permissions.      If this
1341                  * fails, it doesn't exist or has the wrong owner.
1342                  */
1343                 if (chmod(location, 0700) != 0)
1344                         ereport(ERROR,
1345                                         (errcode_for_file_access(),
1346                                   errmsg("could not set permissions on directory \"%s\": %m",
1347                                                  location)));
1348
1349                 /* Create or re-create the PG_VERSION file in the target directory */
1350                 write_version_file(location);
1351
1352                 /* Create the symlink if not already present */
1353                 linkloc = (char *) palloc(OIDCHARS + OIDCHARS + 1);
1354                 sprintf(linkloc, "pg_tblspc/%u", xlrec->ts_id);
1355
1356                 if (symlink(location, linkloc) < 0)
1357                 {
1358                         if (errno != EEXIST)
1359                                 ereport(ERROR,
1360                                                 (errcode_for_file_access(),
1361                                                  errmsg("could not create symbolic link \"%s\": %m",
1362                                                                 linkloc)));
1363                 }
1364
1365                 pfree(linkloc);
1366         }
1367         else if (info == XLOG_TBLSPC_DROP)
1368         {
1369                 xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) XLogRecGetData(record);
1370
1371                 /*
1372                  * If we issued a WAL record for a drop tablespace it is
1373                  * because there were no files in it at all. That means that
1374                  * no permanent objects can exist in it at this point.
1375                  *
1376                  * It is possible for standby users to be using this tablespace
1377                  * as a location for their temporary files, so if we fail to
1378                  * remove all files then do conflict processing and try again,
1379                  * if currently enabled.
1380                  */
1381                 if (!remove_tablespace_directories(xlrec->ts_id, true))
1382                 {
1383                         VirtualTransactionId *temp_file_users;
1384
1385                         /*
1386                          * Standby users may be currently using this tablespace for
1387                          * for their temporary files. We only care about current
1388                          * users because temp_tablespace parameter will just ignore
1389                          * tablespaces that no longer exist.
1390                          *
1391                          * Ask everybody to cancel their queries immediately so
1392                          * we can ensure no temp files remain and we can remove the
1393                          * tablespace. Nuke the entire site from orbit, it's the only
1394                          * way to be sure.
1395                          *
1396                          * XXX: We could work out the pids of active backends
1397                          * using this tablespace by examining the temp filenames in the
1398                          * directory. We would then convert the pids into VirtualXIDs
1399                          * before attempting to cancel them.
1400                          *
1401                          * We don't wait for commit because drop tablespace is
1402                          * non-transactional.
1403                          */
1404                         temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
1405                                                                                                                 InvalidOid,
1406                                                                                                                 false);
1407                         ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
1408                                                                                                    "drop tablespace",
1409                                                                                                    CONFLICT_MODE_ERROR);
1410
1411                         /*
1412                          * If we did recovery processing then hopefully the
1413                          * backends who wrote temp files should have cleaned up and
1414                          * exited by now. So lets recheck before we throw an error.
1415                          * If !process_conflicts then this will just fail again.
1416                          */
1417                         if (!remove_tablespace_directories(xlrec->ts_id, true))
1418                                 ereport(ERROR,
1419                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1420                                          errmsg("tablespace %u is not empty",
1421                                                         xlrec->ts_id)));
1422                 }
1423         }
1424         else
1425                 elog(PANIC, "tblspc_redo: unknown op code %u", info);
1426 }
1427
1428 void
1429 tblspc_desc(StringInfo buf, uint8 xl_info, char *rec)
1430 {
1431         uint8           info = xl_info & ~XLR_INFO_MASK;
1432
1433         if (info == XLOG_TBLSPC_CREATE)
1434         {
1435                 xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) rec;
1436
1437                 appendStringInfo(buf, "create ts: %u \"%s\"",
1438                                                  xlrec->ts_id, xlrec->ts_path);
1439         }
1440         else if (info == XLOG_TBLSPC_DROP)
1441         {
1442                 xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) rec;
1443
1444                 appendStringInfo(buf, "drop ts: %u", xlrec->ts_id);
1445         }
1446         else
1447                 appendStringInfo(buf, "UNKNOWN");
1448 }