OSDN Git Service

Update copyright for 2009.
[pg-rex/syncrep.git] / src / backend / commands / tablespace.c
1 /*-------------------------------------------------------------------------
2  *
3  * tablespace.c
4  *        Commands to manipulate table spaces
5  *
6  * Tablespaces in PostgreSQL are designed to allow users to determine
7  * where the data file(s) for a given database object reside on the file
8  * system.
9  *
10  * A tablespace represents a directory on the file system. At tablespace
11  * creation time, the directory must be empty. To simplify things and
12  * remove the possibility of having file name conflicts, we isolate
13  * files within a tablespace into database-specific subdirectories.
14  *
15  * To support file access via the information given in RelFileNode, we
16  * maintain a symbolic-link map in $PGDATA/pg_tblspc. The symlinks are
17  * named by tablespace OIDs and point to the actual tablespace directories.
18  * Thus the full path to an arbitrary file is
19  *                      $PGDATA/pg_tblspc/spcoid/dboid/relfilenode
20  *
21  * There are two tablespaces created at initdb time: pg_global (for shared
22  * tables) and pg_default (for everything else).  For backwards compatibility
23  * and to remain functional on platforms without symlinks, these tablespaces
24  * are accessed specially: they are respectively
25  *                      $PGDATA/global/relfilenode
26  *                      $PGDATA/base/dboid/relfilenode
27  *
28  * To allow CREATE DATABASE to give a new database a default tablespace
29  * that's different from the template database's default, we make the
30  * provision that a zero in pg_class.reltablespace means the database's
31  * default tablespace.  Without this, CREATE DATABASE would have to go in
32  * and munge the system catalogs of the new database.
33  *
34  *
35  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
36  * Portions Copyright (c) 1994, Regents of the University of California
37  *
38  *
39  * IDENTIFICATION
40  *        $PostgreSQL: pgsql/src/backend/commands/tablespace.c,v 1.59 2009/01/01 17:23:40 momjian Exp $
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45
46 #include <unistd.h>
47 #include <dirent.h>
48 #include <sys/types.h>
49 #include <sys/stat.h>
50
51 #include "access/heapam.h"
52 #include "access/sysattr.h"
53 #include "access/xact.h"
54 #include "catalog/catalog.h"
55 #include "catalog/dependency.h"
56 #include "catalog/indexing.h"
57 #include "catalog/pg_tablespace.h"
58 #include "commands/comment.h"
59 #include "commands/tablespace.h"
60 #include "miscadmin.h"
61 #include "postmaster/bgwriter.h"
62 #include "storage/fd.h"
63 #include "utils/acl.h"
64 #include "utils/builtins.h"
65 #include "utils/fmgroids.h"
66 #include "utils/guc.h"
67 #include "utils/lsyscache.h"
68 #include "utils/memutils.h"
69 #include "utils/rel.h"
70 #include "utils/tqual.h"
71
72
73 /* GUC variables; both are initialized to NULL here */
74 char       *default_tablespace = NULL;
75 char       *temp_tablespaces = NULL;
76
77 /* Prototypes for the file-local helpers defined later in this file */
78 static bool remove_tablespace_directories(Oid tablespaceoid, bool redo);
79 static void set_short_version(const char *path);
80
81
82 /*
83  * Each database using a table space is isolated into its own name space
84  * by a subdirectory named for the database OID.  On first creation of an
85  * object in the tablespace, create the subdirectory.  If the subdirectory
86  * already exists, just fall through quietly.
87  *
88  * isRedo indicates that we are creating an object during WAL replay.
89  * In this case we will cope with the possibility of the tablespace
90  * directory not being there either --- this could happen if we are
91  * replaying an operation on a table in a subsequently-dropped tablespace.
92  * We handle this by making a directory in the place where the tablespace
93  * symlink would normally be.  This isn't an exact replay of course, but
94  * it's the best we can do given the available information.
95  *
96  * If tablespaces are not supported, you might think this could be a no-op,
97  * but you'd be wrong: we still need it in case we have to re-create a
98  * database subdirectory (of $PGDATA/base) during WAL replay.
99  */
100 void
101 TablespaceCreateDbspace(Oid spcNode, Oid dbNode, bool isRedo)
102 {
103         struct stat st;
104         char       *dir;
105
106         /*
107          * The global tablespace doesn't have per-database subdirectories, so
108          * nothing to do for it.
109          */
110         if (spcNode == GLOBALTABLESPACE_OID)
111                 return;
112
113         Assert(OidIsValid(spcNode));
114         Assert(OidIsValid(dbNode));
115
116         dir = GetDatabasePath(dbNode, spcNode);
117
118         if (stat(dir, &st) < 0)
119         {
120                 if (errno == ENOENT)
121                 {
122                         /*
123                          * Acquire TablespaceCreateLock to ensure that no DROP TABLESPACE
124                          * or TablespaceCreateDbspace is running concurrently.
125                          */
126                         LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
127
128                         /*
129                          * Recheck to see if someone created the directory while we were
130                          * waiting for lock.
131                          */
132                         if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode))
133                         {
134                                 /* need not do anything */
135                         }
136                         else
137                         {
138                                 /* OK, go for it */
139                                 if (mkdir(dir, S_IRWXU) < 0)
140                                 {
141                                         char       *parentdir;
142
143                                         if (errno != ENOENT || !isRedo)
144                                                 ereport(ERROR,
145                                                                 (errcode_for_file_access(),
146                                                           errmsg("could not create directory \"%s\": %m",
147                                                                          dir)));
148                                         /* Try to make parent directory too */
149                                         parentdir = pstrdup(dir);
150                                         get_parent_directory(parentdir);
151                                         if (mkdir(parentdir, S_IRWXU) < 0)
152                                                 ereport(ERROR,
153                                                                 (errcode_for_file_access(),
154                                                           errmsg("could not create directory \"%s\": %m",
155                                                                          parentdir)));
156                                         pfree(parentdir);
157                                         if (mkdir(dir, S_IRWXU) < 0)
158                                                 ereport(ERROR,
159                                                                 (errcode_for_file_access(),
160                                                           errmsg("could not create directory \"%s\": %m",
161                                                                          dir)));
162                                 }
163                         }
164
165                         LWLockRelease(TablespaceCreateLock);
166                 }
167                 else
168                 {
169                         ereport(ERROR,
170                                         (errcode_for_file_access(),
171                                          errmsg("could not stat directory \"%s\": %m", dir)));
172                 }
173         }
174         else
175         {
176                 /* be paranoid */
177                 if (!S_ISDIR(st.st_mode))
178                         ereport(ERROR,
179                                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
180                                          errmsg("\"%s\" exists but is not a directory",
181                                                         dir)));
182         }
183
184         pfree(dir);
185 }
186
187 /*
188  * Create a table space
189  *
190  * Only superusers can create a tablespace. This seems a reasonable restriction
191  * since we're determining the system layout and, anyway, we probably have
192  * root if we're doing this kind of activity
193  */
194 void
195 CreateTableSpace(CreateTableSpaceStmt *stmt)
196 {
197 #ifdef HAVE_SYMLINK
198         Relation        rel;
199         Datum           values[Natts_pg_tablespace];
200         bool            nulls[Natts_pg_tablespace];
201         HeapTuple       tuple;
202         Oid                     tablespaceoid;
203         char       *location;
204         char       *linkloc;
205         Oid                     ownerId;
206
207         /* Must be super user */
208         if (!superuser())
209                 ereport(ERROR,
210                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
211                                  errmsg("permission denied to create tablespace \"%s\"",
212                                                 stmt->tablespacename),
213                                  errhint("Must be superuser to create a tablespace.")));
214
215         /* However, the eventual owner of the tablespace need not be */
216         if (stmt->owner)
217                 ownerId = get_roleid_checked(stmt->owner);
218         else
219                 ownerId = GetUserId();
220
221         /* Unix-ify the offered path, and strip any trailing slashes */
222         location = pstrdup(stmt->location);
223         canonicalize_path(location);
224
225         /* disallow quotes, else CREATE DATABASE would be at risk */
226         if (strchr(location, '\''))
227                 ereport(ERROR,
228                                 (errcode(ERRCODE_INVALID_NAME),
229                                  errmsg("tablespace location cannot contain single quotes")));
230
231         /*
232          * Allowing relative paths seems risky
233          *
234          * this also helps us ensure that location is not empty or whitespace
235          */
236         if (!is_absolute_path(location))
237                 ereport(ERROR,
238                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
239                                  errmsg("tablespace location must be an absolute path")));
240
241         /*
242          * Check that location isn't too long. Remember that we're going to append
243          * '/<dboid>/<relid>.<nnn>'  (XXX but do we ever form the whole path
244          * explicitly?  This may be overly conservative.)
245          */
246         if (strlen(location) >= (MAXPGPATH - 1 - 10 - 1 - 10 - 1 - 10))
247                 ereport(ERROR,
248                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
249                                  errmsg("tablespace location \"%s\" is too long",
250                                                 location)));
251
252         /*
253          * Disallow creation of tablespaces named "pg_xxx"; we reserve this
254          * namespace for system purposes.
255          */
256         if (!allowSystemTableMods && IsReservedName(stmt->tablespacename))
257                 ereport(ERROR,
258                                 (errcode(ERRCODE_RESERVED_NAME),
259                                  errmsg("unacceptable tablespace name \"%s\"",
260                                                 stmt->tablespacename),
261                 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
262
263         /*
264          * Check that there is no other tablespace by this name.  (The unique
265          * index would catch this anyway, but might as well give a friendlier
266          * message.)
267          */
268         if (OidIsValid(get_tablespace_oid(stmt->tablespacename)))
269                 ereport(ERROR,
270                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
271                                  errmsg("tablespace \"%s\" already exists",
272                                                 stmt->tablespacename)));
273
274         /*
275          * Insert tuple into pg_tablespace.  The purpose of doing this first is to
276          * lock the proposed tablename against other would-be creators. The
277          * insertion will roll back if we find problems below.
278          */
279         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
280
281         MemSet(nulls, false, sizeof(nulls));
282
283         values[Anum_pg_tablespace_spcname - 1] =
284                 DirectFunctionCall1(namein, CStringGetDatum(stmt->tablespacename));
285         values[Anum_pg_tablespace_spcowner - 1] =
286                 ObjectIdGetDatum(ownerId);
287         values[Anum_pg_tablespace_spclocation - 1] =
288                 CStringGetTextDatum(location);
289         nulls[Anum_pg_tablespace_spcacl - 1] = true;
290
291         tuple = heap_form_tuple(rel->rd_att, values, nulls);
292
293         tablespaceoid = simple_heap_insert(rel, tuple);
294
295         CatalogUpdateIndexes(rel, tuple);
296
297         heap_freetuple(tuple);
298
299         /* Record dependency on owner */
300         recordDependencyOnOwner(TableSpaceRelationId, tablespaceoid, ownerId);
301
302         /*
303          * Attempt to coerce target directory to safe permissions.      If this fails,
304          * it doesn't exist or has the wrong owner.
305          */
306         if (chmod(location, 0700) != 0)
307                 ereport(ERROR,
308                                 (errcode_for_file_access(),
309                                  errmsg("could not set permissions on directory \"%s\": %m",
310                                                 location)));
311
312         /*
313          * Check the target directory is empty.
314          */
315         if (!directory_is_empty(location))
316                 ereport(ERROR,
317                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
318                                  errmsg("directory \"%s\" is not empty",
319                                                 location)));
320
321         /*
322          * Create the PG_VERSION file in the target directory.  This has several
323          * purposes: to make sure we can write in the directory, to prevent
324          * someone from creating another tablespace pointing at the same directory
325          * (the emptiness check above will fail), and to label tablespace
326          * directories by PG version.
327          */
328         set_short_version(location);
329
330         /*
331          * All seems well, create the symlink
332          */
333         linkloc = (char *) palloc(10 + 10 + 1);
334         sprintf(linkloc, "pg_tblspc/%u", tablespaceoid);
335
336         if (symlink(location, linkloc) < 0)
337                 ereport(ERROR,
338                                 (errcode_for_file_access(),
339                                  errmsg("could not create symbolic link \"%s\": %m",
340                                                 linkloc)));
341
342         /* Record the filesystem change in XLOG */
343         {
344                 xl_tblspc_create_rec xlrec;
345                 XLogRecData rdata[2];
346
347                 xlrec.ts_id = tablespaceoid;
348                 rdata[0].data = (char *) &xlrec;
349                 rdata[0].len = offsetof(xl_tblspc_create_rec, ts_path);
350                 rdata[0].buffer = InvalidBuffer;
351                 rdata[0].next = &(rdata[1]);
352
353                 rdata[1].data = (char *) location;
354                 rdata[1].len = strlen(location) + 1;
355                 rdata[1].buffer = InvalidBuffer;
356                 rdata[1].next = NULL;
357
358                 (void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_CREATE, rdata);
359         }
360
361         /*
362          * Force synchronous commit, to minimize the window between creating the
363          * symlink on-disk and marking the transaction committed.  It's not great
364          * that there is any window at all, but definitely we don't want to make
365          * it larger than necessary.
366          */
367         ForceSyncCommit();
368
369         pfree(linkloc);
370         pfree(location);
371
372         /* We keep the lock on pg_tablespace until commit */
373         heap_close(rel, NoLock);
374 #else                                                   /* !HAVE_SYMLINK */
375         ereport(ERROR,
376                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
377                          errmsg("tablespaces are not supported on this platform")));
378 #endif   /* HAVE_SYMLINK */
379 }
380
381 /*
382  * Drop a table space
383  *
384  * Be careful to check that the tablespace is empty.
385  */
386 void
387 DropTableSpace(DropTableSpaceStmt *stmt)
388 {
389 #ifdef HAVE_SYMLINK
390         char       *tablespacename = stmt->tablespacename;
391         HeapScanDesc scandesc;
392         Relation        rel;
393         HeapTuple       tuple;
394         ScanKeyData entry[1];
395         Oid                     tablespaceoid;
396
397         /*
398          * Find the target tuple
399          */
400         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
401
402         ScanKeyInit(&entry[0],
403                                 Anum_pg_tablespace_spcname,
404                                 BTEqualStrategyNumber, F_NAMEEQ,
405                                 CStringGetDatum(tablespacename));
406         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
407         tuple = heap_getnext(scandesc, ForwardScanDirection);
408
409         if (!HeapTupleIsValid(tuple))
410         {
411                 if (!stmt->missing_ok)
412                 {
413                         ereport(ERROR,
414                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
415                                          errmsg("tablespace \"%s\" does not exist",
416                                                         tablespacename)));
417                 }
418                 else
419                 {
420                         ereport(NOTICE,
421                                         (errmsg("tablespace \"%s\" does not exist, skipping",
422                                                         tablespacename)));
423                         /* XXX I assume I need one or both of these next two calls */
424                         heap_endscan(scandesc);
425                         heap_close(rel, NoLock);
426                 }
427                 return;
428         }
429
430         tablespaceoid = HeapTupleGetOid(tuple);
431
432         /* Must be tablespace owner */
433         if (!pg_tablespace_ownercheck(tablespaceoid, GetUserId()))
434                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
435                                            tablespacename);
436
437         /* Disallow drop of the standard tablespaces, even by superuser */
438         if (tablespaceoid == GLOBALTABLESPACE_OID ||
439                 tablespaceoid == DEFAULTTABLESPACE_OID)
440                 aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE,
441                                            tablespacename);
442
443         /*
444          * Remove the pg_tablespace tuple (this will roll back if we fail below)
445          */
446         simple_heap_delete(rel, &tuple->t_self);
447
448         heap_endscan(scandesc);
449
450         /*
451          * Remove any comments on this tablespace.
452          */
453         DeleteSharedComments(tablespaceoid, TableSpaceRelationId);
454
455         /*
456          * Remove dependency on owner.
457          */
458         deleteSharedDependencyRecordsFor(TableSpaceRelationId, tablespaceoid);
459
460         /*
461          * Acquire TablespaceCreateLock to ensure that no TablespaceCreateDbspace
462          * is running concurrently.
463          */
464         LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
465
466         /*
467          * Try to remove the physical infrastructure.
468          */
469         if (!remove_tablespace_directories(tablespaceoid, false))
470         {
471                 /*
472                  * Not all files deleted?  However, there can be lingering empty files
473                  * in the directories, left behind by for example DROP TABLE, that
474                  * have been scheduled for deletion at next checkpoint (see comments
475                  * in mdunlink() for details).  We could just delete them immediately,
476                  * but we can't tell them apart from important data files that we
477                  * mustn't delete.  So instead, we force a checkpoint which will clean
478                  * out any lingering files, and try again.
479                  */
480                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
481                 if (!remove_tablespace_directories(tablespaceoid, false))
482                 {
483                         /* Still not empty, the files must be important then */
484                         ereport(ERROR,
485                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
486                                          errmsg("tablespace \"%s\" is not empty",
487                                                         tablespacename)));
488                 }
489         }
490
491         /* Record the filesystem change in XLOG */
492         {
493                 xl_tblspc_drop_rec xlrec;
494                 XLogRecData rdata[1];
495
496                 xlrec.ts_id = tablespaceoid;
497                 rdata[0].data = (char *) &xlrec;
498                 rdata[0].len = sizeof(xl_tblspc_drop_rec);
499                 rdata[0].buffer = InvalidBuffer;
500                 rdata[0].next = NULL;
501
502                 (void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP, rdata);
503         }
504
505         /*
506          * Note: because we checked that the tablespace was empty, there should be
507          * no need to worry about flushing shared buffers or free space map
508          * entries for relations in the tablespace.
509          */
510
511         /*
512          * Force synchronous commit, to minimize the window between removing the
513          * files on-disk and marking the transaction committed.  It's not great
514          * that there is any window at all, but definitely we don't want to make
515          * it larger than necessary.
516          */
517         ForceSyncCommit();
518
519         /*
520          * Allow TablespaceCreateDbspace again.
521          */
522         LWLockRelease(TablespaceCreateLock);
523
524         /* We keep the lock on pg_tablespace until commit */
525         heap_close(rel, NoLock);
526 #else                                                   /* !HAVE_SYMLINK */
527         ereport(ERROR,
528                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
529                          errmsg("tablespaces are not supported on this platform")));
530 #endif   /* HAVE_SYMLINK */
531 }
532
533 /*
534  * remove_tablespace_directories: attempt to remove filesystem infrastructure
535  *
536  * Returns TRUE if successful, FALSE if some subdirectory is not empty
537  *
538  * redo indicates we are redoing a drop from XLOG; okay if nothing there
539  */
540 static bool
541 remove_tablespace_directories(Oid tablespaceoid, bool redo)
542 {
543         char       *location;
544         DIR                *dirdesc;
545         struct dirent *de;
546         char       *subfile;
547         struct stat st;
548
549         location = (char *) palloc(10 + 10 + 1);
550         sprintf(location, "pg_tblspc/%u", tablespaceoid);
551
552         /*
553          * Check if the tablespace still contains any files.  We try to rmdir each
554          * per-database directory we find in it.  rmdir failure implies there are
555          * still files in that subdirectory, so give up.  (We do not have to worry
556          * about undoing any already completed rmdirs, since the next attempt to
557          * use the tablespace from that database will simply recreate the
558          * subdirectory via TablespaceCreateDbspace.)
559          *
560          * Since we hold TablespaceCreateLock, no one else should be creating any
561          * fresh subdirectories in parallel. It is possible that new files are
562          * being created within subdirectories, though, so the rmdir call could
563          * fail.  Worst consequence is a less friendly error message.
564          *
565          * If redo is true then ENOENT is a likely outcome here, and we allow it
566          * to pass without comment.  In normal operation we still allow it, but
567          * with a warning.      This is because even though ProcessUtility disallows
568          * DROP TABLESPACE in a transaction block, it's possible that a previous
569          * DROP failed and rolled back after removing the tablespace directories
570          * and symlink.  We want to allow a new DROP attempt to succeed at
571          * removing the catalog entries, so we should not give a hard error here.
572          */
573         dirdesc = AllocateDir(location);
574         if (dirdesc == NULL)
575         {
576                 if (errno == ENOENT)
577                 {
578                         if (!redo)
579                                 ereport(WARNING,
580                                                 (errcode_for_file_access(),
581                                                  errmsg("could not open directory \"%s\": %m",
582                                                                 location)));
583                         pfree(location);
584                         return true;
585                 }
586                 /* else let ReadDir report the error */
587         }
588
589         while ((de = ReadDir(dirdesc, location)) != NULL)
590         {
591                 /* Note we ignore PG_VERSION for the nonce */
592                 if (strcmp(de->d_name, ".") == 0 ||
593                         strcmp(de->d_name, "..") == 0 ||
594                         strcmp(de->d_name, "PG_VERSION") == 0)
595                         continue;
596
597                 subfile = palloc(strlen(location) + 1 + strlen(de->d_name) + 1);
598                 sprintf(subfile, "%s/%s", location, de->d_name);
599
600                 /* This check is just to deliver a friendlier error message */
601                 if (!directory_is_empty(subfile))
602                 {
603                         FreeDir(dirdesc);
604                         return false;
605                 }
606
607                 /* Do the real deed */
608                 if (rmdir(subfile) < 0)
609                         ereport(ERROR,
610                                         (errcode_for_file_access(),
611                                          errmsg("could not remove directory \"%s\": %m",
612                                                         subfile)));
613
614                 pfree(subfile);
615         }
616
617         FreeDir(dirdesc);
618
619         /*
620          * Okay, try to unlink PG_VERSION (we allow it to not be there, even in
621          * non-REDO case, for robustness).
622          */
623         subfile = palloc(strlen(location) + 11 + 1);
624         sprintf(subfile, "%s/PG_VERSION", location);
625
626         if (unlink(subfile) < 0)
627         {
628                 if (errno != ENOENT)
629                         ereport(ERROR,
630                                         (errcode_for_file_access(),
631                                          errmsg("could not remove file \"%s\": %m",
632                                                         subfile)));
633         }
634
635         pfree(subfile);
636
637         /*
638          * Okay, try to remove the symlink.  We must however deal with the
639          * possibility that it's a directory instead of a symlink --- this could
640          * happen during WAL replay (see TablespaceCreateDbspace), and it is also
641          * the normal case on Windows.
642          */
643         if (lstat(location, &st) == 0 && S_ISDIR(st.st_mode))
644         {
645                 if (rmdir(location) < 0)
646                         ereport(ERROR,
647                                         (errcode_for_file_access(),
648                                          errmsg("could not remove directory \"%s\": %m",
649                                                         location)));
650         }
651         else
652         {
653                 if (unlink(location) < 0)
654                         ereport(ERROR,
655                                         (errcode_for_file_access(),
656                                          errmsg("could not remove symbolic link \"%s\": %m",
657                                                         location)));
658         }
659
660         pfree(location);
661
662         return true;
663 }
664
665 /*
666  * write out the PG_VERSION file in the specified directory
667  */
668 static void
669 set_short_version(const char *path)
670 {
671         char       *short_version;
672         bool            gotdot = false;
673         int                     end;
674         char       *fullname;
675         FILE       *version_file;
676
677         /* Construct short version string (should match initdb.c) */
678         short_version = pstrdup(PG_VERSION);
679
680         for (end = 0; short_version[end] != '\0'; end++)
681         {
682                 if (short_version[end] == '.')
683                 {
684                         Assert(end != 0);
685                         if (gotdot)
686                                 break;
687                         else
688                                 gotdot = true;
689                 }
690                 else if (short_version[end] < '0' || short_version[end] > '9')
691                 {
692                         /* gone past digits and dots */
693                         break;
694                 }
695         }
696         Assert(end > 0 && short_version[end - 1] != '.' && gotdot);
697         short_version[end] = '\0';
698
699         /* Now write the file */
700         fullname = palloc(strlen(path) + 11 + 1);
701         sprintf(fullname, "%s/PG_VERSION", path);
702         version_file = AllocateFile(fullname, PG_BINARY_W);
703         if (version_file == NULL)
704                 ereport(ERROR,
705                                 (errcode_for_file_access(),
706                                  errmsg("could not write to file \"%s\": %m",
707                                                 fullname)));
708         fprintf(version_file, "%s\n", short_version);
709         if (FreeFile(version_file))
710                 ereport(ERROR,
711                                 (errcode_for_file_access(),
712                                  errmsg("could not write to file \"%s\": %m",
713                                                 fullname)));
714
715         pfree(fullname);
716         pfree(short_version);
717 }
718
/*
 * directory_is_empty -- does the directory contain anything at all?
 *
 * Returns true when reading the directory at "path" yields no entries
 * other than "." and "..", and false as soon as any other entry shows up.
 *
 * This probably belongs somewhere else, but not sure where...
 */
