OSDN Git Service

pgindent run.
[pg-rex/syncrep.git] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  *
7  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  *
11  * IDENTIFICATION
12  *        $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.105 2002/09/04 20:31:15 momjian Exp $
13  *
14  *-------------------------------------------------------------------------
15  */
16 #include "postgres.h"
17
18 #include <errno.h>
19 #include <fcntl.h>
20 #include <unistd.h>
21 #include <sys/stat.h>
22
23 #include "access/heapam.h"
24 #include "catalog/catname.h"
25 #include "catalog/catalog.h"
26 #include "catalog/pg_database.h"
27 #include "catalog/pg_shadow.h"
28 #include "catalog/indexing.h"
29 #include "commands/comment.h"
30 #include "commands/dbcommands.h"
31 #include "miscadmin.h"
32 #include "storage/freespace.h"
33 #include "storage/sinval.h"
34 #include "utils/array.h"
35 #include "utils/builtins.h"
36 #include "utils/fmgroids.h"
37 #include "utils/guc.h"
38 #include "utils/lsyscache.h"
39 #include "utils/syscache.h"
40
41 #include "mb/pg_wchar.h"                /* encoding check */
42
43
44 /* non-export function prototypes */
45 static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
46                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
47                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
48                         char *dbpath);
49 static bool have_createdb_privilege(void);
50 static char *resolve_alt_dbpath(const char *dbpath, Oid dboid);
51 static bool remove_dbdirs(const char *real_loc, const char *altloc);
52
53 /*
54  * CREATE DATABASE
55  */
56
57 void
58 createdb(const CreatedbStmt *stmt)
59 {
60         char       *nominal_loc;
61         char       *alt_loc;
62         char       *target_dir;
63         char            src_loc[MAXPGPATH];
64         char            buf[2 * MAXPGPATH + 100];
65         Oid                     src_dboid;
66         int4            src_owner;
67         int                     src_encoding;
68         bool            src_istemplate;
69         Oid                     src_lastsysoid;
70         TransactionId src_vacuumxid;
71         TransactionId src_frozenxid;
72         char            src_dbpath[MAXPGPATH];
73         Relation        pg_database_rel;
74         HeapTuple       tuple;
75         TupleDesc       pg_database_dsc;
76         Datum           new_record[Natts_pg_database];
77         char            new_record_nulls[Natts_pg_database];
78         Oid                     dboid;
79         int32           datdba;
80         List       *option;
81         DefElem    *downer = NULL;
82         DefElem    *dpath = NULL;
83         DefElem    *dtemplate = NULL;
84         DefElem    *dencoding = NULL;
85         char       *dbname = stmt->dbname;
86         char       *dbowner = NULL;
87         char       *dbpath = NULL;
88         char       *dbtemplate = NULL;
89         int                     encoding = -1;
90
91         /* Extract options from the statement node tree */
92         foreach(option, stmt->options)
93         {
94                 DefElem    *defel = (DefElem *) lfirst(option);
95
96                 if (strcmp(defel->defname, "owner") == 0)
97                 {
98                         if (downer)
99                                 elog(ERROR, "CREATE DATABASE: conflicting options");
100                         downer = defel;
101                 }
102                 else if (strcmp(defel->defname, "location") == 0)
103                 {
104                         if (dpath)
105                                 elog(ERROR, "CREATE DATABASE: conflicting options");
106                         dpath = defel;
107                 }
108                 else if (strcmp(defel->defname, "template") == 0)
109                 {
110                         if (dtemplate)
111                                 elog(ERROR, "CREATE DATABASE: conflicting options");
112                         dtemplate = defel;
113                 }
114                 else if (strcmp(defel->defname, "encoding") == 0)
115                 {
116                         if (dencoding)
117                                 elog(ERROR, "CREATE DATABASE: conflicting options");
118                         dencoding = defel;
119                 }
120                 else
121                         elog(ERROR, "CREATE DATABASE: option \"%s\" not recognized",
122                                  defel->defname);
123         }
124
125         if (downer)
126                 dbowner = strVal(downer->arg);
127         if (dpath)
128                 dbpath = strVal(dpath->arg);
129         if (dtemplate)
130                 dbtemplate = strVal(dtemplate->arg);
131         if (dencoding)
132                 encoding = intVal(dencoding->arg);
133
134         /* obtain sysid of proposed owner */
135         if (dbowner)
136                 datdba = get_usesysid(dbowner); /* will elog if no such user */
137         else
138                 datdba = GetUserId();
139
140         if (datdba == (int32) GetUserId())
141         {
142                 /* creating database for self: can be superuser or createdb */
143                 if (!superuser() && !have_createdb_privilege())
144                         elog(ERROR, "CREATE DATABASE: permission denied");
145         }
146         else
147         {
148                 /* creating database for someone else: must be superuser */
149                 /* note that the someone else need not have any permissions */
150                 if (!superuser())
151                         elog(ERROR, "CREATE DATABASE: permission denied");
152         }
153
154         /* don't call this in a transaction block */
155         if (IsTransactionBlock())
156                 elog(ERROR, "CREATE DATABASE: may not be called in a transaction block");
157
158         /*
159          * Check for db name conflict.  There is a race condition here, since
160          * another backend could create the same DB name before we commit.
161          * However, holding an exclusive lock on pg_database for the whole
162          * time we are copying the source database doesn't seem like a good
163          * idea, so accept possibility of race to create.  We will check again
164          * after we grab the exclusive lock.
165          */
166         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
167                 elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname);
168
169         /*
170          * Lookup database (template) to be cloned.
171          */
172         if (!dbtemplate)
173                 dbtemplate = "template1";               /* Default template database name */
174
175         if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding,
176                                          &src_istemplate, &src_lastsysoid,
177                                          &src_vacuumxid, &src_frozenxid,
178                                          src_dbpath))
179                 elog(ERROR, "CREATE DATABASE: template \"%s\" does not exist",
180                          dbtemplate);
181
182         /*
183          * Permission check: to copy a DB that's not marked datistemplate, you
184          * must be superuser or the owner thereof.
185          */
186         if (!src_istemplate)
187         {
188                 if (!superuser() && GetUserId() != src_owner)
189                         elog(ERROR, "CREATE DATABASE: permission to copy \"%s\" denied",
190                                  dbtemplate);
191         }
192
193         /*
194          * Determine physical path of source database
195          */
196         alt_loc = resolve_alt_dbpath(src_dbpath, src_dboid);
197         if (!alt_loc)
198                 alt_loc = GetDatabasePath(src_dboid);
199         strcpy(src_loc, alt_loc);
200
201         /*
202          * The source DB can't have any active backends, except this one
203          * (exception is to allow CREATE DB while connected to template1).
204          * Otherwise we might copy inconsistent data.  This check is not
205          * bulletproof, since someone might connect while we are copying...
206          */
207         if (DatabaseHasActiveBackends(src_dboid, true))
208                 elog(ERROR, "CREATE DATABASE: source database \"%s\" is being accessed by other users", dbtemplate);
209
210         /* If encoding is defaulted, use source's encoding */
211         if (encoding < 0)
212                 encoding = src_encoding;
213
214         /* Some encodings are client only */
215         if (!PG_VALID_BE_ENCODING(encoding))
216                 elog(ERROR, "CREATE DATABASE: invalid backend encoding");
217
218         /*
219          * Preassign OID for pg_database tuple, so that we can compute db
220          * path.
221          */
222         dboid = newoid();
223
224         /*
225          * Compute nominal location (where we will try to access the
226          * database), and resolve alternate physical location if one is
227          * specified.
228          *
229          * If an alternate location is specified but is the same as the normal
230          * path, just drop the alternate-location spec (this seems friendlier
231          * than erroring out).  We must test this case to avoid creating a
232          * circular symlink below.
233          */
234         nominal_loc = GetDatabasePath(dboid);
235         alt_loc = resolve_alt_dbpath(dbpath, dboid);
236
237         if (alt_loc && strcmp(alt_loc, nominal_loc) == 0)
238         {
239                 alt_loc = NULL;
240                 dbpath = NULL;
241         }
242
243         if (strchr(nominal_loc, '\''))
244                 elog(ERROR, "database path may not contain single quotes");
245         if (alt_loc && strchr(alt_loc, '\''))
246                 elog(ERROR, "database path may not contain single quotes");
247         if (strchr(src_loc, '\''))
248                 elog(ERROR, "database path may not contain single quotes");
249         /* ... otherwise we'd be open to shell exploits below */
250
251         /*
252          * Force dirty buffers out to disk, to ensure source database is
253          * up-to-date for the copy.  (We really only need to flush buffers for
254          * the source database...)
255          */
256         BufferSync();
257
258         /*
259          * Close virtual file descriptors so the kernel has more available for
260          * the mkdir() and system() calls below.
261          */
262         closeAllVfds();
263
264         /*
265          * Check we can create the target directory --- but then remove it
266          * because we rely on cp(1) to create it for real.
267          */
268         target_dir = alt_loc ? alt_loc : nominal_loc;
269
270         if (mkdir(target_dir, S_IRWXU) != 0)
271                 elog(ERROR, "CREATE DATABASE: unable to create database directory '%s': %m",
272                          target_dir);
273         if (rmdir(target_dir) != 0)
274                 elog(ERROR, "CREATE DATABASE: unable to remove temp directory '%s': %m",
275                          target_dir);
276
277         /* Make the symlink, if needed */
278         if (alt_loc)
279         {
280                 if (symlink(alt_loc, nominal_loc) != 0)
281                         elog(ERROR, "CREATE DATABASE: could not link '%s' to '%s': %m",
282                                  nominal_loc, alt_loc);
283         }
284
285         /* Copy the template database to the new location */
286         snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", src_loc, target_dir);
287
288         if (system(buf) != 0)
289         {
290                 if (remove_dbdirs(nominal_loc, alt_loc))
291                         elog(ERROR, "CREATE DATABASE: could not initialize database directory");
292                 else
293                         elog(ERROR, "CREATE DATABASE: could not initialize database directory; delete failed as well");
294         }
295
296         /*
297          * Now OK to grab exclusive lock on pg_database.
298          */
299         pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
300
301         /* Check to see if someone else created same DB name meanwhile. */
302         if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
303         {
304                 /* Don't hold lock while doing recursive remove */
305                 heap_close(pg_database_rel, AccessExclusiveLock);
306                 remove_dbdirs(nominal_loc, alt_loc);
307                 elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname);
308         }
309
310         /*
311          * Insert a new tuple into pg_database
312          */
313         pg_database_dsc = RelationGetDescr(pg_database_rel);
314
315         /* Form tuple */
316         MemSet(new_record, 0, sizeof(new_record));
317         MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
318
319         new_record[Anum_pg_database_datname - 1] =
320                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
321         new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(datdba);
322         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
323         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
324         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
325         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
326         new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid);
327         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
328         /* do not set datpath to null, GetRawDatabaseInfo won't cope */
329         new_record[Anum_pg_database_datpath - 1] =
330                 DirectFunctionCall1(textin, CStringGetDatum(dbpath ? dbpath : ""));
331
332         /*
333          * We deliberately set datconfig and datacl to defaults (NULL), rather
334          * than copying them from the template database.  Copying datacl would
335          * be a bad idea when the owner is not the same as the template's
336          * owner. It's more debatable whether datconfig should be copied.
337          */
338         new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
339         new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
340
341         tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls);
342
343         HeapTupleSetOid(tuple, dboid);          /* override heap_insert's OID
344                                                                                  * selection */
345
346         simple_heap_insert(pg_database_rel, tuple);
347
348         /* Update indexes */
349         CatalogUpdateIndexes(pg_database_rel, tuple);
350
351         /* Close pg_database, but keep lock till commit */
352         heap_close(pg_database_rel, NoLock);
353
354         /*
355          * Force dirty buffers out to disk, so that newly-connecting backends
356          * will see the new database in pg_database right away.  (They'll see
357          * an uncommitted tuple, but they don't care; see GetRawDatabaseInfo.)
358          */
359         BufferSync();
360 }
361
362
363 /*
364  * DROP DATABASE
365  */
366 void
367 dropdb(const char *dbname)
368 {
369         int4            db_owner;
370         bool            db_istemplate;
371         Oid                     db_id;
372         char       *alt_loc;
373         char       *nominal_loc;
374         char            dbpath[MAXPGPATH];
375         Relation        pgdbrel;
376         HeapScanDesc pgdbscan;
377         ScanKeyData key;
378         HeapTuple       tup;
379
380         AssertArg(dbname);
381
382         if (strcmp(dbname, DatabaseName) == 0)
383                 elog(ERROR, "DROP DATABASE: cannot be executed on the currently open database");
384
385         if (IsTransactionBlock())
386                 elog(ERROR, "DROP DATABASE: may not be called in a transaction block");
387
388         /*
389          * Obtain exclusive lock on pg_database.  We need this to ensure that
390          * no new backend starts up in the target database while we are
391          * deleting it.  (Actually, a new backend might still manage to start
392          * up, because it will read pg_database without any locking to
393          * discover the database's OID.  But it will detect its error in
394          * ReverifyMyDatabase and shut down before any serious damage is done.
395          * See postinit.c.)
396          */
397         pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock);
398
399         if (!get_db_info(dbname, &db_id, &db_owner, NULL,
400                                          &db_istemplate, NULL, NULL, NULL, dbpath))
401                 elog(ERROR, "DROP DATABASE: database \"%s\" does not exist", dbname);
402
403         if (GetUserId() != db_owner && !superuser())
404                 elog(ERROR, "DROP DATABASE: permission denied");
405
406         /*
407          * Disallow dropping a DB that is marked istemplate.  This is just to
408          * prevent people from accidentally dropping template0 or template1;
409          * they can do so if they're really determined ...
410          */
411         if (db_istemplate)
412                 elog(ERROR, "DROP DATABASE: database is marked as a template");
413
414         nominal_loc = GetDatabasePath(db_id);
415         alt_loc = resolve_alt_dbpath(dbpath, db_id);
416
417         /*
418          * Check for active backends in the target database.
419          */
420         if (DatabaseHasActiveBackends(db_id, false))
421                 elog(ERROR, "DROP DATABASE: database \"%s\" is being accessed by other users", dbname);
422
423         /*
424          * Find the database's tuple by OID (should be unique).
425          */
426         ScanKeyEntryInitialize(&key, 0, ObjectIdAttributeNumber,
427                                                    F_OIDEQ, ObjectIdGetDatum(db_id));
428
429         pgdbscan = heap_beginscan(pgdbrel, SnapshotNow, 1, &key);
430
431         tup = heap_getnext(pgdbscan, ForwardScanDirection);
432         if (!HeapTupleIsValid(tup))
433         {
434                 /*
435                  * This error should never come up since the existence of the
436                  * database is checked earlier
437                  */
438                 elog(ERROR, "DROP DATABASE: Database \"%s\" doesn't exist despite earlier reports to the contrary",
439                          dbname);
440         }
441
442         /* Remove the database's tuple from pg_database */
443         simple_heap_delete(pgdbrel, &tup->t_self);
444
445         heap_endscan(pgdbscan);
446
447         /*
448          * Delete any comments associated with the database
449          *
450          * NOTE: this is probably dead code since any such comments should have
451          * been in that database, not mine.
452          */
453         DeleteComments(db_id, RelationGetRelid(pgdbrel), 0);
454
455         /*
456          * Close pg_database, but keep exclusive lock till commit to ensure
457          * that any new backend scanning pg_database will see the tuple dead.
458          */
459         heap_close(pgdbrel, NoLock);
460
461         /*
462          * Drop pages for this database that are in the shared buffer cache.
463          * This is important to ensure that no remaining backend tries to
464          * write out a dirty buffer to the dead database later...
465          */
466         DropBuffers(db_id);
467
468         /*
469          * Also, clean out any entries in the shared free space map.
470          */
471         FreeSpaceMapForgetDatabase(db_id);
472
473         /*
474          * Remove the database's subdirectory and everything in it.
475          */
476         remove_dbdirs(nominal_loc, alt_loc);
477
478         /*
479          * Force dirty buffers out to disk, so that newly-connecting backends
480          * will see the database tuple marked dead in pg_database right away.
481          * (They'll see an uncommitted deletion, but they don't care; see
482          * GetRawDatabaseInfo.)
483          */
484         BufferSync();
485 }
486
487
488
489 /*
490  * ALTER DATABASE name SET ...
491  */
492 void
493 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
494 {
495         char       *valuestr;
496         HeapTuple       tuple,
497                                 newtuple;
498         Relation        rel;
499         ScanKeyData scankey;
500         HeapScanDesc scan;
501         Datum           repl_val[Natts_pg_database];
502         char            repl_null[Natts_pg_database];
503         char            repl_repl[Natts_pg_database];
504
505         valuestr = flatten_set_variable_args(stmt->variable, stmt->value);
506
507         rel = heap_openr(DatabaseRelationName, RowExclusiveLock);
508         ScanKeyEntryInitialize(&scankey, 0, Anum_pg_database_datname,
509                                                    F_NAMEEQ, NameGetDatum(stmt->dbname));
510         scan = heap_beginscan(rel, SnapshotNow, 1, &scankey);
511         tuple = heap_getnext(scan, ForwardScanDirection);
512         if (!HeapTupleIsValid(tuple))
513                 elog(ERROR, "database \"%s\" does not exist", stmt->dbname);
514
515         if (!(superuser()
516                 || ((Form_pg_database) GETSTRUCT(tuple))->datdba == GetUserId()))
517                 elog(ERROR, "permission denied");
518
519         MemSet(repl_repl, ' ', sizeof(repl_repl));
520         repl_repl[Anum_pg_database_datconfig - 1] = 'r';
521
522         if (strcmp(stmt->variable, "all") == 0 && valuestr == NULL)
523         {
524                 /* RESET ALL */
525                 repl_null[Anum_pg_database_datconfig - 1] = 'n';
526                 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
527         }
528         else
529         {
530                 Datum           datum;
531                 bool            isnull;
532                 ArrayType  *a;
533
534                 repl_null[Anum_pg_database_datconfig - 1] = ' ';
535
536                 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
537                                                          RelationGetDescr(rel), &isnull);
538
539                 a = isnull ? ((ArrayType *) NULL) : DatumGetArrayTypeP(datum);
540
541                 if (valuestr)
542                         a = GUCArrayAdd(a, stmt->variable, valuestr);
543                 else
544                         a = GUCArrayDelete(a, stmt->variable);
545
546                 repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
547         }
548
549         newtuple = heap_modifytuple(tuple, rel, repl_val, repl_null, repl_repl);
550         simple_heap_update(rel, &tuple->t_self, newtuple);
551
552         /* Update indexes */
553         CatalogUpdateIndexes(rel, newtuple);
554
555         heap_endscan(scan);
556         heap_close(rel, RowExclusiveLock);
557 }
558
559
560
561 /*
562  * Helper functions
563  */
564
565 static bool
566 get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP,
567                         int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP,
568                         TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP,
569                         char *dbpath)
570 {
571         Relation        relation;
572         ScanKeyData scanKey;
573         HeapScanDesc scan;
574         HeapTuple       tuple;
575         bool            gottuple;
576
577         AssertArg(name);
578
579         /* Caller may wish to grab a better lock on pg_database beforehand... */
580         relation = heap_openr(DatabaseRelationName, AccessShareLock);
581
582         ScanKeyEntryInitialize(&scanKey, 0, Anum_pg_database_datname,
583                                                    F_NAMEEQ, NameGetDatum(name));
584
585         scan = heap_beginscan(relation, SnapshotNow, 1, &scanKey);
586
587         tuple = heap_getnext(scan, ForwardScanDirection);
588
589         gottuple = HeapTupleIsValid(tuple);
590         if (gottuple)
591         {
592                 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
593
594                 /* oid of the database */
595                 if (dbIdP)
596                         *dbIdP = HeapTupleGetOid(tuple);
597                 /* sysid of the owner */
598                 if (ownerIdP)
599                         *ownerIdP = dbform->datdba;
600                 /* character encoding */
601                 if (encodingP)
602                         *encodingP = dbform->encoding;
603                 /* allowed as template? */
604                 if (dbIsTemplateP)
605                         *dbIsTemplateP = dbform->datistemplate;
606                 /* last system OID used in database */
607                 if (dbLastSysOidP)
608                         *dbLastSysOidP = dbform->datlastsysoid;
609                 /* limit of vacuumed XIDs */
610                 if (dbVacuumXidP)
611                         *dbVacuumXidP = dbform->datvacuumxid;
612                 /* limit of frozen XIDs */
613                 if (dbFrozenXidP)
614                         *dbFrozenXidP = dbform->datfrozenxid;
615                 /* database path (as registered in pg_database) */
616                 if (dbpath)
617                 {
618                         Datum           datum;
619                         bool            isnull;
620
621                         datum = heap_getattr(tuple,
622                                                                  Anum_pg_database_datpath,
623                                                                  RelationGetDescr(relation),
624                                                                  &isnull);
625                         if (!isnull)
626                         {
627                                 text       *pathtext = DatumGetTextP(datum);
628                                 int                     pathlen = VARSIZE(pathtext) - VARHDRSZ;
629
630                                 Assert(pathlen >= 0 && pathlen < MAXPGPATH);
631                                 strncpy(dbpath, VARDATA(pathtext), pathlen);
632                                 *(dbpath + pathlen) = '\0';
633                         }
634                         else
635                                 strcpy(dbpath, "");
636                 }
637         }
638
639         heap_endscan(scan);
640         heap_close(relation, AccessShareLock);
641
642         return gottuple;
643 }
644
645 static bool
646 have_createdb_privilege(void)
647 {
648         HeapTuple       utup;
649         bool            retval;
650
651         utup = SearchSysCache(SHADOWSYSID,
652                                                   ObjectIdGetDatum(GetUserId()),
653                                                   0, 0, 0);
654
655         if (!HeapTupleIsValid(utup))
656                 retval = false;
657         else
658                 retval = ((Form_pg_shadow) GETSTRUCT(utup))->usecreatedb;
659
660         ReleaseSysCache(utup);
661
662         return retval;
663 }
664
665
666 static char *
667 resolve_alt_dbpath(const char *dbpath, Oid dboid)
668 {
669         const char *prefix;
670         char       *ret;
671         size_t          len;
672
673         if (dbpath == NULL || dbpath[0] == '\0')
674                 return NULL;
675
676         if (strchr(dbpath, '/'))
677         {
678                 if (dbpath[0] != '/')
679                         elog(ERROR, "Relative paths are not allowed as database locations");
680 #ifndef ALLOW_ABSOLUTE_DBPATHS
681                 elog(ERROR, "Absolute paths are not allowed as database locations");
682 #endif
683                 prefix = dbpath;
684         }
685         else
686         {
687                 /* must be environment variable */
688                 char       *var = getenv(dbpath);
689
690                 if (!var)
691                         elog(ERROR, "Postmaster environment variable '%s' not set", dbpath);
692                 if (var[0] != '/')
693                         elog(ERROR, "Postmaster environment variable '%s' must be absolute path", dbpath);
694                 prefix = var;
695         }
696
697         len = strlen(prefix) + 6 + sizeof(Oid) * 8 + 1;
698         if (len >= MAXPGPATH - 100)
699                 elog(ERROR, "Alternate path is too long");
700
701         ret = palloc(len);
702         snprintf(ret, len, "%s/base/%u", prefix, dboid);
703
704         return ret;
705 }
706
707
708 static bool
709 remove_dbdirs(const char *nominal_loc, const char *alt_loc)
710 {
711         const char *target_dir;
712         char            buf[MAXPGPATH + 100];
713         bool            success = true;
714
715         target_dir = alt_loc ? alt_loc : nominal_loc;
716
717         /*
718          * Close virtual file descriptors so the kernel has more available for
719          * the system() call below.
720          */
721         closeAllVfds();
722
723         if (alt_loc)
724         {
725                 /* remove symlink */
726                 if (unlink(nominal_loc) != 0)
727                 {
728                         elog(WARNING, "could not remove '%s': %m", nominal_loc);
729                         success = false;
730                 }
731         }
732
733         snprintf(buf, sizeof(buf), "rm -rf '%s'", target_dir);
734
735         if (system(buf) != 0)
736         {
737                 elog(WARNING, "database directory '%s' could not be removed",
738                          target_dir);
739                 success = false;
740         }
741
742         return success;
743 }
744
745
746 /*
747  * get_database_oid - given a database name, look up the OID
748  *
749  * Returns InvalidOid if database name not found.
750  *
751  * This is not actually used in this file, but is exported for use elsewhere.
752  */
753 Oid
754 get_database_oid(const char *dbname)
755 {
756         Relation        pg_database;
757         ScanKeyData entry[1];
758         HeapScanDesc scan;
759         HeapTuple       dbtuple;
760         Oid                     oid;
761
762         /* There's no syscache for pg_database, so must look the hard way */
763         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
764         ScanKeyEntryInitialize(&entry[0], 0x0,
765                                                    Anum_pg_database_datname, F_NAMEEQ,
766                                                    CStringGetDatum(dbname));
767         scan = heap_beginscan(pg_database, SnapshotNow, 1, entry);
768
769         dbtuple = heap_getnext(scan, ForwardScanDirection);
770
771         /* We assume that there can be at most one matching tuple */
772         if (HeapTupleIsValid(dbtuple))
773                 oid = HeapTupleGetOid(dbtuple);
774         else
775                 oid = InvalidOid;
776
777         heap_endscan(scan);
778         heap_close(pg_database, AccessShareLock);
779
780         return oid;
781 }
782
783 /*
784  * get_database_owner - given a database OID, fetch the owner's usesysid.
785  *
786  * Errors out if database not found.
787  *
788  * This is not actually used in this file, but is exported for use elsewhere.
789  */
790 Oid
791 get_database_owner(Oid dbid)
792 {
793         Relation        pg_database;
794         ScanKeyData entry[1];
795         HeapScanDesc scan;
796         HeapTuple       dbtuple;
797         int32           dba;
798
799         /* There's no syscache for pg_database, so must look the hard way */
800         pg_database = heap_openr(DatabaseRelationName, AccessShareLock);
801         ScanKeyEntryInitialize(&entry[0], 0x0,
802                                                    ObjectIdAttributeNumber, F_OIDEQ,
803                                                    ObjectIdGetDatum(dbid));
804         scan = heap_beginscan(pg_database, SnapshotNow, 1, entry);
805
806         dbtuple = heap_getnext(scan, ForwardScanDirection);
807
808         if (!HeapTupleIsValid(dbtuple))
809                 elog(ERROR, "database %u does not exist", dbid);
810
811         dba = ((Form_pg_database) GETSTRUCT(dbtuple))->datdba;
812
813         heap_endscan(scan);
814         heap_close(pg_database, AccessShareLock);
815
816         /* XXX some confusion about whether userids are OID or int4 ... */
817         return (Oid) dba;
818 }