OSDN Git Service

d2e38f631f88b103fe4c3384fcef1670942a9117
[pg-rex/syncrep.git] / src / backend / commands / dbcommands.c
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *              Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands use exclusive locks on
7  * the database objects (as expressed by LockSharedObject()) to avoid
8  * stepping on each other's toes.  Formerly we used table-level locks
9  * on pg_database, but that's too coarse-grained.
10  *
11  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *        $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.230 2010/01/02 16:57:37 momjian Exp $
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21
22 #include <fcntl.h>
23 #include <locale.h>
24 #include <unistd.h>
25 #include <sys/stat.h>
26
27 #include "access/genam.h"
28 #include "access/heapam.h"
29 #include "access/transam.h"
30 #include "access/xact.h"
31 #include "access/xlogutils.h"
32 #include "catalog/catalog.h"
33 #include "catalog/dependency.h"
34 #include "catalog/indexing.h"
35 #include "catalog/pg_authid.h"
36 #include "catalog/pg_database.h"
37 #include "catalog/pg_db_role_setting.h"
38 #include "catalog/pg_tablespace.h"
39 #include "commands/comment.h"
40 #include "commands/dbcommands.h"
41 #include "commands/tablespace.h"
42 #include "mb/pg_wchar.h"
43 #include "miscadmin.h"
44 #include "pgstat.h"
45 #include "postmaster/bgwriter.h"
46 #include "storage/bufmgr.h"
47 #include "storage/fd.h"
48 #include "storage/lmgr.h"
49 #include "storage/ipc.h"
50 #include "storage/procarray.h"
51 #include "storage/smgr.h"
52 #include "storage/standby.h"
53 #include "utils/acl.h"
54 #include "utils/builtins.h"
55 #include "utils/fmgroids.h"
56 #include "utils/lsyscache.h"
57 #include "utils/pg_locale.h"
58 #include "utils/snapmgr.h"
59 #include "utils/syscache.h"
60 #include "utils/tqual.h"
61
62
63 typedef struct                /* argument block handed (as a pointer Datum) to createdb_failure_callback */
64 {
65         Oid                     src_dboid;              /* OID of the source (template) DB being cloned */
66         Oid                     dest_dboid;             /* OID of the DB we are trying to create */
67 } createdb_failure_params;
68
69 typedef struct                /* NOTE(review): presumably the argument block for movedb_failure_callback -- confirm */
70 {
71         Oid                     dest_dboid;             /* OID of the DB we are trying to move */
72         Oid                     dest_tsoid;             /* OID of the tablespace we are trying to move to */
73 } movedb_failure_params;
74
75 /* Prototypes for functions private to this file (all are declared static) */
76 static void createdb_failure_callback(int code, Datum arg);     /* error-cleanup hook registered by createdb() */
77 static void movedb(const char *dbname, const char *tblspcname);
78 static void movedb_failure_callback(int code, Datum arg);
79 static bool get_db_info(const char *name, LOCKMODE lockmode,
80                         Oid *dbIdP, Oid *ownerIdP,
81                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
82                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
83                         Oid *dbTablespace, char **dbCollate, char **dbCtype);
84 static bool have_createdb_privilege(void);
85 static void remove_dbtablespaces(Oid db_id);
86 static bool check_db_file_conflict(Oid db_id);
87 static int      errdetail_busy_db(int notherbackends, int npreparedxacts);
88
89
90 /*
91  * CREATE DATABASE
92  */
93 void
94 createdb(const CreatedbStmt *stmt)
95 {
96         HeapScanDesc scan;
97         Relation        rel;
98         Oid                     src_dboid;
99         Oid                     src_owner;
100         int                     src_encoding;
101         char       *src_collate;
102         char       *src_ctype;
103         bool            src_istemplate;
104         bool            src_allowconn;
105         Oid                     src_lastsysoid;
106         TransactionId src_frozenxid;
107         Oid                     src_deftablespace;
108         volatile Oid dst_deftablespace;
109         Relation        pg_database_rel;
110         HeapTuple       tuple;
111         Datum           new_record[Natts_pg_database];
112         bool            new_record_nulls[Natts_pg_database];
113         Oid                     dboid;
114         Oid                     datdba;
115         ListCell   *option;
116         DefElem    *dtablespacename = NULL;
117         DefElem    *downer = NULL;
118         DefElem    *dtemplate = NULL;
119         DefElem    *dencoding = NULL;
120         DefElem    *dcollate = NULL;
121         DefElem    *dctype = NULL;
122         DefElem    *dconnlimit = NULL;
123         char       *dbname = stmt->dbname;
124         char       *dbowner = NULL;
125         const char *dbtemplate = NULL;
126         char       *dbcollate = NULL;
127         char       *dbctype = NULL;
128         int                     encoding = -1;
129         int                     dbconnlimit = -1;
130         int                     ctype_encoding;
131         int                     collate_encoding;
132         int                     notherbackends;
133         int                     npreparedxacts;
134         createdb_failure_params fparms;
135
136         /* Extract options from the statement node tree */
137         foreach(option, stmt->options)
138         {
139                 DefElem    *defel = (DefElem *) lfirst(option);
140
141                 if (strcmp(defel->defname, "tablespace") == 0)
142                 {
143                         if (dtablespacename)
144                                 ereport(ERROR,
145                                                 (errcode(ERRCODE_SYNTAX_ERROR),
146                                                  errmsg("conflicting or redundant options")));
147                         dtablespacename = defel;
148                 }
149                 else if (strcmp(defel->defname, "owner") == 0)
150                 {
151                         if (downer)
152                                 ereport(ERROR,
153                                                 (errcode(ERRCODE_SYNTAX_ERROR),
154                                                  errmsg("conflicting or redundant options")));
155                         downer = defel;
156                 }
157                 else if (strcmp(defel->defname, "template") == 0)
158                 {
159                         if (dtemplate)
160                                 ereport(ERROR,
161                                                 (errcode(ERRCODE_SYNTAX_ERROR),
162                                                  errmsg("conflicting or redundant options")));
163                         dtemplate = defel;
164                 }
165                 else if (strcmp(defel->defname, "encoding") == 0)
166                 {
167                         if (dencoding)
168                                 ereport(ERROR,
169                                                 (errcode(ERRCODE_SYNTAX_ERROR),
170                                                  errmsg("conflicting or redundant options")));
171                         dencoding = defel;
172                 }
173                 else if (strcmp(defel->defname, "lc_collate") == 0)
174                 {
175                         if (dcollate)
176                                 ereport(ERROR,
177                                                 (errcode(ERRCODE_SYNTAX_ERROR),
178                                                  errmsg("conflicting or redundant options")));
179                         dcollate = defel;
180                 }
181                 else if (strcmp(defel->defname, "lc_ctype") == 0)
182                 {
183                         if (dctype)
184                                 ereport(ERROR,
185                                                 (errcode(ERRCODE_SYNTAX_ERROR),
186                                                  errmsg("conflicting or redundant options")));
187                         dctype = defel;
188                 }
189                 else if (strcmp(defel->defname, "connectionlimit") == 0)
190                 {
191                         if (dconnlimit)
192                                 ereport(ERROR,
193                                                 (errcode(ERRCODE_SYNTAX_ERROR),
194                                                  errmsg("conflicting or redundant options")));
195                         dconnlimit = defel;
196                 }
197                 else if (strcmp(defel->defname, "location") == 0)
198                 {
199                         ereport(WARNING,
200                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
201                                          errmsg("LOCATION is not supported anymore"),
202                                          errhint("Consider using tablespaces instead.")));
203                 }
204                 else
205                         elog(ERROR, "option \"%s\" not recognized",
206                                  defel->defname);
207         }
208
209         if (downer && downer->arg)
210                 dbowner = strVal(downer->arg);
211         if (dtemplate && dtemplate->arg)
212                 dbtemplate = strVal(dtemplate->arg);
213         if (dencoding && dencoding->arg)
214         {
215                 const char *encoding_name;
216
217                 if (IsA(dencoding->arg, Integer))
218                 {
219                         encoding = intVal(dencoding->arg);
220                         encoding_name = pg_encoding_to_char(encoding);
221                         if (strcmp(encoding_name, "") == 0 ||
222                                 pg_valid_server_encoding(encoding_name) < 0)
223                                 ereport(ERROR,
224                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
225                                                  errmsg("%d is not a valid encoding code",
226                                                                 encoding)));
227                 }
228                 else if (IsA(dencoding->arg, String))
229                 {
230                         encoding_name = strVal(dencoding->arg);
231                         encoding = pg_valid_server_encoding(encoding_name);
232                         if (encoding < 0)
233                                 ereport(ERROR,
234                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
235                                                  errmsg("%s is not a valid encoding name",
236                                                                 encoding_name)));
237                 }
238                 else
239                         elog(ERROR, "unrecognized node type: %d",
240                                  nodeTag(dencoding->arg));
241         }
242         if (dcollate && dcollate->arg)
243                 dbcollate = strVal(dcollate->arg);
244         if (dctype && dctype->arg)
245                 dbctype = strVal(dctype->arg);
246
247         if (dconnlimit && dconnlimit->arg)
248         {
249                 dbconnlimit = intVal(dconnlimit->arg);
250                 if (dbconnlimit < -1)
251                         ereport(ERROR,
252                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
253                                          errmsg("invalid connection limit: %d", dbconnlimit)));
254         }
255
256         /* obtain OID of proposed owner */
257         if (dbowner)
258                 datdba = get_roleid_checked(dbowner);
259         else
260                 datdba = GetUserId();
261
262         /*
263          * To create a database, must have createdb privilege and must be able to
264          * become the target role (this does not imply that the target role itself
265          * must have createdb privilege).  The latter provision guards against
266          * "giveaway" attacks.  Note that a superuser will always have both of
267          * these privileges a fortiori.
268          */
269         if (!have_createdb_privilege())
270                 ereport(ERROR,
271                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
272                                  errmsg("permission denied to create database")));
273
274         check_is_member_of_role(GetUserId(), datdba);
275
276         /*
277          * Lookup database (template) to be cloned, and obtain share lock on it.
278          * ShareLock allows two CREATE DATABASEs to work from the same template
279          * concurrently, while ensuring no one is busy dropping it in parallel
280          * (which would be Very Bad since we'd likely get an incomplete copy
281          * without knowing it).  This also prevents any new connections from being
282          * made to the source until we finish copying it, so we can be sure it
283          * won't change underneath us.
284          */
285         if (!dbtemplate)
286                 dbtemplate = "template1";               /* Default template database name */
287
288         if (!get_db_info(dbtemplate, ShareLock,
289                                          &src_dboid, &src_owner, &src_encoding,
290                                          &src_istemplate, &src_allowconn, &src_lastsysoid,
291                                          &src_frozenxid, &src_deftablespace,
292                                          &src_collate, &src_ctype))
293                 ereport(ERROR,
294                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
295                                  errmsg("template database \"%s\" does not exist",
296                                                 dbtemplate)));
297
298         /*
299          * Permission check: to copy a DB that's not marked datistemplate, you
300          * must be superuser or the owner thereof.
301          */
302         if (!src_istemplate)
303         {
304                 if (!pg_database_ownercheck(src_dboid, GetUserId()))
305                         ereport(ERROR,
306                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
307                                          errmsg("permission denied to copy database \"%s\"",
308                                                         dbtemplate)));
309         }
310
311         /* If encoding or locales are defaulted, use source's setting */
312         if (encoding < 0)
313                 encoding = src_encoding;
314         if (dbcollate == NULL)
315                 dbcollate = src_collate;
316         if (dbctype == NULL)
317                 dbctype = src_ctype;
318
319         /* Some encodings are client only */
320         if (!PG_VALID_BE_ENCODING(encoding))
321                 ereport(ERROR,
322                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
323                                  errmsg("invalid server encoding %d", encoding)));
324
325         /* Check that the chosen locales are valid */
326         if (!check_locale(LC_COLLATE, dbcollate))
327                 ereport(ERROR,
328                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
329                                  errmsg("invalid locale name %s", dbcollate)));
330         if (!check_locale(LC_CTYPE, dbctype))
331                 ereport(ERROR,
332                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
333                                  errmsg("invalid locale name %s", dbctype)));
334
335         /*
336          * Check whether chosen encoding matches chosen locale settings.  This
337          * restriction is necessary because libc's locale-specific code usually
338          * fails when presented with data in an encoding it's not expecting. We
339          * allow mismatch in four cases:
340          *
341          * 1. locale encoding = SQL_ASCII, which means that the locale is
342          * C/POSIX which works with any encoding.
343          *
344          * 2. locale encoding = -1, which means that we couldn't determine
345          * the locale's encoding and have to trust the user to get it right.
346          *
347          * 3. selected encoding is UTF8 and platform is win32. This is because
348          * UTF8 is a pseudo codepage that is supported in all locales since it's
349          * converted to UTF16 before being used.
350          *
351          * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
352          * is risky but we have historically allowed it --- notably, the
353          * regression tests require it.
354          *
355          * Note: if you change this policy, fix initdb to match.
356          */
357         ctype_encoding = pg_get_encoding_from_locale(dbctype);
358         collate_encoding = pg_get_encoding_from_locale(dbcollate);
359
360         if (!(ctype_encoding == encoding ||
361                   ctype_encoding == PG_SQL_ASCII ||
362                   ctype_encoding == -1 ||
363 #ifdef WIN32
364                   encoding == PG_UTF8 ||
365 #endif
366                   (encoding == PG_SQL_ASCII && superuser())))
367                 ereport(ERROR,
368                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
369                                  errmsg("encoding %s does not match locale %s",
370                                                 pg_encoding_to_char(encoding),
371                                                 dbctype),
372                            errdetail("The chosen LC_CTYPE setting requires encoding %s.",
373                                                  pg_encoding_to_char(ctype_encoding))));
374
375         if (!(collate_encoding == encoding ||
376                   collate_encoding == PG_SQL_ASCII ||
377                   collate_encoding == -1 ||
378 #ifdef WIN32
379                   encoding == PG_UTF8 ||
380 #endif
381                   (encoding == PG_SQL_ASCII && superuser())))
382                 ereport(ERROR,
383                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
384                                  errmsg("encoding %s does not match locale %s",
385                                                 pg_encoding_to_char(encoding),
386                                                 dbcollate),
387                          errdetail("The chosen LC_COLLATE setting requires encoding %s.",
388                                            pg_encoding_to_char(collate_encoding))));
389
390         /*
391          * Check that the new encoding and locale settings match the source
392          * database.  We insist on this because we simply copy the source data ---
393          * any non-ASCII data would be wrongly encoded, and any indexes sorted
394          * according to the source locale would be wrong.
395          *
396          * However, we assume that template0 doesn't contain any non-ASCII data
397          * nor any indexes that depend on collation or ctype, so template0 can be
398          * used as template for creating a database with any encoding or locale.
399          */
400         if (strcmp(dbtemplate, "template0") != 0)
401         {
402                 if (encoding != src_encoding)
403                         ereport(ERROR,
404                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
405                                          errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
406                                                         pg_encoding_to_char(encoding),
407                                                         pg_encoding_to_char(src_encoding)),
408                                          errhint("Use the same encoding as in the template database, or use template0 as template.")));
409
410                 if (strcmp(dbcollate, src_collate) != 0)
411                         ereport(ERROR,
412                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
413                                          errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
414                                                         dbcollate, src_collate),
415                                          errhint("Use the same collation as in the template database, or use template0 as template.")));
416
417                 if (strcmp(dbctype, src_ctype) != 0)
418                         ereport(ERROR,
419                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
420                                          errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
421                                                         dbctype, src_ctype),
422                                          errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
423         }
424
425         /* Resolve default tablespace for new database */
426         if (dtablespacename && dtablespacename->arg)
427         {
428                 char       *tablespacename;
429                 AclResult       aclresult;
430
431                 tablespacename = strVal(dtablespacename->arg);
432                 dst_deftablespace = get_tablespace_oid(tablespacename);
433                 if (!OidIsValid(dst_deftablespace))
434                         ereport(ERROR,
435                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
436                                          errmsg("tablespace \"%s\" does not exist",
437                                                         tablespacename)));
438                 /* check permissions */
439                 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
440                                                                                    ACL_CREATE);
441                 if (aclresult != ACLCHECK_OK)
442                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
443                                                    tablespacename);
444
445                 /* pg_global must never be the default tablespace */
446                 if (dst_deftablespace == GLOBALTABLESPACE_OID)
447                         ereport(ERROR,
448                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
449                                   errmsg("pg_global cannot be used as default tablespace")));
450
451                 /*
452                  * If we are trying to change the default tablespace of the template,
453                  * we require that the template not have any files in the new default
454                  * tablespace.  This is necessary because otherwise the copied
455                  * database would contain pg_class rows that refer to its default
456                  * tablespace both explicitly (by OID) and implicitly (as zero), which
457                  * would cause problems.  For example another CREATE DATABASE using
458                  * the copied database as template, and trying to change its default
459                  * tablespace again, would yield outright incorrect results (it would
460                  * improperly move tables to the new default tablespace that should
461                  * stay in the same tablespace).
462                  */
463                 if (dst_deftablespace != src_deftablespace)
464                 {
465                         char       *srcpath;
466                         struct stat st;
467
468                         srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
469
470                         if (stat(srcpath, &st) == 0 &&
471                                 S_ISDIR(st.st_mode) &&
472                                 !directory_is_empty(srcpath))
473                                 ereport(ERROR,
474                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
475                                                  errmsg("cannot assign new default tablespace \"%s\"",
476                                                                 tablespacename),
477                                                  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
478                                                                    dbtemplate)));
479                         pfree(srcpath);
480                 }
481         }
482         else
483         {
484                 /* Use template database's default tablespace */
485                 dst_deftablespace = src_deftablespace;
486                 /* Note there is no additional permission check in this path */
487         }
488
489         /*
490          * Check for db name conflict.  This is just to give a more friendly error
491          * message than "unique index violation".  There's a race condition but
492          * we're willing to accept the less friendly message in that case.
493          */
494         if (OidIsValid(get_database_oid(dbname)))
495                 ereport(ERROR,
496                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
497                                  errmsg("database \"%s\" already exists", dbname)));
498
499         /*
500          * The source DB can't have any active backends, except this one
501          * (exception is to allow CREATE DB while connected to template1).
502          * Otherwise we might copy inconsistent data.
503          *
504          * This should be last among the basic error checks, because it involves
505          * potential waiting; we may as well throw an error first if we're gonna
506          * throw one.
507          */
508         if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
509                 ereport(ERROR,
510                                 (errcode(ERRCODE_OBJECT_IN_USE),
511                         errmsg("source database \"%s\" is being accessed by other users",
512                                    dbtemplate),
513                                  errdetail_busy_db(notherbackends, npreparedxacts)));
514
515         /*
516          * Select an OID for the new database, checking that it doesn't have a
517          * filename conflict with anything already existing in the tablespace
518          * directories.
519          */
520         pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
521
522         do
523         {
524                 dboid = GetNewOid(pg_database_rel);
525         } while (check_db_file_conflict(dboid));
526
527         /*
528          * Insert a new tuple into pg_database.  This establishes our ownership of
529          * the new database name (anyone else trying to insert the same name will
530          * block on the unique index, and fail after we commit).
531          */
532
533         /* Form tuple */
534         MemSet(new_record, 0, sizeof(new_record));
535         MemSet(new_record_nulls, false, sizeof(new_record_nulls));
536
537         new_record[Anum_pg_database_datname - 1] =
538                 DirectFunctionCall1(namein, CStringGetDatum(dbname));
539         new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
540         new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
541         new_record[Anum_pg_database_datcollate - 1] =
542                 DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
543         new_record[Anum_pg_database_datctype - 1] =
544                 DirectFunctionCall1(namein, CStringGetDatum(dbctype));
545         new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
546         new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
547         new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
548         new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
549         new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
550         new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
551
552         /*
553          * We deliberately set datacl to default (NULL), rather than copying it
554          * from the template database.  Copying it would be a bad idea when the
555          * owner is not the same as the template's owner.
556          */
557         new_record_nulls[Anum_pg_database_datacl - 1] = true;
558
559         tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
560                                                         new_record, new_record_nulls);
561
562         HeapTupleSetOid(tuple, dboid);
563
564         simple_heap_insert(pg_database_rel, tuple);
565
566         /* Update indexes */
567         CatalogUpdateIndexes(pg_database_rel, tuple);
568
569         /*
570          * Now generate additional catalog entries associated with the new DB
571          */
572
573         /* Register owner dependency */
574         recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
575
576         /* Create pg_shdepend entries for objects within database */
577         copyTemplateDependencies(src_dboid, dboid);
578
579         /*
580          * Force a checkpoint before starting the copy. This will force dirty
581          * buffers out to disk, to ensure source database is up-to-date on disk
582          * for the copy. FlushDatabaseBuffers() would suffice for that, but we
583          * also want to process any pending unlink requests. Otherwise, if a
584          * checkpoint happened while we're copying files, a file might be deleted
585          * just when we're about to copy it, causing the lstat() call in copydir()
586          * to fail with ENOENT.
587          */
588         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
589
590         /*
591          * Once we start copying subdirectories, we need to be able to clean 'em
592          * up if we fail.  Use an ENSURE block to make sure this happens.  (This
593          * is not a 100% solution, because of the possibility of failure during
594          * transaction commit after we leave this routine, but it should handle
595          * most scenarios.)
596          */
597         fparms.src_dboid = src_dboid;
598         fparms.dest_dboid = dboid;
599         PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
600                                                         PointerGetDatum(&fparms));
601         {
602                 /*
603                  * Iterate through all tablespaces of the template database, and copy
604                  * each one to the new database.
605                  */
606                 rel = heap_open(TableSpaceRelationId, AccessShareLock);
607                 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
608                 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
609                 {
610                         Oid                     srctablespace = HeapTupleGetOid(tuple);
611                         Oid                     dsttablespace;
612                         char       *srcpath;
613                         char       *dstpath;
614                         struct stat st;
615
616                         /* No need to copy global tablespace */
617                         if (srctablespace == GLOBALTABLESPACE_OID)
618                                 continue;
619
620                         srcpath = GetDatabasePath(src_dboid, srctablespace);
621
622                         if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
623                                 directory_is_empty(srcpath))
624                         {
625                                 /* Assume we can ignore it */
626                                 pfree(srcpath);
627                                 continue;
628                         }
629
630                         if (srctablespace == src_deftablespace)
631                                 dsttablespace = dst_deftablespace;
632                         else
633                                 dsttablespace = srctablespace;
634
635                         dstpath = GetDatabasePath(dboid, dsttablespace);
636
637                         /*
638                          * Copy this subdirectory to the new location
639                          *
640                          * We don't need to copy subdirectories
641                          */
642                         copydir(srcpath, dstpath, false);
643
644                         /* Record the filesystem change in XLOG */
645                         {
646                                 xl_dbase_create_rec xlrec;
647                                 XLogRecData rdata[1];
648
649                                 xlrec.db_id = dboid;
650                                 xlrec.tablespace_id = dsttablespace;
651                                 xlrec.src_db_id = src_dboid;
652                                 xlrec.src_tablespace_id = srctablespace;
653
654                                 rdata[0].data = (char *) &xlrec;
655                                 rdata[0].len = sizeof(xl_dbase_create_rec);
656                                 rdata[0].buffer = InvalidBuffer;
657                                 rdata[0].next = NULL;
658
659                                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
660                         }
661                 }
662                 heap_endscan(scan);
663                 heap_close(rel, AccessShareLock);
664
665                 /*
666                  * We force a checkpoint before committing.  This effectively means
667                  * that committed XLOG_DBASE_CREATE operations will never need to be
668                  * replayed (at least not in ordinary crash recovery; we still have to
669                  * make the XLOG entry for the benefit of PITR operations). This
670                  * avoids two nasty scenarios:
671                  *
672                  * #1: When PITR is off, we don't XLOG the contents of newly created
673                  * indexes; therefore the drop-and-recreate-whole-directory behavior
674                  * of DBASE_CREATE replay would lose such indexes.
675                  *
676                  * #2: Since we have to recopy the source database during DBASE_CREATE
677                  * replay, we run the risk of copying changes in it that were
678                  * committed after the original CREATE DATABASE command but before the
679                  * system crash that led to the replay.  This is at least unexpected
680                  * and at worst could lead to inconsistencies, eg duplicate table
681                  * names.
682                  *
683                  * (Both of these were real bugs in releases 8.0 through 8.0.3.)
684                  *
685                  * In PITR replay, the first of these isn't an issue, and the second
686                  * is only a risk if the CREATE DATABASE and subsequent template
687                  * database change both occur while a base backup is being taken.
688                  * There doesn't seem to be much we can do about that except document
689                  * it as a limitation.
690                  *
691                  * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
692                  * we can avoid this.
693                  */
694                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
695
696                 /*
697                  * Close pg_database, but keep lock till commit.
698                  */
699                 heap_close(pg_database_rel, NoLock);
700
701                 /*
702                  * Force synchronous commit, thus minimizing the window between
703                  * creation of the database files and committal of the transaction. If
704                  * we crash before committing, we'll have a DB that's taking up disk
705                  * space but is not in pg_database, which is not good.
706                  */
707                 ForceSyncCommit();
708         }
709         PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
710                                                                 PointerGetDatum(&fparms));
711 }
712
713 /* Error cleanup callback for createdb */
714 static void
715 createdb_failure_callback(int code, Datum arg)
716 {
717         createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
718
719         /*
720          * Release lock on source database before doing recursive remove. This is
721          * not essential but it seems desirable to release the lock as soon as
722          * possible.
723          */
724         UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
725
726         /* Throw away any successfully copied subdirectories */
727         remove_dbtablespaces(fparms->dest_dboid);
728 }
729
730
731 /*
732  * DROP DATABASE
733  */
734 void
735 dropdb(const char *dbname, bool missing_ok)
736 {
737         Oid                     db_id;
738         bool            db_istemplate;
739         Relation        pgdbrel;
740         HeapTuple       tup;
741         int                     notherbackends;
742         int                     npreparedxacts;
743
744         /*
745          * Look up the target database's OID, and get exclusive lock on it. We
746          * need this to ensure that no new backend starts up in the target
747          * database while we are deleting it (see postinit.c), and that no one is
748          * using it as a CREATE DATABASE template or trying to delete it for
749          * themselves.
750          */
751         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
752
753         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
754                                          &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL))
755         {
756                 if (!missing_ok)
757                 {
758                         ereport(ERROR,
759                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
760                                          errmsg("database \"%s\" does not exist", dbname)));
761                 }
762                 else
763                 {
764                         /* Close pg_database, release the lock, since we changed nothing */
765                         heap_close(pgdbrel, RowExclusiveLock);
766                         ereport(NOTICE,
767                                         (errmsg("database \"%s\" does not exist, skipping",
768                                                         dbname)));
769                         return;
770                 }
771         }
772
773         /*
774          * Permission checks
775          */
776         if (!pg_database_ownercheck(db_id, GetUserId()))
777                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
778                                            dbname);
779
780         /*
781          * Disallow dropping a DB that is marked istemplate.  This is just to
782          * prevent people from accidentally dropping template0 or template1; they
783          * can do so if they're really determined ...
784          */
785         if (db_istemplate)
786                 ereport(ERROR,
787                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
788                                  errmsg("cannot drop a template database")));
789
790         /* Obviously can't drop my own database */
791         if (db_id == MyDatabaseId)
792                 ereport(ERROR,
793                                 (errcode(ERRCODE_OBJECT_IN_USE),
794                                  errmsg("cannot drop the currently open database")));
795
796         /*
797          * Check for other backends in the target database.  (Because we hold the
798          * database lock, no new ones can start after this.)
799          *
800          * As in CREATE DATABASE, check this after other error conditions.
801          */
802         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
803                 ereport(ERROR,
804                                 (errcode(ERRCODE_OBJECT_IN_USE),
805                                  errmsg("database \"%s\" is being accessed by other users",
806                                                 dbname),
807                                  errdetail_busy_db(notherbackends, npreparedxacts)));
808
809         /*
810          * Remove the database's tuple from pg_database.
811          */
812         tup = SearchSysCache(DATABASEOID,
813                                                  ObjectIdGetDatum(db_id),
814                                                  0, 0, 0);
815         if (!HeapTupleIsValid(tup))
816                 elog(ERROR, "cache lookup failed for database %u", db_id);
817
818         simple_heap_delete(pgdbrel, &tup->t_self);
819
820         ReleaseSysCache(tup);
821
822         /*
823          * Delete any comments associated with the database.
824          */
825         DeleteSharedComments(db_id, DatabaseRelationId);
826
827         /*
828          * Remove settings associated with this database
829          */
830         DropSetting(db_id, InvalidOid);
831
832         /*
833          * Remove shared dependency references for the database.
834          */
835         dropDatabaseDependencies(db_id);
836
837         /*
838          * Drop pages for this database that are in the shared buffer cache. This
839          * is important to ensure that no remaining backend tries to write out a
840          * dirty buffer to the dead database later...
841          */
842         DropDatabaseBuffers(db_id);
843
844         /*
845          * Tell the stats collector to forget it immediately, too.
846          */
847         pgstat_drop_database(db_id);
848
849         /*
850          * Tell bgwriter to forget any pending fsync and unlink requests for files
851          * in the database; else the fsyncs will fail at next checkpoint, or
852          * worse, it will delete files that belong to a newly created database
853          * with the same OID.
854          */
855         ForgetDatabaseFsyncRequests(db_id);
856
857         /*
858          * Force a checkpoint to make sure the bgwriter has received the message
859          * sent by ForgetDatabaseFsyncRequests. On Windows, this also ensures that
860          * the bgwriter doesn't hold any open files, which would cause rmdir() to
861          * fail.
862          */
863         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
864
865         /*
866          * Remove all tablespace subdirs belonging to the database.
867          */
868         remove_dbtablespaces(db_id);
869
870         /*
871          * Close pg_database, but keep lock till commit.
872          */
873         heap_close(pgdbrel, NoLock);
874
875         /*
876          * Force synchronous commit, thus minimizing the window between removal
877          * of the database files and commital of the transaction. If we crash
878          * before committing, we'll have a DB that's gone on disk but still there
879          * according to pg_database, which is not good.
880          */
881         ForceSyncCommit();
882 }
883
884
885 /*
886  * Rename database
887  */
888 void
889 RenameDatabase(const char *oldname, const char *newname)
890 {
891         Oid                     db_id;
892         HeapTuple       newtup;
893         Relation        rel;
894         int                     notherbackends;
895         int                     npreparedxacts;
896
897         /*
898          * Look up the target database's OID, and get exclusive lock on it. We
899          * need this for the same reasons as DROP DATABASE.
900          */
901         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
902
903         if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
904                                          NULL, NULL, NULL, NULL, NULL, NULL, NULL))
905                 ereport(ERROR,
906                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
907                                  errmsg("database \"%s\" does not exist", oldname)));
908
909         /* must be owner */
910         if (!pg_database_ownercheck(db_id, GetUserId()))
911                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
912                                            oldname);
913
914         /* must have createdb rights */
915         if (!have_createdb_privilege())
916                 ereport(ERROR,
917                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
918                                  errmsg("permission denied to rename database")));
919
920         /*
921          * Make sure the new name doesn't exist.  See notes for same error in
922          * CREATE DATABASE.
923          */
924         if (OidIsValid(get_database_oid(newname)))
925                 ereport(ERROR,
926                                 (errcode(ERRCODE_DUPLICATE_DATABASE),
927                                  errmsg("database \"%s\" already exists", newname)));
928
929         /*
930          * XXX Client applications probably store the current database somewhere,
931          * so renaming it could cause confusion.  On the other hand, there may not
932          * be an actual problem besides a little confusion, so think about this
933          * and decide.
934          */
935         if (db_id == MyDatabaseId)
936                 ereport(ERROR,
937                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
938                                  errmsg("current database cannot be renamed")));
939
940         /*
941          * Make sure the database does not have active sessions.  This is the same
942          * concern as above, but applied to other sessions.
943          *
944          * As in CREATE DATABASE, check this after other error conditions.
945          */
946         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
947                 ereport(ERROR,
948                                 (errcode(ERRCODE_OBJECT_IN_USE),
949                                  errmsg("database \"%s\" is being accessed by other users",
950                                                 oldname),
951                                  errdetail_busy_db(notherbackends, npreparedxacts)));
952
953         /* rename */
954         newtup = SearchSysCacheCopy(DATABASEOID,
955                                                                 ObjectIdGetDatum(db_id),
956                                                                 0, 0, 0);
957         if (!HeapTupleIsValid(newtup))
958                 elog(ERROR, "cache lookup failed for database %u", db_id);
959         namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
960         simple_heap_update(rel, &newtup->t_self, newtup);
961         CatalogUpdateIndexes(rel, newtup);
962
963         /*
964          * Close pg_database, but keep lock till commit.
965          */
966         heap_close(rel, NoLock);
967 }
968
969
970 /*
971  * ALTER DATABASE SET TABLESPACE
972  */
973 static void
974 movedb(const char *dbname, const char *tblspcname)
975 {
976         Oid                     db_id;
977         Relation        pgdbrel;
978         int                     notherbackends;
979         int                     npreparedxacts;
980         HeapTuple       oldtuple,
981                                 newtuple;
982         Oid                     src_tblspcoid,
983                                 dst_tblspcoid;
984         Datum           new_record[Natts_pg_database];
985         bool            new_record_nulls[Natts_pg_database];
986         bool            new_record_repl[Natts_pg_database];
987         ScanKeyData scankey;
988         SysScanDesc sysscan;
989         AclResult       aclresult;
990         char       *src_dbpath;
991         char       *dst_dbpath;
992         DIR                *dstdir;
993         struct dirent *xlde;
994         movedb_failure_params fparms;
995
996         /*
997          * Look up the target database's OID, and get exclusive lock on it. We
998          * need this to ensure that no new backend starts up in the database while
999          * we are moving it, and that no one is using it as a CREATE DATABASE
1000          * template or trying to delete it.
1001          */
1002         pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
1003
1004         if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1005                                          NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
1006                 ereport(ERROR,
1007                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1008                                  errmsg("database \"%s\" does not exist", dbname)));
1009
1010         /*
1011          * We actually need a session lock, so that the lock will persist across
1012          * the commit/restart below.  (We could almost get away with letting the
1013          * lock be released at commit, except that someone could try to move
1014          * relations of the DB back into the old directory while we rmtree() it.)
1015          */
1016         LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1017                                                            AccessExclusiveLock);
1018
1019         /*
1020          * Permission checks
1021          */
1022         if (!pg_database_ownercheck(db_id, GetUserId()))
1023                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1024                                            dbname);
1025
1026         /*
1027          * Obviously can't move the tables of my own database
1028          */
1029         if (db_id == MyDatabaseId)
1030                 ereport(ERROR,
1031                                 (errcode(ERRCODE_OBJECT_IN_USE),
1032                                  errmsg("cannot change the tablespace of the currently open database")));
1033
1034         /*
1035          * Get tablespace's oid
1036          */
1037         dst_tblspcoid = get_tablespace_oid(tblspcname);
1038         if (dst_tblspcoid == InvalidOid)
1039                 ereport(ERROR,
1040                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1041                                  errmsg("tablespace \"%s\" does not exist", tblspcname)));
1042
1043         /*
1044          * Permission checks
1045          */
1046         aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
1047                                                                            ACL_CREATE);
1048         if (aclresult != ACLCHECK_OK)
1049                 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
1050                                            tblspcname);
1051
1052         /*
1053          * pg_global must never be the default tablespace
1054          */
1055         if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1056                 ereport(ERROR,
1057                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1058                                  errmsg("pg_global cannot be used as default tablespace")));
1059
1060         /*
1061          * No-op if same tablespace
1062          */
1063         if (src_tblspcoid == dst_tblspcoid)
1064         {
1065                 heap_close(pgdbrel, NoLock);
1066                 UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1067                                                                          AccessExclusiveLock);
1068                 return;
1069         }
1070
1071         /*
1072          * Check for other backends in the target database.  (Because we hold the
1073          * database lock, no new ones can start after this.)
1074          *
1075          * As in CREATE DATABASE, check this after other error conditions.
1076          */
1077         if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1078                 ereport(ERROR,
1079                                 (errcode(ERRCODE_OBJECT_IN_USE),
1080                                  errmsg("database \"%s\" is being accessed by other users",
1081                                                 dbname),
1082                                  errdetail_busy_db(notherbackends, npreparedxacts)));
1083
1084         /*
1085          * Get old and new database paths
1086          */
1087         src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
1088         dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1089
1090         /*
1091          * Force a checkpoint before proceeding. This will force dirty buffers out
1092          * to disk, to ensure source database is up-to-date on disk for the copy.
1093          * FlushDatabaseBuffers() would suffice for that, but we also want to
1094          * process any pending unlink requests. Otherwise, the check for existing
1095          * files in the target directory might fail unnecessarily, not to mention
1096          * that the copy might fail due to source files getting deleted under it.
1097          * On Windows, this also ensures that the bgwriter doesn't hold any open
1098          * files, which would cause rmdir() to fail.
1099          */
1100         RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1101
1102         /*
1103          * Check for existence of files in the target directory, i.e., objects of
1104          * this database that are already in the target tablespace.  We can't
1105          * allow the move in such a case, because we would need to change those
1106          * relations' pg_class.reltablespace entries to zero, and we don't have
1107          * access to the DB's pg_class to do so.
1108          */
1109         dstdir = AllocateDir(dst_dbpath);
1110         if (dstdir != NULL)
1111         {
1112                 while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
1113                 {
1114                         if (strcmp(xlde->d_name, ".") == 0 ||
1115                                 strcmp(xlde->d_name, "..") == 0)
1116                                 continue;
1117
1118                         ereport(ERROR,
1119                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1120                                          errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
1121                                                         dbname, tblspcname),
1122                                          errhint("You must move them back to the database's default tablespace before using this command.")));
1123                 }
1124
1125                 FreeDir(dstdir);
1126
1127                 /*
1128                  * The directory exists but is empty. We must remove it before using
1129                  * the copydir function.
1130                  */
1131                 if (rmdir(dst_dbpath) != 0)
1132                         elog(ERROR, "could not remove directory \"%s\": %m",
1133                                  dst_dbpath);
1134         }
1135
1136         /*
1137          * Use an ENSURE block to make sure we remove the debris if the copy fails
1138          * (eg, due to out-of-disk-space).      This is not a 100% solution, because
1139          * of the possibility of failure during transaction commit, but it should
1140          * handle most scenarios.
1141          */
1142         fparms.dest_dboid = db_id;
1143         fparms.dest_tsoid = dst_tblspcoid;
1144         PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1145                                                         PointerGetDatum(&fparms));
1146         {
1147                 /*
1148                  * Copy files from the old tablespace to the new one
1149                  */
1150                 copydir(src_dbpath, dst_dbpath, false);
1151
1152                 /*
1153                  * Record the filesystem change in XLOG
1154                  */
1155                 {
1156                         xl_dbase_create_rec xlrec;
1157                         XLogRecData rdata[1];
1158
1159                         xlrec.db_id = db_id;
1160                         xlrec.tablespace_id = dst_tblspcoid;
1161                         xlrec.src_db_id = db_id;
1162                         xlrec.src_tablespace_id = src_tblspcoid;
1163
1164                         rdata[0].data = (char *) &xlrec;
1165                         rdata[0].len = sizeof(xl_dbase_create_rec);
1166                         rdata[0].buffer = InvalidBuffer;
1167                         rdata[0].next = NULL;
1168
1169                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
1170                 }
1171
1172                 /*
1173                  * Update the database's pg_database tuple
1174                  */
1175                 ScanKeyInit(&scankey,
1176                                         Anum_pg_database_datname,
1177                                         BTEqualStrategyNumber, F_NAMEEQ,
1178                                         NameGetDatum(dbname));
1179                 sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1180                                                                          SnapshotNow, 1, &scankey);
1181                 oldtuple = systable_getnext(sysscan);
1182                 if (!HeapTupleIsValid(oldtuple))                /* shouldn't happen... */
1183                         ereport(ERROR,
1184                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
1185                                          errmsg("database \"%s\" does not exist", dbname)));
1186
1187                 MemSet(new_record, 0, sizeof(new_record));
1188                 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1189                 MemSet(new_record_repl, false, sizeof(new_record_repl));
1190
1191                 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
1192                 new_record_repl[Anum_pg_database_dattablespace - 1] = true;
1193
1194                 newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
1195                                                                          new_record,
1196                                                                          new_record_nulls, new_record_repl);
1197                 simple_heap_update(pgdbrel, &oldtuple->t_self, newtuple);
1198
1199                 /* Update indexes */
1200                 CatalogUpdateIndexes(pgdbrel, newtuple);
1201
1202                 systable_endscan(sysscan);
1203
1204                 /*
1205                  * Force another checkpoint here.  As in CREATE DATABASE, this is to
1206                  * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
1207                  * operation, which would cause us to lose any unlogged operations
1208                  * done in the new DB tablespace before the next checkpoint.
1209                  */
1210                 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1211
1212                 /*
1213                  * Force synchronous commit, thus minimizing the window between
1214                  * copying the database files and commital of the transaction. If we
1215                  * crash before committing, we'll leave an orphaned set of files on
1216                  * disk, which is not fatal but not good either.
1217                  */
1218                 ForceSyncCommit();
1219
1220                 /*
1221                  * Close pg_database, but keep lock till commit.
1222                  */
1223                 heap_close(pgdbrel, NoLock);
1224         }
1225         PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1226                                                                 PointerGetDatum(&fparms));
1227
1228         /*
1229          * Commit the transaction so that the pg_database update is committed. If
1230          * we crash while removing files, the database won't be corrupt, we'll
1231          * just leave some orphaned files in the old directory.
1232          *
1233          * (This is OK because we know we aren't inside a transaction block.)
1234          *
1235          * XXX would it be safe/better to do this inside the ensure block?      Not
1236          * convinced it's a good idea; consider elog just after the transaction
1237          * really commits.
1238          */
1239         PopActiveSnapshot();
1240         CommitTransactionCommand();
1241
1242         /* Start new transaction for the remaining work; don't need a snapshot */
1243         StartTransactionCommand();
1244
1245         /*
1246          * Remove files from the old tablespace
1247          */
1248         if (!rmtree(src_dbpath, true))
1249                 ereport(WARNING,
1250                                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1251                                                 src_dbpath)));
1252
1253         /*
1254          * Record the filesystem change in XLOG
1255          */
1256         {
1257                 xl_dbase_drop_rec xlrec;
1258                 XLogRecData rdata[1];
1259
1260                 xlrec.db_id = db_id;
1261                 xlrec.tablespace_id = src_tblspcoid;
1262
1263                 rdata[0].data = (char *) &xlrec;
1264                 rdata[0].len = sizeof(xl_dbase_drop_rec);
1265                 rdata[0].buffer = InvalidBuffer;
1266                 rdata[0].next = NULL;
1267
1268                 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1269         }
1270
1271         /* Now it's safe to release the database lock */
1272         UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1273                                                                  AccessExclusiveLock);
1274 }
1275
1276 /* Error cleanup callback for movedb */
1277 static void
1278 movedb_failure_callback(int code, Datum arg)
1279 {
1280         movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
1281         char       *dstpath;
1282
1283         /* Get rid of anything we managed to copy to the target directory */
1284         dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
1285
1286         (void) rmtree(dstpath, true);
1287 }
1288
1289
1290 /*
1291  * ALTER DATABASE name ...
1292  */
1293 void
1294 AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
1295 {
1296         Relation        rel;
1297         HeapTuple       tuple,
1298                                 newtuple;
1299         ScanKeyData scankey;
1300         SysScanDesc scan;
1301         ListCell   *option;
1302         int                     connlimit = -1;
1303         DefElem    *dconnlimit = NULL;
1304         DefElem    *dtablespace = NULL;
1305         Datum           new_record[Natts_pg_database];
1306         bool            new_record_nulls[Natts_pg_database];
1307         bool            new_record_repl[Natts_pg_database];
1308
1309         /* Extract options from the statement node tree */
1310         foreach(option, stmt->options)
1311         {
1312                 DefElem    *defel = (DefElem *) lfirst(option);
1313
1314                 if (strcmp(defel->defname, "connectionlimit") == 0)
1315                 {
1316                         if (dconnlimit)
1317                                 ereport(ERROR,
1318                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1319                                                  errmsg("conflicting or redundant options")));
1320                         dconnlimit = defel;
1321                 }
1322                 else if (strcmp(defel->defname, "tablespace") == 0)
1323                 {
1324                         if (dtablespace)
1325                                 ereport(ERROR,
1326                                                 (errcode(ERRCODE_SYNTAX_ERROR),
1327                                                  errmsg("conflicting or redundant options")));
1328                         dtablespace = defel;
1329                 }
1330                 else
1331                         elog(ERROR, "option \"%s\" not recognized",
1332                                  defel->defname);
1333         }
1334
1335         if (dtablespace)
1336         {
1337                 /* currently, can't be specified along with any other options */
1338                 Assert(!dconnlimit);
1339                 /* this case isn't allowed within a transaction block */
1340                 PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
1341                 movedb(stmt->dbname, strVal(dtablespace->arg));
1342                 return;
1343         }
1344
1345         if (dconnlimit)
1346         {
1347                 connlimit = intVal(dconnlimit->arg);
1348                 if (connlimit < -1)
1349                         ereport(ERROR,
1350                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1351                                          errmsg("invalid connection limit: %d", connlimit)));
1352         }
1353
1354         /*
1355          * Get the old tuple.  We don't need a lock on the database per se,
1356          * because we're not going to do anything that would mess up incoming
1357          * connections.
1358          */
1359         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1360         ScanKeyInit(&scankey,
1361                                 Anum_pg_database_datname,
1362                                 BTEqualStrategyNumber, F_NAMEEQ,
1363                                 NameGetDatum(stmt->dbname));
1364         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1365                                                           SnapshotNow, 1, &scankey);
1366         tuple = systable_getnext(scan);
1367         if (!HeapTupleIsValid(tuple))
1368                 ereport(ERROR,
1369                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1370                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
1371
1372         if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1373                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1374                                            stmt->dbname);
1375
1376         /*
1377          * Build an updated tuple, perusing the information just obtained
1378          */
1379         MemSet(new_record, 0, sizeof(new_record));
1380         MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1381         MemSet(new_record_repl, false, sizeof(new_record_repl));
1382
1383         if (dconnlimit)
1384         {
1385                 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
1386                 new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
1387         }
1388
1389         newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
1390                                                                  new_record_nulls, new_record_repl);
1391         simple_heap_update(rel, &tuple->t_self, newtuple);
1392
1393         /* Update indexes */
1394         CatalogUpdateIndexes(rel, newtuple);
1395
1396         systable_endscan(scan);
1397
1398         /* Close pg_database, but keep lock till commit */
1399         heap_close(rel, NoLock);
1400 }
1401
1402
1403 /*
1404  * ALTER DATABASE name SET ...
1405  */
1406 void
1407 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1408 {
1409         Oid             datid = get_database_oid(stmt->dbname);
1410
1411         if (!OidIsValid(datid))
1412                 ereport(ERROR,
1413                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1414                                  errmsg("database \"%s\" does not exist", stmt->dbname)));
1415   
1416         /*
1417          * Obtain a lock on the database and make sure it didn't go away in the
1418          * meantime.
1419          */
1420         shdepLockAndCheckObject(DatabaseRelationId, datid);
1421
1422         if (!pg_database_ownercheck(datid, GetUserId()))
1423                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1424                                            stmt->dbname);
1425
1426         AlterSetting(datid, InvalidOid, stmt->setstmt);
1427   
1428         UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
1429 }
1430
1431
1432 /*
1433  * ALTER DATABASE name OWNER TO newowner
1434  */
1435 void
1436 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1437 {
1438         HeapTuple       tuple;
1439         Relation        rel;
1440         ScanKeyData scankey;
1441         SysScanDesc scan;
1442         Form_pg_database datForm;
1443
1444         /*
1445          * Get the old tuple.  We don't need a lock on the database per se,
1446          * because we're not going to do anything that would mess up incoming
1447          * connections.
1448          */
1449         rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1450         ScanKeyInit(&scankey,
1451                                 Anum_pg_database_datname,
1452                                 BTEqualStrategyNumber, F_NAMEEQ,
1453                                 NameGetDatum(dbname));
1454         scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1455                                                           SnapshotNow, 1, &scankey);
1456         tuple = systable_getnext(scan);
1457         if (!HeapTupleIsValid(tuple))
1458                 ereport(ERROR,
1459                                 (errcode(ERRCODE_UNDEFINED_DATABASE),
1460                                  errmsg("database \"%s\" does not exist", dbname)));
1461
1462         datForm = (Form_pg_database) GETSTRUCT(tuple);
1463
1464         /*
1465          * If the new owner is the same as the existing owner, consider the
1466          * command to have succeeded.  This is to be consistent with other
1467          * objects.
1468          */
1469         if (datForm->datdba != newOwnerId)
1470         {
1471                 Datum           repl_val[Natts_pg_database];
1472                 bool            repl_null[Natts_pg_database];
1473                 bool            repl_repl[Natts_pg_database];
1474                 Acl                *newAcl;
1475                 Datum           aclDatum;
1476                 bool            isNull;
1477                 HeapTuple       newtuple;
1478
1479                 /* Otherwise, must be owner of the existing object */
1480                 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1481                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1482                                                    dbname);
1483
1484                 /* Must be able to become new owner */
1485                 check_is_member_of_role(GetUserId(), newOwnerId);
1486
1487                 /*
1488                  * must have createdb rights
1489                  *
1490                  * NOTE: This is different from other alter-owner checks in that the
1491                  * current user is checked for createdb privileges instead of the
1492                  * destination owner.  This is consistent with the CREATE case for
1493                  * databases.  Because superusers will always have this right, we need
1494                  * no special case for them.
1495                  */
1496                 if (!have_createdb_privilege())
1497                         ereport(ERROR,
1498                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1499                                    errmsg("permission denied to change owner of database")));
1500
1501                 memset(repl_null, false, sizeof(repl_null));
1502                 memset(repl_repl, false, sizeof(repl_repl));
1503
1504                 repl_repl[Anum_pg_database_datdba - 1] = true;
1505                 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1506
1507                 /*
1508                  * Determine the modified ACL for the new owner.  This is only
1509                  * necessary when the ACL is non-null.
1510                  */
1511                 aclDatum = heap_getattr(tuple,
1512                                                                 Anum_pg_database_datacl,
1513                                                                 RelationGetDescr(rel),
1514                                                                 &isNull);
1515                 if (!isNull)
1516                 {
1517                         newAcl = aclnewowner(DatumGetAclP(aclDatum),
1518                                                                  datForm->datdba, newOwnerId);
1519                         repl_repl[Anum_pg_database_datacl - 1] = true;
1520                         repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1521                 }
1522
1523                 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1524                 simple_heap_update(rel, &newtuple->t_self, newtuple);
1525                 CatalogUpdateIndexes(rel, newtuple);
1526
1527                 heap_freetuple(newtuple);
1528
1529                 /* Update owner dependency reference */
1530                 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1531                                                                 newOwnerId);
1532         }
1533
1534         systable_endscan(scan);
1535
1536         /* Close pg_database, but keep lock till commit */
1537         heap_close(rel, NoLock);
1538 }
1539
1540
1541 /*
1542  * Helper functions
1543  */
1544
1545 /*
1546  * Look up info about the database named "name".  If the database exists,
1547  * obtain the specified lock type on it, fill in any of the remaining
1548  * parameters that aren't NULL, and return TRUE.  If no such database,
1549  * return FALSE.
1550  */
1551 static bool
1552 get_db_info(const char *name, LOCKMODE lockmode,
1553                         Oid *dbIdP, Oid *ownerIdP,
1554                         int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1555                         Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1556                         Oid *dbTablespace, char **dbCollate, char **dbCtype)
1557 {
1558         bool            result = false;
1559         Relation        relation;
1560
1561         AssertArg(name);
1562
1563         /* Caller may wish to grab a better lock on pg_database beforehand... */
1564         relation = heap_open(DatabaseRelationId, AccessShareLock);
1565
1566         /*
1567          * Loop covers the rare case where the database is renamed before we can
1568          * lock it.  We try again just in case we can find a new one of the same
1569          * name.
1570          */
1571         for (;;)
1572         {
1573                 ScanKeyData scanKey;
1574                 SysScanDesc scan;
1575                 HeapTuple       tuple;
1576                 Oid                     dbOid;
1577
1578                 /*
1579                  * there's no syscache for database-indexed-by-name, so must do it the
1580                  * hard way
1581                  */
1582                 ScanKeyInit(&scanKey,
1583                                         Anum_pg_database_datname,
1584                                         BTEqualStrategyNumber, F_NAMEEQ,
1585                                         NameGetDatum(name));
1586
1587                 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1588                                                                   SnapshotNow, 1, &scanKey);
1589
1590                 tuple = systable_getnext(scan);
1591
1592                 if (!HeapTupleIsValid(tuple))
1593                 {
1594                         /* definitely no database of that name */
1595                         systable_endscan(scan);
1596                         break;
1597                 }
1598
1599                 dbOid = HeapTupleGetOid(tuple);
1600
1601                 systable_endscan(scan);
1602
1603                 /*
1604                  * Now that we have a database OID, we can try to lock the DB.
1605                  */
1606                 if (lockmode != NoLock)
1607                         LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1608
1609                 /*
1610                  * And now, re-fetch the tuple by OID.  If it's still there and still
1611                  * the same name, we win; else, drop the lock and loop back to try
1612                  * again.
1613                  */
1614                 tuple = SearchSysCache(DATABASEOID,
1615                                                            ObjectIdGetDatum(dbOid),
1616                                                            0, 0, 0);
1617                 if (HeapTupleIsValid(tuple))
1618                 {
1619                         Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1620
1621                         if (strcmp(name, NameStr(dbform->datname)) == 0)
1622                         {
1623                                 /* oid of the database */
1624                                 if (dbIdP)
1625                                         *dbIdP = dbOid;
1626                                 /* oid of the owner */
1627                                 if (ownerIdP)
1628                                         *ownerIdP = dbform->datdba;
1629                                 /* character encoding */
1630                                 if (encodingP)
1631                                         *encodingP = dbform->encoding;
1632                                 /* allowed as template? */
1633                                 if (dbIsTemplateP)
1634                                         *dbIsTemplateP = dbform->datistemplate;
1635                                 /* allowing connections? */
1636                                 if (dbAllowConnP)
1637                                         *dbAllowConnP = dbform->datallowconn;
1638                                 /* last system OID used in database */
1639                                 if (dbLastSysOidP)
1640                                         *dbLastSysOidP = dbform->datlastsysoid;
1641                                 /* limit of frozen XIDs */
1642                                 if (dbFrozenXidP)
1643                                         *dbFrozenXidP = dbform->datfrozenxid;
1644                                 /* default tablespace for this database */
1645                                 if (dbTablespace)
1646                                         *dbTablespace = dbform->dattablespace;
1647                                 /* default locale settings for this database */
1648                                 if (dbCollate)
1649                                         *dbCollate = pstrdup(NameStr(dbform->datcollate));
1650                                 if (dbCtype)
1651                                         *dbCtype = pstrdup(NameStr(dbform->datctype));
1652                                 ReleaseSysCache(tuple);
1653                                 result = true;
1654                                 break;
1655                         }
1656                         /* can only get here if it was just renamed */
1657                         ReleaseSysCache(tuple);
1658                 }
1659
1660                 if (lockmode != NoLock)
1661                         UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1662         }
1663
1664         heap_close(relation, AccessShareLock);
1665
1666         return result;
1667 }
1668
1669 /* Check if current user has createdb privileges */
1670 static bool
1671 have_createdb_privilege(void)
1672 {
1673         bool            result = false;
1674         HeapTuple       utup;
1675
1676         /* Superusers can always do everything */
1677         if (superuser())
1678                 return true;
1679
1680         utup = SearchSysCache(AUTHOID,
1681                                                   ObjectIdGetDatum(GetUserId()),
1682                                                   0, 0, 0);
1683         if (HeapTupleIsValid(utup))
1684         {
1685                 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1686                 ReleaseSysCache(utup);
1687         }
1688         return result;
1689 }
1690
1691 /*
1692  * Remove tablespace directories
1693  *
1694  * We don't know what tablespaces db_id is using, so iterate through all
1695  * tablespaces removing <tablespace>/db_id
1696  */
1697 static void
1698 remove_dbtablespaces(Oid db_id)
1699 {
1700         Relation        rel;
1701         HeapScanDesc scan;
1702         HeapTuple       tuple;
1703
1704         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1705         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1706         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1707         {
1708                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1709                 char       *dstpath;
1710                 struct stat st;
1711
1712                 /* Don't mess with the global tablespace */
1713                 if (dsttablespace == GLOBALTABLESPACE_OID)
1714                         continue;
1715
1716                 dstpath = GetDatabasePath(db_id, dsttablespace);
1717
1718                 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1719                 {
1720                         /* Assume we can ignore it */
1721                         pfree(dstpath);
1722                         continue;
1723                 }
1724
1725                 if (!rmtree(dstpath, true))
1726                         ereport(WARNING,
1727                                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
1728                                                         dstpath)));
1729
1730                 /* Record the filesystem change in XLOG */
1731                 {
1732                         xl_dbase_drop_rec xlrec;
1733                         XLogRecData rdata[1];
1734
1735                         xlrec.db_id = db_id;
1736                         xlrec.tablespace_id = dsttablespace;
1737
1738                         rdata[0].data = (char *) &xlrec;
1739                         rdata[0].len = sizeof(xl_dbase_drop_rec);
1740                         rdata[0].buffer = InvalidBuffer;
1741                         rdata[0].next = NULL;
1742
1743                         (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1744                 }
1745
1746                 pfree(dstpath);
1747         }
1748
1749         heap_endscan(scan);
1750         heap_close(rel, AccessShareLock);
1751 }
1752
1753 /*
1754  * Check for existing files that conflict with a proposed new DB OID;
1755  * return TRUE if there are any
1756  *
1757  * If there were a subdirectory in any tablespace matching the proposed new
1758  * OID, we'd get a create failure due to the duplicate name ... and then we'd
1759  * try to remove that already-existing subdirectory during the cleanup in
1760  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
1761  * instead we make this extra check before settling on the OID of the new
1762  * database.  This exactly parallels what GetNewRelFileNode() does for table
1763  * relfilenode values.
1764  */
1765 static bool
1766 check_db_file_conflict(Oid db_id)
1767 {
1768         bool            result = false;
1769         Relation        rel;
1770         HeapScanDesc scan;
1771         HeapTuple       tuple;
1772
1773         rel = heap_open(TableSpaceRelationId, AccessShareLock);
1774         scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1775         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1776         {
1777                 Oid                     dsttablespace = HeapTupleGetOid(tuple);
1778                 char       *dstpath;
1779                 struct stat st;
1780
1781                 /* Don't mess with the global tablespace */
1782                 if (dsttablespace == GLOBALTABLESPACE_OID)
1783                         continue;
1784
1785                 dstpath = GetDatabasePath(db_id, dsttablespace);
1786
1787                 if (lstat(dstpath, &st) == 0)
1788                 {
1789                         /* Found a conflicting file (or directory, whatever) */
1790                         pfree(dstpath);
1791                         result = true;
1792                         break;
1793                 }
1794
1795                 pfree(dstpath);
1796         }
1797
1798         heap_endscan(scan);
1799         heap_close(rel, AccessShareLock);
1800         return result;
1801 }
1802
/*
 * errdetail_busy_db
 *		Attach an errdetail explaining why a database counts as busy.
 *
 * notherbackends is the number of other sessions and npreparedxacts the
 * number of prepared transactions using the database.  Always returns 0.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	/*
	 * Singular and plural are not distinguished, because the English rules
	 * for that translate poorly.  What we can do is avoid mentioning a
	 * category whose count is zero.
	 */
	if (notherbackends <= 0)
		errdetail("There are %d prepared transaction(s) using the database.",
				  npreparedxacts);
	else if (npreparedxacts > 0)
		errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
				  notherbackends, npreparedxacts);
	else
		errdetail("There are %d other session(s) using the database.",
				  notherbackends);

	return 0;					/* just to keep ereport macro happy */
}
1825
1826 /*
1827  * get_database_oid - given a database name, look up the OID
1828  *
1829  * Returns InvalidOid if database name not found.
1830  */
1831 Oid
1832 get_database_oid(const char *dbname)
1833 {
1834         Relation        pg_database;
1835         ScanKeyData entry[1];
1836         SysScanDesc scan;
1837         HeapTuple       dbtuple;
1838         Oid                     oid;
1839
1840         /*
1841          * There's no syscache for pg_database indexed by name, so we must look
1842          * the hard way.
1843          */
1844         pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1845         ScanKeyInit(&entry[0],
1846                                 Anum_pg_database_datname,
1847                                 BTEqualStrategyNumber, F_NAMEEQ,
1848                                 CStringGetDatum(dbname));
1849         scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1850                                                           SnapshotNow, 1, entry);
1851
1852         dbtuple = systable_getnext(scan);
1853
1854         /* We assume that there can be at most one matching tuple */
1855         if (HeapTupleIsValid(dbtuple))
1856                 oid = HeapTupleGetOid(dbtuple);
1857         else
1858                 oid = InvalidOid;
1859
1860         systable_endscan(scan);
1861         heap_close(pg_database, AccessShareLock);
1862
1863         return oid;
1864 }
1865
1866
1867 /*
1868  * get_database_name - given a database OID, look up the name
1869  *
1870  * Returns a palloc'd string, or NULL if no such database.
1871  */
1872 char *
1873 get_database_name(Oid dbid)
1874 {
1875         HeapTuple       dbtuple;
1876         char       *result;
1877
1878         dbtuple = SearchSysCache(DATABASEOID,
1879                                                          ObjectIdGetDatum(dbid),
1880                                                          0, 0, 0);
1881         if (HeapTupleIsValid(dbtuple))
1882         {
1883                 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1884                 ReleaseSysCache(dbtuple);
1885         }
1886         else
1887                 result = NULL;
1888
1889         return result;
1890 }
1891
1892 /*
1893  * DATABASE resource manager's routines
1894  */
1895 void
1896 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1897 {
1898         uint8           info = record->xl_info & ~XLR_INFO_MASK;
1899
1900         /* Backup blocks are not used in dbase records */
1901         Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
1902
1903         if (info == XLOG_DBASE_CREATE)
1904         {
1905                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1906                 char       *src_path;
1907                 char       *dst_path;
1908                 struct stat st;
1909
1910                 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1911                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1912
1913                 /*
1914                  * Our theory for replaying a CREATE is to forcibly drop the target
1915                  * subdirectory if present, then re-copy the source data. This may be
1916                  * more work than needed, but it is simple to implement.
1917                  */
1918                 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1919                 {
1920                         if (!rmtree(dst_path, true))
1921                                 ereport(WARNING,
1922                                                 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1923                                                                 dst_path)));
1924                 }
1925
1926                 /*
1927                  * Force dirty buffers out to disk, to ensure source database is
1928                  * up-to-date for the copy.
1929                  */
1930                 FlushDatabaseBuffers(xlrec->src_db_id);
1931
1932                 /*
1933                  * Copy this subdirectory to the new location
1934                  *
1935                  * We don't need to copy subdirectories
1936                  */
1937                 copydir(src_path, dst_path, false);
1938         }
1939         else if (info == XLOG_DBASE_DROP)
1940         {
1941                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1942                 char       *dst_path;
1943
1944                 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1945
1946                 if (InHotStandby)
1947                 {
1948                         VirtualTransactionId *database_users;
1949
1950                         /*
1951                          * Find all users connected to this database and ask them
1952                          * politely to immediately kill their sessions before processing
1953                          * the drop database record, after the usual grace period.
1954                          * We don't wait for commit because drop database is
1955                          * non-transactional.
1956                          */
1957                     database_users = GetConflictingVirtualXIDs(InvalidTransactionId,
1958                                                                                                            xlrec->db_id,
1959                                                                                                            false);
1960
1961                         ResolveRecoveryConflictWithVirtualXIDs(database_users,
1962                                                                                                    "drop database",
1963                                                                                                    CONFLICT_MODE_FATAL);
1964                 }
1965
1966                 /* Drop pages for this database that are in the shared buffer cache */
1967                 DropDatabaseBuffers(xlrec->db_id);
1968
1969                 /* Also, clean out any fsync requests that might be pending in md.c */
1970                 ForgetDatabaseFsyncRequests(xlrec->db_id);
1971
1972                 /* Clean out the xlog relcache too */
1973                 XLogDropDatabase(xlrec->db_id);
1974
1975                 /* And remove the physical files */
1976                 if (!rmtree(dst_path, true))
1977                         ereport(WARNING,
1978                                         (errmsg("some useless files may be left behind in old database directory \"%s\"",
1979                                                         dst_path)));
1980         }
1981         else
1982                 elog(PANIC, "dbase_redo: unknown op code %u", info);
1983 }
1984
1985 void
1986 dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
1987 {
1988         uint8           info = xl_info & ~XLR_INFO_MASK;
1989
1990         if (info == XLOG_DBASE_CREATE)
1991         {
1992                 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1993
1994                 appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
1995                                                  xlrec->src_db_id, xlrec->src_tablespace_id,
1996                                                  xlrec->db_id, xlrec->tablespace_id);
1997         }
1998         else if (info == XLOG_DBASE_DROP)
1999         {
2000                 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
2001
2002                 appendStringInfo(buf, "drop db: dir %u/%u",
2003                                                  xlrec->db_id, xlrec->tablespace_id);
2004         }
2005         else
2006                 appendStringInfo(buf, "UNKNOWN");
2007 }