bool
directory_is_empty(const char *path)
{
	DIR		   *dir;
	struct dirent *entry;
	bool		empty = true;

	dir = AllocateDir(path);

	for (;;)
	{
		entry = ReadDir(dir, path);
		if (entry == NULL)
			break;

		/* the self and parent entries do not count as content */
		if (strcmp(entry->d_name, ".") != 0 &&
			strcmp(entry->d_name, "..") != 0)
		{
			empty = false;
			break;
		}
	}

	FreeDir(dir);
	return empty;
}
744
745 /*
746  * Rename a tablespace
747  */
748 void
749 RenameTableSpace(const char *oldname, const char *newname)
750 {
751         Relation        rel;
752         ScanKeyData entry[1];
753         HeapScanDesc scan;
754         HeapTuple       tup;
755         HeapTuple       newtuple;
756         Form_pg_tablespace newform;
757
758         /* Search pg_tablespace */
759         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
760
761         ScanKeyInit(&entry[0],
762                                 Anum_pg_tablespace_spcname,
763                                 BTEqualStrategyNumber, F_NAMEEQ,
764                                 CStringGetDatum(oldname));
765         scan = heap_beginscan(rel, SnapshotNow, 1, entry);
766         tup = heap_getnext(scan, ForwardScanDirection);
767         if (!HeapTupleIsValid(tup))
768                 ereport(ERROR,
769                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
770                                  errmsg("tablespace \"%s\" does not exist",
771                                                 oldname)));
772
773         newtuple = heap_copytuple(tup);
774         newform = (Form_pg_tablespace) GETSTRUCT(newtuple);
775
776         heap_endscan(scan);
777
778         /* Must be owner */
779         if (!pg_tablespace_ownercheck(HeapTupleGetOid(newtuple), GetUserId()))
780                 aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE, oldname);
781
782         /* Validate new name */
783         if (!allowSystemTableMods && IsReservedName(newname))
784                 ereport(ERROR,
785                                 (errcode(ERRCODE_RESERVED_NAME),
786                                  errmsg("unacceptable tablespace name \"%s\"", newname),
787                 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
788
789         /* Make sure the new name doesn't exist */
790         ScanKeyInit(&entry[0],
791                                 Anum_pg_tablespace_spcname,
792                                 BTEqualStrategyNumber, F_NAMEEQ,
793                                 CStringGetDatum(newname));
794         scan = heap_beginscan(rel, SnapshotNow, 1, entry);
795         tup = heap_getnext(scan, ForwardScanDirection);
796         if (HeapTupleIsValid(tup))
797                 ereport(ERROR,
798                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
799                                  errmsg("tablespace \"%s\" already exists",
800                                                 newname)));
801
802         heap_endscan(scan);
803
804         /* OK, update the entry */
805         namestrcpy(&(newform->spcname), newname);
806
807         simple_heap_update(rel, &newtuple->t_self, newtuple);
808         CatalogUpdateIndexes(rel, newtuple);
809
810         heap_close(rel, NoLock);
811 }
812
813 /*
814  * Change tablespace owner
815  */
816 void
817 AlterTableSpaceOwner(const char *name, Oid newOwnerId)
818 {
819         Relation        rel;
820         ScanKeyData entry[1];
821         HeapScanDesc scandesc;
822         Form_pg_tablespace spcForm;
823         HeapTuple       tup;
824
825         /* Search pg_tablespace */
826         rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
827
828         ScanKeyInit(&entry[0],
829                                 Anum_pg_tablespace_spcname,
830                                 BTEqualStrategyNumber, F_NAMEEQ,
831                                 CStringGetDatum(name));
832         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
833         tup = heap_getnext(scandesc, ForwardScanDirection);
834         if (!HeapTupleIsValid(tup))
835                 ereport(ERROR,
836                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
837                                  errmsg("tablespace \"%s\" does not exist", name)));
838
839         spcForm = (Form_pg_tablespace) GETSTRUCT(tup);
840
841         /*
842          * If the new owner is the same as the existing owner, consider the
843          * command to have succeeded.  This is for dump restoration purposes.
844          */
845         if (spcForm->spcowner != newOwnerId)
846         {
847                 Datum           repl_val[Natts_pg_tablespace];
848                 bool            repl_null[Natts_pg_tablespace];
849                 bool            repl_repl[Natts_pg_tablespace];
850                 Acl                *newAcl;
851                 Datum           aclDatum;
852                 bool            isNull;
853                 HeapTuple       newtuple;
854
855                 /* Otherwise, must be owner of the existing object */
856                 if (!pg_tablespace_ownercheck(HeapTupleGetOid(tup), GetUserId()))
857                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
858                                                    name);
859
860                 /* Must be able to become new owner */
861                 check_is_member_of_role(GetUserId(), newOwnerId);
862
863                 /*
864                  * Normally we would also check for create permissions here, but there
865                  * are none for tablespaces so we follow what rename tablespace does
866                  * and omit the create permissions check.
867                  *
868                  * NOTE: Only superusers may create tablespaces to begin with and so
869                  * initially only a superuser would be able to change its ownership
870                  * anyway.
871                  */
872
873                 memset(repl_null, false, sizeof(repl_null));
874                 memset(repl_repl, false, sizeof(repl_repl));
875
876                 repl_repl[Anum_pg_tablespace_spcowner - 1] = true;
877                 repl_val[Anum_pg_tablespace_spcowner - 1] = ObjectIdGetDatum(newOwnerId);
878
879                 /*
880                  * Determine the modified ACL for the new owner.  This is only
881                  * necessary when the ACL is non-null.
882                  */
883                 aclDatum = heap_getattr(tup,
884                                                                 Anum_pg_tablespace_spcacl,
885                                                                 RelationGetDescr(rel),
886                                                                 &isNull);
887                 if (!isNull)
888                 {
889                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
890                                                                  spcForm->spcowner, newOwnerId);
891                         repl_repl[Anum_pg_tablespace_spcacl - 1] = true;
892                         repl_val[Anum_pg_tablespace_spcacl - 1] = PointerGetDatum(newAcl);
893                 }
894
895                 newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
896
897                 simple_heap_update(rel, &newtuple->t_self, newtuple);
898                 CatalogUpdateIndexes(rel, newtuple);
899
900                 heap_freetuple(newtuple);
901
902                 /* Update owner dependency reference */
903                 changeDependencyOnOwner(TableSpaceRelationId, HeapTupleGetOid(tup),
904                                                                 newOwnerId);
905         }
906
907         heap_endscan(scandesc);
908         heap_close(rel, NoLock);
909 }
910
911
912 /*
913  * Routines for handling the GUC variable 'default_tablespace'.
914  */
915
916 /* assign_hook: validate new default_tablespace, do extra actions as needed */
917 const char *
918 assign_default_tablespace(const char *newval, bool doit, GucSource source)
919 {
920         /*
921          * If we aren't inside a transaction, we cannot do database access so
922          * cannot verify the name.      Must accept the value on faith.
923          */
924         if (IsTransactionState())
925         {
926                 if (newval[0] != '\0' &&
927                         !OidIsValid(get_tablespace_oid(newval)))
928                 {
929                         ereport(GUC_complaint_elevel(source),
930                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
931                                          errmsg("tablespace \"%s\" does not exist",
932                                                         newval)));
933                         return NULL;
934                 }
935         }
936
937         return newval;
938 }
939
940 /*
941  * GetDefaultTablespace -- get the OID of the current default tablespace
942  *
943  * Regular objects and temporary objects have different default tablespaces,
944  * hence the forTemp parameter must be specified.
945  *
946  * May return InvalidOid to indicate "use the database's default tablespace".
947  *
948  * Note that caller is expected to check appropriate permissions for any
949  * result other than InvalidOid.
950  *
951  * This exists to hide (and possibly optimize the use of) the
952  * default_tablespace GUC variable.
953  */
954 Oid
955 GetDefaultTablespace(bool forTemp)
956 {
957         Oid                     result;
958
959         /* The temp-table case is handled elsewhere */
960         if (forTemp)
961         {
962                 PrepareTempTablespaces();
963                 return GetNextTempTableSpace();
964         }
965
966         /* Fast path for default_tablespace == "" */
967         if (default_tablespace == NULL || default_tablespace[0] == '\0')
968                 return InvalidOid;
969
970         /*
971          * It is tempting to cache this lookup for more speed, but then we would
972          * fail to detect the case where the tablespace was dropped since the GUC
973          * variable was set.  Note also that we don't complain if the value fails
974          * to refer to an existing tablespace; we just silently return InvalidOid,
975          * causing the new object to be created in the database's tablespace.
976          */
977         result = get_tablespace_oid(default_tablespace);
978
979         /*
980          * Allow explicit specification of database's default tablespace in
981          * default_tablespace without triggering permissions checks.
982          */
983         if (result == MyDatabaseTableSpace)
984                 result = InvalidOid;
985         return result;
986 }
987
988
989 /*
990  * Routines for handling the GUC variable 'temp_tablespaces'.
991  */
992
993 /* assign_hook: validate new temp_tablespaces, do extra actions as needed */
994 const char *
995 assign_temp_tablespaces(const char *newval, bool doit, GucSource source)
996 {
997         char       *rawname;
998         List       *namelist;
999
1000         /* Need a modifiable copy of string */
1001         rawname = pstrdup(newval);
1002
1003         /* Parse string into list of identifiers */
1004         if (!SplitIdentifierString(rawname, ',', &namelist))
1005         {
1006                 /* syntax error in name list */
1007                 pfree(rawname);
1008                 list_free(namelist);
1009                 return NULL;
1010         }
1011
1012         /*
1013          * If we aren't inside a transaction, we cannot do database access so
1014          * cannot verify the individual names.  Must accept the list on faith.
1015          * Fortunately, there's then also no need to pass the data to fd.c.
1016          */
1017         if (IsTransactionState())
1018         {
1019                 /*
1020                  * If we error out below, or if we are called multiple times in one
1021                  * transaction, we'll leak a bit of TopTransactionContext memory.
1022                  * Doesn't seem worth worrying about.
1023                  */
1024                 Oid                *tblSpcs;
1025                 int                     numSpcs;
1026                 ListCell   *l;
1027
1028                 tblSpcs = (Oid *) MemoryContextAlloc(TopTransactionContext,
1029                                                                                 list_length(namelist) * sizeof(Oid));
1030                 numSpcs = 0;
1031                 foreach(l, namelist)
1032                 {
1033                         char       *curname = (char *) lfirst(l);
1034                         Oid                     curoid;
1035                         AclResult       aclresult;
1036
1037                         /* Allow an empty string (signifying database default) */
1038                         if (curname[0] == '\0')
1039                         {
1040                                 tblSpcs[numSpcs++] = InvalidOid;
1041                                 continue;
1042                         }
1043
1044                         /* Else verify that name is a valid tablespace name */
1045                         curoid = get_tablespace_oid(curname);
1046                         if (curoid == InvalidOid)
1047                         {
1048                                 /*
1049                                  * In an interactive SET command, we ereport for bad info.
1050                                  * Otherwise, silently ignore any bad list elements.
1051                                  */
1052                                 if (source >= PGC_S_INTERACTIVE)
1053                                         ereport(ERROR,
1054                                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
1055                                                          errmsg("tablespace \"%s\" does not exist",
1056                                                                         curname)));
1057                                 continue;
1058                         }
1059
1060                         /*
1061                          * Allow explicit specification of database's default tablespace
1062                          * in temp_tablespaces without triggering permissions checks.
1063                          */
1064                         if (curoid == MyDatabaseTableSpace)
1065                         {
1066                                 tblSpcs[numSpcs++] = InvalidOid;
1067                                 continue;
1068                         }
1069
1070                         /* Check permissions similarly */
1071                         aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1072                                                                                            ACL_CREATE);
1073                         if (aclresult != ACLCHECK_OK)
1074                         {
1075                                 if (source >= PGC_S_INTERACTIVE)
1076                                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE, curname);
1077                                 continue;
1078                         }
1079
1080                         tblSpcs[numSpcs++] = curoid;
1081                 }
1082
1083                 /* If actively "doing it", give the new list to fd.c */
1084                 if (doit)
1085                         SetTempTablespaces(tblSpcs, numSpcs);
1086                 else
1087                         pfree(tblSpcs);
1088         }
1089
1090         pfree(rawname);
1091         list_free(namelist);
1092
1093         return newval;
1094 }
1095
1096 /*
1097  * PrepareTempTablespaces -- prepare to use temp tablespaces
1098  *
1099  * If we have not already done so in the current transaction, parse the
1100  * temp_tablespaces GUC variable and tell fd.c which tablespace(s) to use
1101  * for temp files.
1102  */
1103 void
1104 PrepareTempTablespaces(void)
1105 {
1106         char       *rawname;
1107         List       *namelist;
1108         Oid                *tblSpcs;
1109         int                     numSpcs;
1110         ListCell   *l;
1111
1112         /* No work if already done in current transaction */
1113         if (TempTablespacesAreSet())
1114                 return;
1115
1116         /*
1117          * Can't do catalog access unless within a transaction.  This is just a
1118          * safety check in case this function is called by low-level code that
1119          * could conceivably execute outside a transaction.  Note that in such a
1120          * scenario, fd.c will fall back to using the current database's default
1121          * tablespace, which should always be OK.
1122          */
1123         if (!IsTransactionState())
1124                 return;
1125
1126         /* Need a modifiable copy of string */
1127         rawname = pstrdup(temp_tablespaces);
1128
1129         /* Parse string into list of identifiers */
1130         if (!SplitIdentifierString(rawname, ',', &namelist))
1131         {
1132                 /* syntax error in name list */
1133                 SetTempTablespaces(NULL, 0);
1134                 pfree(rawname);
1135                 list_free(namelist);
1136                 return;
1137         }
1138
1139         /* Store tablespace OIDs in an array in TopTransactionContext */
1140         tblSpcs = (Oid *) MemoryContextAlloc(TopTransactionContext,
1141                                                                                  list_length(namelist) * sizeof(Oid));
1142         numSpcs = 0;
1143         foreach(l, namelist)
1144         {
1145                 char       *curname = (char *) lfirst(l);
1146                 Oid                     curoid;
1147                 AclResult       aclresult;
1148
1149                 /* Allow an empty string (signifying database default) */
1150                 if (curname[0] == '\0')
1151                 {
1152                         tblSpcs[numSpcs++] = InvalidOid;
1153                         continue;
1154                 }
1155
1156                 /* Else verify that name is a valid tablespace name */
1157                 curoid = get_tablespace_oid(curname);
1158                 if (curoid == InvalidOid)
1159                 {
1160                         /* Silently ignore any bad list elements */
1161                         continue;
1162                 }
1163
1164                 /*
1165                  * Allow explicit specification of database's default tablespace in
1166                  * temp_tablespaces without triggering permissions checks.
1167                  */
1168                 if (curoid == MyDatabaseTableSpace)
1169                 {
1170                         tblSpcs[numSpcs++] = InvalidOid;
1171                         continue;
1172                 }
1173
1174                 /* Check permissions similarly */
1175                 aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1176                                                                                    ACL_CREATE);
1177                 if (aclresult != ACLCHECK_OK)
1178                         continue;
1179
1180                 tblSpcs[numSpcs++] = curoid;
1181         }
1182
1183         SetTempTablespaces(tblSpcs, numSpcs);
1184
1185         pfree(rawname);
1186         list_free(namelist);
1187 }
1188
1189
1190 /*
1191  * get_tablespace_oid - given a tablespace name, look up the OID
1192  *
1193  * Returns InvalidOid if tablespace name not found.
1194  */
1195 Oid
1196 get_tablespace_oid(const char *tablespacename)
1197 {
1198         Oid                     result;
1199         Relation        rel;
1200         HeapScanDesc scandesc;
1201         HeapTuple       tuple;
1202         ScanKeyData entry[1];
1203
1204         /*
1205          * Search pg_tablespace.  We use a heapscan here even though there is an
1206          * index on name, on the theory that pg_tablespace will usually have just
1207          * a few entries and so an indexed lookup is a waste of effort.
1208          */
1209         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1210
1211         ScanKeyInit(&entry[0],
1212                                 Anum_pg_tablespace_spcname,
1213                                 BTEqualStrategyNumber, F_NAMEEQ,
1214                                 CStringGetDatum(tablespacename));
1215         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
1216         tuple = heap_getnext(scandesc, ForwardScanDirection);
1217
1218         /* We assume that there can be at most one matching tuple */
1219         if (HeapTupleIsValid(tuple))
1220                 result = HeapTupleGetOid(tuple);
1221         else
1222                 result = InvalidOid;
1223
1224         heap_endscan(scandesc);
1225         heap_close(rel, AccessShareLock);
1226
1227         return result;
1228 }
1229
1230 /*
1231  * get_tablespace_name - given a tablespace OID, look up the name
1232  *
1233  * Returns a palloc'd string, or NULL if no such tablespace.
1234  */
1235 char *
1236 get_tablespace_name(Oid spc_oid)
1237 {
1238         char       *result;
1239         Relation        rel;
1240         HeapScanDesc scandesc;
1241         HeapTuple       tuple;
1242         ScanKeyData entry[1];
1243
1244         /*
1245          * Search pg_tablespace.  We use a heapscan here even though there is an
1246          * index on oid, on the theory that pg_tablespace will usually have just a
1247          * few entries and so an indexed lookup is a waste of effort.
1248          */
1249         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1250
1251         ScanKeyInit(&entry[0],
1252                                 ObjectIdAttributeNumber,
1253                                 BTEqualStrategyNumber, F_OIDEQ,
1254                                 ObjectIdGetDatum(spc_oid));
1255         scandesc = heap_beginscan(rel, SnapshotNow, 1, entry);
1256         tuple = heap_getnext(scandesc, ForwardScanDirection);
1257
1258         /* We assume that there can be at most one matching tuple */
1259         if (HeapTupleIsValid(tuple))
1260                 result = pstrdup(NameStr(((Form_pg_tablespace) GETSTRUCT(tuple))->spcname));
1261         else
1262                 result = NULL;
1263
1264         heap_endscan(scandesc);
1265         heap_close(rel, AccessShareLock);
1266
1267         return result;
1268 }
1269
1270
1271 /*
1272  * TABLESPACE resource manager's routines
1273  */
1274 void
1275 tblspc_redo(XLogRecPtr lsn, XLogRecord *record)
1276 {
1277         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1278
1279         if (info == XLOG_TBLSPC_CREATE)
1280         {
1281                 xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) XLogRecGetData(record);
1282                 char       *location = xlrec->ts_path;
1283                 char       *linkloc;
1284
1285                 /*
1286                  * Attempt to coerce target directory to safe permissions.      If this
1287                  * fails, it doesn't exist or has the wrong owner.
1288                  */
1289                 if (chmod(location, 0700) != 0)
1290                         ereport(ERROR,
1291                                         (errcode_for_file_access(),
1292                                   errmsg("could not set permissions on directory \"%s\": %m",
1293                                                  location)));
1294
1295                 /* Create or re-create the PG_VERSION file in the target directory */
1296                 set_short_version(location);
1297
1298                 /* Create the symlink if not already present */
1299                 linkloc = (char *) palloc(10 + 10 + 1);
1300                 sprintf(linkloc, "pg_tblspc/%u", xlrec->ts_id);
1301
1302                 if (symlink(location, linkloc) < 0)
1303                 {
1304                         if (errno != EEXIST)
1305                                 ereport(ERROR,
1306                                                 (errcode_for_file_access(),
1307                                                  errmsg("could not create symbolic link \"%s\": %m",
1308                                                                 linkloc)));
1309                 }
1310
1311                 pfree(linkloc);
1312         }
1313         else if (info == XLOG_TBLSPC_DROP)
1314         {
1315                 xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) XLogRecGetData(record);
1316
1317                 if (!remove_tablespace_directories(xlrec->ts_id, true))
1318                         ereport(ERROR,
1319                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1320                                          errmsg("tablespace %u is not empty",
1321                                                         xlrec->ts_id)));
1322         }
1323         else
1324                 elog(PANIC, "tblspc_redo: unknown op code %u", info);
1325 }
1326
1327 void
1328 tblspc_desc(StringInfo buf, uint8 xl_info, char *rec)
1329 {
1330         uint8           info = xl_info & ~XLR_INFO_MASK;
1331
1332         if (info == XLOG_TBLSPC_CREATE)
1333         {
1334                 xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) rec;
1335
1336                 appendStringInfo(buf, "create ts: %u \"%s\"",
1337                                                  xlrec->ts_id, xlrec->ts_path);
1338         }
1339         else if (info == XLOG_TBLSPC_DROP)
1340         {
1341                 xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) rec;
1342
1343                 appendStringInfo(buf, "drop ts: %u", xlrec->ts_id);
1344         }
1345         else
1346                 appendStringInfo(buf, "UNKNOWN");
1347 }