Update copyright for 2022
[pgsql.git] / src / backend / commands / dbcommands.c
blob509d1a3e92f0e382648599662a0c9d75ba8c9c38
1 /*-------------------------------------------------------------------------
3 * dbcommands.c
4 * Database management commands (create/drop database).
6 * Note: database creation/destruction commands use exclusive locks on
7 * the database objects (as expressed by LockSharedObject()) to avoid
8 * stepping on each others' toes. Formerly we used table-level locks
9 * on pg_database, but that's too coarse-grained.
11 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
15 * IDENTIFICATION
16 * src/backend/commands/dbcommands.c
18 *-------------------------------------------------------------------------
20 #include "postgres.h"
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <sys/stat.h>
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/htup_details.h"
29 #include "access/multixact.h"
30 #include "access/tableam.h"
31 #include "access/xact.h"
32 #include "access/xloginsert.h"
33 #include "access/xlogutils.h"
34 #include "catalog/catalog.h"
35 #include "catalog/dependency.h"
36 #include "catalog/indexing.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_authid.h"
39 #include "catalog/pg_database.h"
40 #include "catalog/pg_db_role_setting.h"
41 #include "catalog/pg_subscription.h"
42 #include "catalog/pg_tablespace.h"
43 #include "commands/comment.h"
44 #include "commands/dbcommands.h"
45 #include "commands/dbcommands_xlog.h"
46 #include "commands/defrem.h"
47 #include "commands/seclabel.h"
48 #include "commands/tablespace.h"
49 #include "mb/pg_wchar.h"
50 #include "miscadmin.h"
51 #include "pgstat.h"
52 #include "postmaster/bgwriter.h"
53 #include "replication/slot.h"
54 #include "storage/copydir.h"
55 #include "storage/fd.h"
56 #include "storage/ipc.h"
57 #include "storage/lmgr.h"
58 #include "storage/md.h"
59 #include "storage/procarray.h"
60 #include "storage/smgr.h"
61 #include "utils/acl.h"
62 #include "utils/builtins.h"
63 #include "utils/fmgroids.h"
64 #include "utils/pg_locale.h"
65 #include "utils/snapmgr.h"
66 #include "utils/syscache.h"
68 typedef struct
70 Oid src_dboid; /* source (template) DB */
71 Oid dest_dboid; /* DB we are trying to create */
72 } createdb_failure_params;
74 typedef struct
76 Oid dest_dboid; /* DB we are trying to move */
77 Oid dest_tsoid; /* tablespace we are trying to move to */
78 } movedb_failure_params;
80 /* non-export function prototypes */
81 static void createdb_failure_callback(int code, Datum arg);
82 static void movedb(const char *dbname, const char *tblspcname);
83 static void movedb_failure_callback(int code, Datum arg);
84 static bool get_db_info(const char *name, LOCKMODE lockmode,
85 Oid *dbIdP, Oid *ownerIdP,
86 int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
87 Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
88 MultiXactId *dbMinMultiP,
89 Oid *dbTablespace, char **dbCollate, char **dbCtype);
90 static bool have_createdb_privilege(void);
91 static void remove_dbtablespaces(Oid db_id);
92 static bool check_db_file_conflict(Oid db_id);
93 static int errdetail_busy_db(int notherbackends, int npreparedxacts);
97 * CREATE DATABASE
99 Oid
100 createdb(ParseState *pstate, const CreatedbStmt *stmt)
102 TableScanDesc scan;
103 Relation rel;
104 Oid src_dboid;
105 Oid src_owner;
106 int src_encoding = -1;
107 char *src_collate = NULL;
108 char *src_ctype = NULL;
109 bool src_istemplate;
110 bool src_allowconn;
111 Oid src_lastsysoid = InvalidOid;
112 TransactionId src_frozenxid = InvalidTransactionId;
113 MultiXactId src_minmxid = InvalidMultiXactId;
114 Oid src_deftablespace;
115 volatile Oid dst_deftablespace;
116 Relation pg_database_rel;
117 HeapTuple tuple;
118 Datum new_record[Natts_pg_database];
119 bool new_record_nulls[Natts_pg_database];
120 Oid dboid;
121 Oid datdba;
122 ListCell *option;
123 DefElem *dtablespacename = NULL;
124 DefElem *downer = NULL;
125 DefElem *dtemplate = NULL;
126 DefElem *dencoding = NULL;
127 DefElem *dlocale = NULL;
128 DefElem *dcollate = NULL;
129 DefElem *dctype = NULL;
130 DefElem *distemplate = NULL;
131 DefElem *dallowconnections = NULL;
132 DefElem *dconnlimit = NULL;
133 char *dbname = stmt->dbname;
134 char *dbowner = NULL;
135 const char *dbtemplate = NULL;
136 char *dbcollate = NULL;
137 char *dbctype = NULL;
138 char *canonname;
139 int encoding = -1;
140 bool dbistemplate = false;
141 bool dballowconnections = true;
142 int dbconnlimit = -1;
143 int notherbackends;
144 int npreparedxacts;
145 createdb_failure_params fparms;
147 /* Extract options from the statement node tree */
148 foreach(option, stmt->options)
150 DefElem *defel = (DefElem *) lfirst(option);
152 if (strcmp(defel->defname, "tablespace") == 0)
154 if (dtablespacename)
155 errorConflictingDefElem(defel, pstate);
156 dtablespacename = defel;
158 else if (strcmp(defel->defname, "owner") == 0)
160 if (downer)
161 errorConflictingDefElem(defel, pstate);
162 downer = defel;
164 else if (strcmp(defel->defname, "template") == 0)
166 if (dtemplate)
167 errorConflictingDefElem(defel, pstate);
168 dtemplate = defel;
170 else if (strcmp(defel->defname, "encoding") == 0)
172 if (dencoding)
173 errorConflictingDefElem(defel, pstate);
174 dencoding = defel;
176 else if (strcmp(defel->defname, "locale") == 0)
178 if (dlocale)
179 errorConflictingDefElem(defel, pstate);
180 dlocale = defel;
182 else if (strcmp(defel->defname, "lc_collate") == 0)
184 if (dcollate)
185 errorConflictingDefElem(defel, pstate);
186 dcollate = defel;
188 else if (strcmp(defel->defname, "lc_ctype") == 0)
190 if (dctype)
191 errorConflictingDefElem(defel, pstate);
192 dctype = defel;
194 else if (strcmp(defel->defname, "is_template") == 0)
196 if (distemplate)
197 errorConflictingDefElem(defel, pstate);
198 distemplate = defel;
200 else if (strcmp(defel->defname, "allow_connections") == 0)
202 if (dallowconnections)
203 errorConflictingDefElem(defel, pstate);
204 dallowconnections = defel;
206 else if (strcmp(defel->defname, "connection_limit") == 0)
208 if (dconnlimit)
209 errorConflictingDefElem(defel, pstate);
210 dconnlimit = defel;
212 else if (strcmp(defel->defname, "location") == 0)
214 ereport(WARNING,
215 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
216 errmsg("LOCATION is not supported anymore"),
217 errhint("Consider using tablespaces instead."),
218 parser_errposition(pstate, defel->location)));
220 else
221 ereport(ERROR,
222 (errcode(ERRCODE_SYNTAX_ERROR),
223 errmsg("option \"%s\" not recognized", defel->defname),
224 parser_errposition(pstate, defel->location)));
227 if (dlocale && (dcollate || dctype))
228 ereport(ERROR,
229 (errcode(ERRCODE_SYNTAX_ERROR),
230 errmsg("conflicting or redundant options"),
231 errdetail("LOCALE cannot be specified together with LC_COLLATE or LC_CTYPE.")));
233 if (downer && downer->arg)
234 dbowner = defGetString(downer);
235 if (dtemplate && dtemplate->arg)
236 dbtemplate = defGetString(dtemplate);
237 if (dencoding && dencoding->arg)
239 const char *encoding_name;
241 if (IsA(dencoding->arg, Integer))
243 encoding = defGetInt32(dencoding);
244 encoding_name = pg_encoding_to_char(encoding);
245 if (strcmp(encoding_name, "") == 0 ||
246 pg_valid_server_encoding(encoding_name) < 0)
247 ereport(ERROR,
248 (errcode(ERRCODE_UNDEFINED_OBJECT),
249 errmsg("%d is not a valid encoding code",
250 encoding),
251 parser_errposition(pstate, dencoding->location)));
253 else
255 encoding_name = defGetString(dencoding);
256 encoding = pg_valid_server_encoding(encoding_name);
257 if (encoding < 0)
258 ereport(ERROR,
259 (errcode(ERRCODE_UNDEFINED_OBJECT),
260 errmsg("%s is not a valid encoding name",
261 encoding_name),
262 parser_errposition(pstate, dencoding->location)));
265 if (dlocale && dlocale->arg)
267 dbcollate = defGetString(dlocale);
268 dbctype = defGetString(dlocale);
270 if (dcollate && dcollate->arg)
271 dbcollate = defGetString(dcollate);
272 if (dctype && dctype->arg)
273 dbctype = defGetString(dctype);
274 if (distemplate && distemplate->arg)
275 dbistemplate = defGetBoolean(distemplate);
276 if (dallowconnections && dallowconnections->arg)
277 dballowconnections = defGetBoolean(dallowconnections);
278 if (dconnlimit && dconnlimit->arg)
280 dbconnlimit = defGetInt32(dconnlimit);
281 if (dbconnlimit < -1)
282 ereport(ERROR,
283 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
284 errmsg("invalid connection limit: %d", dbconnlimit)));
287 /* obtain OID of proposed owner */
288 if (dbowner)
289 datdba = get_role_oid(dbowner, false);
290 else
291 datdba = GetUserId();
294 * To create a database, must have createdb privilege and must be able to
295 * become the target role (this does not imply that the target role itself
296 * must have createdb privilege). The latter provision guards against
297 * "giveaway" attacks. Note that a superuser will always have both of
298 * these privileges a fortiori.
300 if (!have_createdb_privilege())
301 ereport(ERROR,
302 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
303 errmsg("permission denied to create database")));
305 check_is_member_of_role(GetUserId(), datdba);
308 * Lookup database (template) to be cloned, and obtain share lock on it.
309 * ShareLock allows two CREATE DATABASEs to work from the same template
310 * concurrently, while ensuring no one is busy dropping it in parallel
311 * (which would be Very Bad since we'd likely get an incomplete copy
312 * without knowing it). This also prevents any new connections from being
313 * made to the source until we finish copying it, so we can be sure it
314 * won't change underneath us.
316 if (!dbtemplate)
317 dbtemplate = "template1"; /* Default template database name */
319 if (!get_db_info(dbtemplate, ShareLock,
320 &src_dboid, &src_owner, &src_encoding,
321 &src_istemplate, &src_allowconn, &src_lastsysoid,
322 &src_frozenxid, &src_minmxid, &src_deftablespace,
323 &src_collate, &src_ctype))
324 ereport(ERROR,
325 (errcode(ERRCODE_UNDEFINED_DATABASE),
326 errmsg("template database \"%s\" does not exist",
327 dbtemplate)));
330 * Permission check: to copy a DB that's not marked datistemplate, you
331 * must be superuser or the owner thereof.
333 if (!src_istemplate)
335 if (!pg_database_ownercheck(src_dboid, GetUserId()))
336 ereport(ERROR,
337 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
338 errmsg("permission denied to copy database \"%s\"",
339 dbtemplate)));
342 /* If encoding or locales are defaulted, use source's setting */
343 if (encoding < 0)
344 encoding = src_encoding;
345 if (dbcollate == NULL)
346 dbcollate = src_collate;
347 if (dbctype == NULL)
348 dbctype = src_ctype;
350 /* Some encodings are client only */
351 if (!PG_VALID_BE_ENCODING(encoding))
352 ereport(ERROR,
353 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
354 errmsg("invalid server encoding %d", encoding)));
356 /* Check that the chosen locales are valid, and get canonical spellings */
357 if (!check_locale(LC_COLLATE, dbcollate, &canonname))
358 ereport(ERROR,
359 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
360 errmsg("invalid locale name: \"%s\"", dbcollate)));
361 dbcollate = canonname;
362 if (!check_locale(LC_CTYPE, dbctype, &canonname))
363 ereport(ERROR,
364 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
365 errmsg("invalid locale name: \"%s\"", dbctype)));
366 dbctype = canonname;
368 check_encoding_locale_matches(encoding, dbcollate, dbctype);
371 * Check that the new encoding and locale settings match the source
372 * database. We insist on this because we simply copy the source data ---
373 * any non-ASCII data would be wrongly encoded, and any indexes sorted
374 * according to the source locale would be wrong.
376 * However, we assume that template0 doesn't contain any non-ASCII data
377 * nor any indexes that depend on collation or ctype, so template0 can be
378 * used as template for creating a database with any encoding or locale.
380 if (strcmp(dbtemplate, "template0") != 0)
382 if (encoding != src_encoding)
383 ereport(ERROR,
384 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
385 errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
386 pg_encoding_to_char(encoding),
387 pg_encoding_to_char(src_encoding)),
388 errhint("Use the same encoding as in the template database, or use template0 as template.")));
390 if (strcmp(dbcollate, src_collate) != 0)
391 ereport(ERROR,
392 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
393 errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
394 dbcollate, src_collate),
395 errhint("Use the same collation as in the template database, or use template0 as template.")));
397 if (strcmp(dbctype, src_ctype) != 0)
398 ereport(ERROR,
399 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
400 errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
401 dbctype, src_ctype),
402 errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
405 /* Resolve default tablespace for new database */
406 if (dtablespacename && dtablespacename->arg)
408 char *tablespacename;
409 AclResult aclresult;
411 tablespacename = defGetString(dtablespacename);
412 dst_deftablespace = get_tablespace_oid(tablespacename, false);
413 /* check permissions */
414 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
415 ACL_CREATE);
416 if (aclresult != ACLCHECK_OK)
417 aclcheck_error(aclresult, OBJECT_TABLESPACE,
418 tablespacename);
420 /* pg_global must never be the default tablespace */
421 if (dst_deftablespace == GLOBALTABLESPACE_OID)
422 ereport(ERROR,
423 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
424 errmsg("pg_global cannot be used as default tablespace")));
427 * If we are trying to change the default tablespace of the template,
428 * we require that the template not have any files in the new default
429 * tablespace. This is necessary because otherwise the copied
430 * database would contain pg_class rows that refer to its default
431 * tablespace both explicitly (by OID) and implicitly (as zero), which
432 * would cause problems. For example another CREATE DATABASE using
433 * the copied database as template, and trying to change its default
434 * tablespace again, would yield outright incorrect results (it would
435 * improperly move tables to the new default tablespace that should
436 * stay in the same tablespace).
438 if (dst_deftablespace != src_deftablespace)
440 char *srcpath;
441 struct stat st;
443 srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
445 if (stat(srcpath, &st) == 0 &&
446 S_ISDIR(st.st_mode) &&
447 !directory_is_empty(srcpath))
448 ereport(ERROR,
449 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
450 errmsg("cannot assign new default tablespace \"%s\"",
451 tablespacename),
452 errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
453 dbtemplate)));
454 pfree(srcpath);
457 else
459 /* Use template database's default tablespace */
460 dst_deftablespace = src_deftablespace;
461 /* Note there is no additional permission check in this path */
465 * If built with appropriate switch, whine when regression-testing
466 * conventions for database names are violated. But don't complain during
467 * initdb.
469 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
470 if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
471 elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
472 #endif
475 * Check for db name conflict. This is just to give a more friendly error
476 * message than "unique index violation". There's a race condition but
477 * we're willing to accept the less friendly message in that case.
479 if (OidIsValid(get_database_oid(dbname, true)))
480 ereport(ERROR,
481 (errcode(ERRCODE_DUPLICATE_DATABASE),
482 errmsg("database \"%s\" already exists", dbname)));
485 * The source DB can't have any active backends, except this one
486 * (exception is to allow CREATE DB while connected to template1).
487 * Otherwise we might copy inconsistent data.
489 * This should be last among the basic error checks, because it involves
490 * potential waiting; we may as well throw an error first if we're gonna
491 * throw one.
493 if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
494 ereport(ERROR,
495 (errcode(ERRCODE_OBJECT_IN_USE),
496 errmsg("source database \"%s\" is being accessed by other users",
497 dbtemplate),
498 errdetail_busy_db(notherbackends, npreparedxacts)));
501 * Select an OID for the new database, checking that it doesn't have a
502 * filename conflict with anything already existing in the tablespace
503 * directories.
505 pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
509 dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
510 Anum_pg_database_oid);
511 } while (check_db_file_conflict(dboid));
514 * Insert a new tuple into pg_database. This establishes our ownership of
515 * the new database name (anyone else trying to insert the same name will
516 * block on the unique index, and fail after we commit).
519 /* Form tuple */
520 MemSet(new_record, 0, sizeof(new_record));
521 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
523 new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
524 new_record[Anum_pg_database_datname - 1] =
525 DirectFunctionCall1(namein, CStringGetDatum(dbname));
526 new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
527 new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
528 new_record[Anum_pg_database_datcollate - 1] =
529 DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
530 new_record[Anum_pg_database_datctype - 1] =
531 DirectFunctionCall1(namein, CStringGetDatum(dbctype));
532 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
533 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
534 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
535 new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
536 new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
537 new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
538 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
541 * We deliberately set datacl to default (NULL), rather than copying it
542 * from the template database. Copying it would be a bad idea when the
543 * owner is not the same as the template's owner.
545 new_record_nulls[Anum_pg_database_datacl - 1] = true;
547 tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
548 new_record, new_record_nulls);
550 CatalogTupleInsert(pg_database_rel, tuple);
553 * Now generate additional catalog entries associated with the new DB
556 /* Register owner dependency */
557 recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
559 /* Create pg_shdepend entries for objects within database */
560 copyTemplateDependencies(src_dboid, dboid);
562 /* Post creation hook for new database */
563 InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
566 * Force a checkpoint before starting the copy. This will force all dirty
567 * buffers, including those of unlogged tables, out to disk, to ensure
568 * source database is up-to-date on disk for the copy.
569 * FlushDatabaseBuffers() would suffice for that, but we also want to
570 * process any pending unlink requests. Otherwise, if a checkpoint
571 * happened while we're copying files, a file might be deleted just when
572 * we're about to copy it, causing the lstat() call in copydir() to fail
573 * with ENOENT.
575 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
576 | CHECKPOINT_FLUSH_ALL);
579 * Once we start copying subdirectories, we need to be able to clean 'em
580 * up if we fail. Use an ENSURE block to make sure this happens. (This
581 * is not a 100% solution, because of the possibility of failure during
582 * transaction commit after we leave this routine, but it should handle
583 * most scenarios.)
585 fparms.src_dboid = src_dboid;
586 fparms.dest_dboid = dboid;
587 PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
588 PointerGetDatum(&fparms));
591 * Iterate through all tablespaces of the template database, and copy
592 * each one to the new database.
594 rel = table_open(TableSpaceRelationId, AccessShareLock);
595 scan = table_beginscan_catalog(rel, 0, NULL);
596 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
598 Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
599 Oid srctablespace = spaceform->oid;
600 Oid dsttablespace;
601 char *srcpath;
602 char *dstpath;
603 struct stat st;
605 /* No need to copy global tablespace */
606 if (srctablespace == GLOBALTABLESPACE_OID)
607 continue;
609 srcpath = GetDatabasePath(src_dboid, srctablespace);
611 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
612 directory_is_empty(srcpath))
614 /* Assume we can ignore it */
615 pfree(srcpath);
616 continue;
619 if (srctablespace == src_deftablespace)
620 dsttablespace = dst_deftablespace;
621 else
622 dsttablespace = srctablespace;
624 dstpath = GetDatabasePath(dboid, dsttablespace);
627 * Copy this subdirectory to the new location
629 * We don't need to copy subdirectories
631 copydir(srcpath, dstpath, false);
633 /* Record the filesystem change in XLOG */
635 xl_dbase_create_rec xlrec;
637 xlrec.db_id = dboid;
638 xlrec.tablespace_id = dsttablespace;
639 xlrec.src_db_id = src_dboid;
640 xlrec.src_tablespace_id = srctablespace;
642 XLogBeginInsert();
643 XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
645 (void) XLogInsert(RM_DBASE_ID,
646 XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
649 table_endscan(scan);
650 table_close(rel, AccessShareLock);
653 * We force a checkpoint before committing. This effectively means
654 * that committed XLOG_DBASE_CREATE operations will never need to be
655 * replayed (at least not in ordinary crash recovery; we still have to
656 * make the XLOG entry for the benefit of PITR operations). This
657 * avoids two nasty scenarios:
659 * #1: When PITR is off, we don't XLOG the contents of newly created
660 * indexes; therefore the drop-and-recreate-whole-directory behavior
661 * of DBASE_CREATE replay would lose such indexes.
663 * #2: Since we have to recopy the source database during DBASE_CREATE
664 * replay, we run the risk of copying changes in it that were
665 * committed after the original CREATE DATABASE command but before the
666 * system crash that led to the replay. This is at least unexpected
667 * and at worst could lead to inconsistencies, eg duplicate table
668 * names.
670 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
672 * In PITR replay, the first of these isn't an issue, and the second
673 * is only a risk if the CREATE DATABASE and subsequent template
674 * database change both occur while a base backup is being taken.
675 * There doesn't seem to be much we can do about that except document
676 * it as a limitation.
678 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
679 * we can avoid this.
681 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
684 * Close pg_database, but keep lock till commit.
686 table_close(pg_database_rel, NoLock);
689 * Force synchronous commit, thus minimizing the window between
690 * creation of the database files and committal of the transaction. If
691 * we crash before committing, we'll have a DB that's taking up disk
692 * space but is not in pg_database, which is not good.
694 ForceSyncCommit();
696 PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
697 PointerGetDatum(&fparms));
699 return dboid;
703 * Check whether chosen encoding matches chosen locale settings. This
704 * restriction is necessary because libc's locale-specific code usually
705 * fails when presented with data in an encoding it's not expecting. We
706 * allow mismatch in four cases:
708 * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
709 * which works with any encoding.
711 * 2. locale encoding = -1, which means that we couldn't determine the
712 * locale's encoding and have to trust the user to get it right.
714 * 3. selected encoding is UTF8 and platform is win32. This is because
715 * UTF8 is a pseudo codepage that is supported in all locales since it's
716 * converted to UTF16 before being used.
718 * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
719 * is risky but we have historically allowed it --- notably, the
720 * regression tests require it.
722 * Note: if you change this policy, fix initdb to match.
724 void
725 check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
727 int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
728 int collate_encoding = pg_get_encoding_from_locale(collate, true);
730 if (!(ctype_encoding == encoding ||
731 ctype_encoding == PG_SQL_ASCII ||
732 ctype_encoding == -1 ||
733 #ifdef WIN32
734 encoding == PG_UTF8 ||
735 #endif
736 (encoding == PG_SQL_ASCII && superuser())))
737 ereport(ERROR,
738 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
739 errmsg("encoding \"%s\" does not match locale \"%s\"",
740 pg_encoding_to_char(encoding),
741 ctype),
742 errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
743 pg_encoding_to_char(ctype_encoding))));
745 if (!(collate_encoding == encoding ||
746 collate_encoding == PG_SQL_ASCII ||
747 collate_encoding == -1 ||
748 #ifdef WIN32
749 encoding == PG_UTF8 ||
750 #endif
751 (encoding == PG_SQL_ASCII && superuser())))
752 ereport(ERROR,
753 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
754 errmsg("encoding \"%s\" does not match locale \"%s\"",
755 pg_encoding_to_char(encoding),
756 collate),
757 errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
758 pg_encoding_to_char(collate_encoding))));
761 /* Error cleanup callback for createdb */
762 static void
763 createdb_failure_callback(int code, Datum arg)
765 createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
768 * Release lock on source database before doing recursive remove. This is
769 * not essential but it seems desirable to release the lock as soon as
770 * possible.
772 UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
774 /* Throw away any successfully copied subdirectories */
775 remove_dbtablespaces(fparms->dest_dboid);
780 * DROP DATABASE
782 void
783 dropdb(const char *dbname, bool missing_ok, bool force)
785 Oid db_id;
786 bool db_istemplate;
787 Relation pgdbrel;
788 HeapTuple tup;
789 int notherbackends;
790 int npreparedxacts;
791 int nslots,
792 nslots_active;
793 int nsubscriptions;
796 * Look up the target database's OID, and get exclusive lock on it. We
797 * need this to ensure that no new backend starts up in the target
798 * database while we are deleting it (see postinit.c), and that no one is
799 * using it as a CREATE DATABASE template or trying to delete it for
800 * themselves.
802 pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
804 if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
805 &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
807 if (!missing_ok)
809 ereport(ERROR,
810 (errcode(ERRCODE_UNDEFINED_DATABASE),
811 errmsg("database \"%s\" does not exist", dbname)));
813 else
815 /* Close pg_database, release the lock, since we changed nothing */
816 table_close(pgdbrel, RowExclusiveLock);
817 ereport(NOTICE,
818 (errmsg("database \"%s\" does not exist, skipping",
819 dbname)));
820 return;
825 * Permission checks
827 if (!pg_database_ownercheck(db_id, GetUserId()))
828 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
829 dbname);
831 /* DROP hook for the database being removed */
832 InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
835 * Disallow dropping a DB that is marked istemplate. This is just to
836 * prevent people from accidentally dropping template0 or template1; they
837 * can do so if they're really determined ...
839 if (db_istemplate)
840 ereport(ERROR,
841 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
842 errmsg("cannot drop a template database")));
844 /* Obviously can't drop my own database */
845 if (db_id == MyDatabaseId)
846 ereport(ERROR,
847 (errcode(ERRCODE_OBJECT_IN_USE),
848 errmsg("cannot drop the currently open database")));
851 * Check whether there are active logical slots that refer to the
852 * to-be-dropped database. The database lock we are holding prevents the
853 * creation of new slots using the database or existing slots becoming
854 * active.
856 (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
857 if (nslots_active)
859 ereport(ERROR,
860 (errcode(ERRCODE_OBJECT_IN_USE),
861 errmsg("database \"%s\" is used by an active logical replication slot",
862 dbname),
863 errdetail_plural("There is %d active slot.",
864 "There are %d active slots.",
865 nslots_active, nslots_active)));
869 * Check if there are subscriptions defined in the target database.
871 * We can't drop them automatically because they might be holding
872 * resources in other databases/instances.
874 if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
875 ereport(ERROR,
876 (errcode(ERRCODE_OBJECT_IN_USE),
877 errmsg("database \"%s\" is being used by logical replication subscription",
878 dbname),
879 errdetail_plural("There is %d subscription.",
880 "There are %d subscriptions.",
881 nsubscriptions, nsubscriptions)));
885 * Attempt to terminate all existing connections to the target database if
886 * the user has requested to do so.
888 if (force)
889 TerminateOtherDBBackends(db_id);
892 * Check for other backends in the target database. (Because we hold the
893 * database lock, no new ones can start after this.)
895 * As in CREATE DATABASE, check this after other error conditions.
897 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
898 ereport(ERROR,
899 (errcode(ERRCODE_OBJECT_IN_USE),
900 errmsg("database \"%s\" is being accessed by other users",
901 dbname),
902 errdetail_busy_db(notherbackends, npreparedxacts)));
905 * Remove the database's tuple from pg_database.
907 tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
908 if (!HeapTupleIsValid(tup))
909 elog(ERROR, "cache lookup failed for database %u", db_id);
911 CatalogTupleDelete(pgdbrel, &tup->t_self);
913 ReleaseSysCache(tup);
916 * Delete any comments or security labels associated with the database.
918 DeleteSharedComments(db_id, DatabaseRelationId);
919 DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
922 * Remove settings associated with this database
924 DropSetting(db_id, InvalidOid);
927 * Remove shared dependency references for the database.
929 dropDatabaseDependencies(db_id);
932 * Drop db-specific replication slots.
934 ReplicationSlotsDropDBSlots(db_id);
937 * Drop pages for this database that are in the shared buffer cache. This
938 * is important to ensure that no remaining backend tries to write out a
939 * dirty buffer to the dead database later...
941 DropDatabaseBuffers(db_id);
944 * Tell the stats collector to forget it immediately, too.
946 pgstat_drop_database(db_id);
949 * Tell checkpointer to forget any pending fsync and unlink requests for
950 * files in the database; else the fsyncs will fail at next checkpoint, or
951 * worse, it will delete files that belong to a newly created database
952 * with the same OID.
954 ForgetDatabaseSyncRequests(db_id);
957 * Force a checkpoint to make sure the checkpointer has received the
958 * message sent by ForgetDatabaseSyncRequests. On Windows, this also
959 * ensures that background procs don't hold any open files, which would
960 * cause rmdir() to fail.
962 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
965 * Remove all tablespace subdirs belonging to the database.
967 remove_dbtablespaces(db_id);
970 * Close pg_database, but keep lock till commit.
972 table_close(pgdbrel, NoLock);
975 * Force synchronous commit, thus minimizing the window between removal of
976 * the database files and committal of the transaction. If we crash before
977 * committing, we'll have a DB that's gone on disk but still there
978 * according to pg_database, which is not good.
980 ForceSyncCommit();
985 * Rename database
987 ObjectAddress
988 RenameDatabase(const char *oldname, const char *newname)
990 Oid db_id;
991 HeapTuple newtup;
992 Relation rel;
993 int notherbackends;
994 int npreparedxacts;
995 ObjectAddress address;
998 * Look up the target database's OID, and get exclusive lock on it. We
999 * need this for the same reasons as DROP DATABASE.
1001 rel = table_open(DatabaseRelationId, RowExclusiveLock);
1003 if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
1004 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1005 ereport(ERROR,
1006 (errcode(ERRCODE_UNDEFINED_DATABASE),
1007 errmsg("database \"%s\" does not exist", oldname)));
1009 /* must be owner */
1010 if (!pg_database_ownercheck(db_id, GetUserId()))
1011 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1012 oldname);
1014 /* must have createdb rights */
1015 if (!have_createdb_privilege())
1016 ereport(ERROR,
1017 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1018 errmsg("permission denied to rename database")));
1021 * If built with appropriate switch, whine when regression-testing
1022 * conventions for database names are violated.
1024 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1025 if (strstr(newname, "regression") == NULL)
1026 elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1027 #endif
1030 * Make sure the new name doesn't exist. See notes for same error in
1031 * CREATE DATABASE.
1033 if (OidIsValid(get_database_oid(newname, true)))
1034 ereport(ERROR,
1035 (errcode(ERRCODE_DUPLICATE_DATABASE),
1036 errmsg("database \"%s\" already exists", newname)));
1039 * XXX Client applications probably store the current database somewhere,
1040 * so renaming it could cause confusion. On the other hand, there may not
1041 * be an actual problem besides a little confusion, so think about this
1042 * and decide.
1044 if (db_id == MyDatabaseId)
1045 ereport(ERROR,
1046 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1047 errmsg("current database cannot be renamed")));
1050 * Make sure the database does not have active sessions. This is the same
1051 * concern as above, but applied to other sessions.
1053 * As in CREATE DATABASE, check this after other error conditions.
1055 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1056 ereport(ERROR,
1057 (errcode(ERRCODE_OBJECT_IN_USE),
1058 errmsg("database \"%s\" is being accessed by other users",
1059 oldname),
1060 errdetail_busy_db(notherbackends, npreparedxacts)));
1062 /* rename */
1063 newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1064 if (!HeapTupleIsValid(newtup))
1065 elog(ERROR, "cache lookup failed for database %u", db_id);
1066 namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1067 CatalogTupleUpdate(rel, &newtup->t_self, newtup);
1069 InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1071 ObjectAddressSet(address, DatabaseRelationId, db_id);
1074 * Close pg_database, but keep lock till commit.
1076 table_close(rel, NoLock);
1078 return address;
1083 * ALTER DATABASE SET TABLESPACE
1085 static void
1086 movedb(const char *dbname, const char *tblspcname)
1088 Oid db_id;
1089 Relation pgdbrel;
1090 int notherbackends;
1091 int npreparedxacts;
1092 HeapTuple oldtuple,
1093 newtuple;
1094 Oid src_tblspcoid,
1095 dst_tblspcoid;
1096 Datum new_record[Natts_pg_database];
1097 bool new_record_nulls[Natts_pg_database];
1098 bool new_record_repl[Natts_pg_database];
1099 ScanKeyData scankey;
1100 SysScanDesc sysscan;
1101 AclResult aclresult;
1102 char *src_dbpath;
1103 char *dst_dbpath;
1104 DIR *dstdir;
1105 struct dirent *xlde;
1106 movedb_failure_params fparms;
1109 * Look up the target database's OID, and get exclusive lock on it. We
1110 * need this to ensure that no new backend starts up in the database while
1111 * we are moving it, and that no one is using it as a CREATE DATABASE
1112 * template or trying to delete it.
1114 pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1116 if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1117 NULL, NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
1118 ereport(ERROR,
1119 (errcode(ERRCODE_UNDEFINED_DATABASE),
1120 errmsg("database \"%s\" does not exist", dbname)));
1123 * We actually need a session lock, so that the lock will persist across
1124 * the commit/restart below. (We could almost get away with letting the
1125 * lock be released at commit, except that someone could try to move
1126 * relations of the DB back into the old directory while we rmtree() it.)
1128 LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1129 AccessExclusiveLock);
1132 * Permission checks
1134 if (!pg_database_ownercheck(db_id, GetUserId()))
1135 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1136 dbname);
1139 * Obviously can't move the tables of my own database
1141 if (db_id == MyDatabaseId)
1142 ereport(ERROR,
1143 (errcode(ERRCODE_OBJECT_IN_USE),
1144 errmsg("cannot change the tablespace of the currently open database")));
1147 * Get tablespace's oid
1149 dst_tblspcoid = get_tablespace_oid(tblspcname, false);
1152 * Permission checks
1154 aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
1155 ACL_CREATE);
1156 if (aclresult != ACLCHECK_OK)
1157 aclcheck_error(aclresult, OBJECT_TABLESPACE,
1158 tblspcname);
1161 * pg_global must never be the default tablespace
1163 if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1164 ereport(ERROR,
1165 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1166 errmsg("pg_global cannot be used as default tablespace")));
1169 * No-op if same tablespace
1171 if (src_tblspcoid == dst_tblspcoid)
1173 table_close(pgdbrel, NoLock);
1174 UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1175 AccessExclusiveLock);
1176 return;
1180 * Check for other backends in the target database. (Because we hold the
1181 * database lock, no new ones can start after this.)
1183 * As in CREATE DATABASE, check this after other error conditions.
1185 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1186 ereport(ERROR,
1187 (errcode(ERRCODE_OBJECT_IN_USE),
1188 errmsg("database \"%s\" is being accessed by other users",
1189 dbname),
1190 errdetail_busy_db(notherbackends, npreparedxacts)));
1193 * Get old and new database paths
1195 src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
1196 dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1199 * Force a checkpoint before proceeding. This will force all dirty
1200 * buffers, including those of unlogged tables, out to disk, to ensure
1201 * source database is up-to-date on disk for the copy.
1202 * FlushDatabaseBuffers() would suffice for that, but we also want to
1203 * process any pending unlink requests. Otherwise, the check for existing
1204 * files in the target directory might fail unnecessarily, not to mention
1205 * that the copy might fail due to source files getting deleted under it.
1206 * On Windows, this also ensures that background procs don't hold any open
1207 * files, which would cause rmdir() to fail.
1209 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
1210 | CHECKPOINT_FLUSH_ALL);
1213 * Now drop all buffers holding data of the target database; they should
1214 * no longer be dirty so DropDatabaseBuffers is safe.
1216 * It might seem that we could just let these buffers age out of shared
1217 * buffers naturally, since they should not get referenced anymore. The
1218 * problem with that is that if the user later moves the database back to
1219 * its original tablespace, any still-surviving buffers would appear to
1220 * contain valid data again --- but they'd be missing any changes made in
1221 * the database while it was in the new tablespace. In any case, freeing
1222 * buffers that should never be used again seems worth the cycles.
1224 * Note: it'd be sufficient to get rid of buffers matching db_id and
1225 * src_tblspcoid, but bufmgr.c presently provides no API for that.
1227 DropDatabaseBuffers(db_id);
1230 * Check for existence of files in the target directory, i.e., objects of
1231 * this database that are already in the target tablespace. We can't
1232 * allow the move in such a case, because we would need to change those
1233 * relations' pg_class.reltablespace entries to zero, and we don't have
1234 * access to the DB's pg_class to do so.
1236 dstdir = AllocateDir(dst_dbpath);
1237 if (dstdir != NULL)
1239 while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
1241 if (strcmp(xlde->d_name, ".") == 0 ||
1242 strcmp(xlde->d_name, "..") == 0)
1243 continue;
1245 ereport(ERROR,
1246 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1247 errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
1248 dbname, tblspcname),
1249 errhint("You must move them back to the database's default tablespace before using this command.")));
1252 FreeDir(dstdir);
1255 * The directory exists but is empty. We must remove it before using
1256 * the copydir function.
1258 if (rmdir(dst_dbpath) != 0)
1259 elog(ERROR, "could not remove directory \"%s\": %m",
1260 dst_dbpath);
1264 * Use an ENSURE block to make sure we remove the debris if the copy fails
1265 * (eg, due to out-of-disk-space). This is not a 100% solution, because
1266 * of the possibility of failure during transaction commit, but it should
1267 * handle most scenarios.
1269 fparms.dest_dboid = db_id;
1270 fparms.dest_tsoid = dst_tblspcoid;
1271 PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1272 PointerGetDatum(&fparms));
1275 * Copy files from the old tablespace to the new one
1277 copydir(src_dbpath, dst_dbpath, false);
1280 * Record the filesystem change in XLOG
1283 xl_dbase_create_rec xlrec;
1285 xlrec.db_id = db_id;
1286 xlrec.tablespace_id = dst_tblspcoid;
1287 xlrec.src_db_id = db_id;
1288 xlrec.src_tablespace_id = src_tblspcoid;
1290 XLogBeginInsert();
1291 XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
1293 (void) XLogInsert(RM_DBASE_ID,
1294 XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
1298 * Update the database's pg_database tuple
1300 ScanKeyInit(&scankey,
1301 Anum_pg_database_datname,
1302 BTEqualStrategyNumber, F_NAMEEQ,
1303 CStringGetDatum(dbname));
1304 sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1305 NULL, 1, &scankey);
1306 oldtuple = systable_getnext(sysscan);
1307 if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
1308 ereport(ERROR,
1309 (errcode(ERRCODE_UNDEFINED_DATABASE),
1310 errmsg("database \"%s\" does not exist", dbname)));
1312 MemSet(new_record, 0, sizeof(new_record));
1313 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1314 MemSet(new_record_repl, false, sizeof(new_record_repl));
1316 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
1317 new_record_repl[Anum_pg_database_dattablespace - 1] = true;
1319 newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
1320 new_record,
1321 new_record_nulls, new_record_repl);
1322 CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
1324 InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1326 systable_endscan(sysscan);
1329 * Force another checkpoint here. As in CREATE DATABASE, this is to
1330 * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
1331 * operation, which would cause us to lose any unlogged operations
1332 * done in the new DB tablespace before the next checkpoint.
1334 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1337 * Force synchronous commit, thus minimizing the window between
1338 * copying the database files and committal of the transaction. If we
1339 * crash before committing, we'll leave an orphaned set of files on
1340 * disk, which is not fatal but not good either.
1342 ForceSyncCommit();
1345 * Close pg_database, but keep lock till commit.
1347 table_close(pgdbrel, NoLock);
1349 PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1350 PointerGetDatum(&fparms));
1353 * Commit the transaction so that the pg_database update is committed. If
1354 * we crash while removing files, the database won't be corrupt, we'll
1355 * just leave some orphaned files in the old directory.
1357 * (This is OK because we know we aren't inside a transaction block.)
1359 * XXX would it be safe/better to do this inside the ensure block? Not
1360 * convinced it's a good idea; consider elog just after the transaction
1361 * really commits.
1363 PopActiveSnapshot();
1364 CommitTransactionCommand();
1366 /* Start new transaction for the remaining work; don't need a snapshot */
1367 StartTransactionCommand();
1370 * Remove files from the old tablespace
1372 if (!rmtree(src_dbpath, true))
1373 ereport(WARNING,
1374 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1375 src_dbpath)));
1378 * Record the filesystem change in XLOG
1381 xl_dbase_drop_rec xlrec;
1383 xlrec.db_id = db_id;
1384 xlrec.ntablespaces = 1;
1386 XLogBeginInsert();
1387 XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1388 XLogRegisterData((char *) &src_tblspcoid, sizeof(Oid));
1390 (void) XLogInsert(RM_DBASE_ID,
1391 XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1394 /* Now it's safe to release the database lock */
1395 UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1396 AccessExclusiveLock);
1399 /* Error cleanup callback for movedb */
1400 static void
1401 movedb_failure_callback(int code, Datum arg)
1403 movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
1404 char *dstpath;
1406 /* Get rid of anything we managed to copy to the target directory */
1407 dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
1409 (void) rmtree(dstpath, true);
1413 * Process options and call dropdb function.
1415 void
1416 DropDatabase(ParseState *pstate, DropdbStmt *stmt)
1418 bool force = false;
1419 ListCell *lc;
1421 foreach(lc, stmt->options)
1423 DefElem *opt = (DefElem *) lfirst(lc);
1425 if (strcmp(opt->defname, "force") == 0)
1426 force = true;
1427 else
1428 ereport(ERROR,
1429 (errcode(ERRCODE_SYNTAX_ERROR),
1430 errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
1431 parser_errposition(pstate, opt->location)));
1434 dropdb(stmt->dbname, stmt->missing_ok, force);
1438 * ALTER DATABASE name ...
1441 AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
1443 Relation rel;
1444 Oid dboid;
1445 HeapTuple tuple,
1446 newtuple;
1447 Form_pg_database datform;
1448 ScanKeyData scankey;
1449 SysScanDesc scan;
1450 ListCell *option;
1451 bool dbistemplate = false;
1452 bool dballowconnections = true;
1453 int dbconnlimit = -1;
1454 DefElem *distemplate = NULL;
1455 DefElem *dallowconnections = NULL;
1456 DefElem *dconnlimit = NULL;
1457 DefElem *dtablespace = NULL;
1458 Datum new_record[Natts_pg_database];
1459 bool new_record_nulls[Natts_pg_database];
1460 bool new_record_repl[Natts_pg_database];
1462 /* Extract options from the statement node tree */
1463 foreach(option, stmt->options)
1465 DefElem *defel = (DefElem *) lfirst(option);
1467 if (strcmp(defel->defname, "is_template") == 0)
1469 if (distemplate)
1470 errorConflictingDefElem(defel, pstate);
1471 distemplate = defel;
1473 else if (strcmp(defel->defname, "allow_connections") == 0)
1475 if (dallowconnections)
1476 errorConflictingDefElem(defel, pstate);
1477 dallowconnections = defel;
1479 else if (strcmp(defel->defname, "connection_limit") == 0)
1481 if (dconnlimit)
1482 errorConflictingDefElem(defel, pstate);
1483 dconnlimit = defel;
1485 else if (strcmp(defel->defname, "tablespace") == 0)
1487 if (dtablespace)
1488 errorConflictingDefElem(defel, pstate);
1489 dtablespace = defel;
1491 else
1492 ereport(ERROR,
1493 (errcode(ERRCODE_SYNTAX_ERROR),
1494 errmsg("option \"%s\" not recognized", defel->defname),
1495 parser_errposition(pstate, defel->location)));
1498 if (dtablespace)
1501 * While the SET TABLESPACE syntax doesn't allow any other options,
1502 * somebody could write "WITH TABLESPACE ...". Forbid any other
1503 * options from being specified in that case.
1505 if (list_length(stmt->options) != 1)
1506 ereport(ERROR,
1507 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1508 errmsg("option \"%s\" cannot be specified with other options",
1509 dtablespace->defname),
1510 parser_errposition(pstate, dtablespace->location)));
1511 /* this case isn't allowed within a transaction block */
1512 PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
1513 movedb(stmt->dbname, defGetString(dtablespace));
1514 return InvalidOid;
1517 if (distemplate && distemplate->arg)
1518 dbistemplate = defGetBoolean(distemplate);
1519 if (dallowconnections && dallowconnections->arg)
1520 dballowconnections = defGetBoolean(dallowconnections);
1521 if (dconnlimit && dconnlimit->arg)
1523 dbconnlimit = defGetInt32(dconnlimit);
1524 if (dbconnlimit < -1)
1525 ereport(ERROR,
1526 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1527 errmsg("invalid connection limit: %d", dbconnlimit)));
1531 * Get the old tuple. We don't need a lock on the database per se,
1532 * because we're not going to do anything that would mess up incoming
1533 * connections.
1535 rel = table_open(DatabaseRelationId, RowExclusiveLock);
1536 ScanKeyInit(&scankey,
1537 Anum_pg_database_datname,
1538 BTEqualStrategyNumber, F_NAMEEQ,
1539 CStringGetDatum(stmt->dbname));
1540 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1541 NULL, 1, &scankey);
1542 tuple = systable_getnext(scan);
1543 if (!HeapTupleIsValid(tuple))
1544 ereport(ERROR,
1545 (errcode(ERRCODE_UNDEFINED_DATABASE),
1546 errmsg("database \"%s\" does not exist", stmt->dbname)));
1548 datform = (Form_pg_database) GETSTRUCT(tuple);
1549 dboid = datform->oid;
1551 if (!pg_database_ownercheck(dboid, GetUserId()))
1552 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1553 stmt->dbname);
1556 * In order to avoid getting locked out and having to go through
1557 * standalone mode, we refuse to disallow connections to the database
1558 * we're currently connected to. Lockout can still happen with concurrent
1559 * sessions but the likeliness of that is not high enough to worry about.
1561 if (!dballowconnections && dboid == MyDatabaseId)
1562 ereport(ERROR,
1563 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1564 errmsg("cannot disallow connections for current database")));
1567 * Build an updated tuple, perusing the information just obtained
1569 MemSet(new_record, 0, sizeof(new_record));
1570 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1571 MemSet(new_record_repl, false, sizeof(new_record_repl));
1573 if (distemplate)
1575 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1576 new_record_repl[Anum_pg_database_datistemplate - 1] = true;
1578 if (dallowconnections)
1580 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1581 new_record_repl[Anum_pg_database_datallowconn - 1] = true;
1583 if (dconnlimit)
1585 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1586 new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
1589 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
1590 new_record_nulls, new_record_repl);
1591 CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
1593 InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
1595 systable_endscan(scan);
1597 /* Close pg_database, but keep lock till commit */
1598 table_close(rel, NoLock);
1600 return dboid;
1605 * ALTER DATABASE name SET ...
1608 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1610 Oid datid = get_database_oid(stmt->dbname, false);
1613 * Obtain a lock on the database and make sure it didn't go away in the
1614 * meantime.
1616 shdepLockAndCheckObject(DatabaseRelationId, datid);
1618 if (!pg_database_ownercheck(datid, GetUserId()))
1619 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1620 stmt->dbname);
1622 AlterSetting(datid, InvalidOid, stmt->setstmt);
1624 UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
1626 return datid;
1631 * ALTER DATABASE name OWNER TO newowner
1633 ObjectAddress
1634 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1636 Oid db_id;
1637 HeapTuple tuple;
1638 Relation rel;
1639 ScanKeyData scankey;
1640 SysScanDesc scan;
1641 Form_pg_database datForm;
1642 ObjectAddress address;
1645 * Get the old tuple. We don't need a lock on the database per se,
1646 * because we're not going to do anything that would mess up incoming
1647 * connections.
1649 rel = table_open(DatabaseRelationId, RowExclusiveLock);
1650 ScanKeyInit(&scankey,
1651 Anum_pg_database_datname,
1652 BTEqualStrategyNumber, F_NAMEEQ,
1653 CStringGetDatum(dbname));
1654 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1655 NULL, 1, &scankey);
1656 tuple = systable_getnext(scan);
1657 if (!HeapTupleIsValid(tuple))
1658 ereport(ERROR,
1659 (errcode(ERRCODE_UNDEFINED_DATABASE),
1660 errmsg("database \"%s\" does not exist", dbname)));
1662 datForm = (Form_pg_database) GETSTRUCT(tuple);
1663 db_id = datForm->oid;
1666 * If the new owner is the same as the existing owner, consider the
1667 * command to have succeeded. This is to be consistent with other
1668 * objects.
1670 if (datForm->datdba != newOwnerId)
1672 Datum repl_val[Natts_pg_database];
1673 bool repl_null[Natts_pg_database];
1674 bool repl_repl[Natts_pg_database];
1675 Acl *newAcl;
1676 Datum aclDatum;
1677 bool isNull;
1678 HeapTuple newtuple;
1680 /* Otherwise, must be owner of the existing object */
1681 if (!pg_database_ownercheck(db_id, GetUserId()))
1682 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1683 dbname);
1685 /* Must be able to become new owner */
1686 check_is_member_of_role(GetUserId(), newOwnerId);
1689 * must have createdb rights
1691 * NOTE: This is different from other alter-owner checks in that the
1692 * current user is checked for createdb privileges instead of the
1693 * destination owner. This is consistent with the CREATE case for
1694 * databases. Because superusers will always have this right, we need
1695 * no special case for them.
1697 if (!have_createdb_privilege())
1698 ereport(ERROR,
1699 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1700 errmsg("permission denied to change owner of database")));
1702 memset(repl_null, false, sizeof(repl_null));
1703 memset(repl_repl, false, sizeof(repl_repl));
1705 repl_repl[Anum_pg_database_datdba - 1] = true;
1706 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1709 * Determine the modified ACL for the new owner. This is only
1710 * necessary when the ACL is non-null.
1712 aclDatum = heap_getattr(tuple,
1713 Anum_pg_database_datacl,
1714 RelationGetDescr(rel),
1715 &isNull);
1716 if (!isNull)
1718 newAcl = aclnewowner(DatumGetAclP(aclDatum),
1719 datForm->datdba, newOwnerId);
1720 repl_repl[Anum_pg_database_datacl - 1] = true;
1721 repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1724 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1725 CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
1727 heap_freetuple(newtuple);
1729 /* Update owner dependency reference */
1730 changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
1733 InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1735 ObjectAddressSet(address, DatabaseRelationId, db_id);
1737 systable_endscan(scan);
1739 /* Close pg_database, but keep lock till commit */
1740 table_close(rel, NoLock);
1742 return address;
1747 * Helper functions
1751 * Look up info about the database named "name". If the database exists,
1752 * obtain the specified lock type on it, fill in any of the remaining
1753 * parameters that aren't NULL, and return true. If no such database,
1754 * return false.
1756 static bool
1757 get_db_info(const char *name, LOCKMODE lockmode,
1758 Oid *dbIdP, Oid *ownerIdP,
1759 int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1760 Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1761 MultiXactId *dbMinMultiP,
1762 Oid *dbTablespace, char **dbCollate, char **dbCtype)
1764 bool result = false;
1765 Relation relation;
1767 AssertArg(name);
1769 /* Caller may wish to grab a better lock on pg_database beforehand... */
1770 relation = table_open(DatabaseRelationId, AccessShareLock);
1773 * Loop covers the rare case where the database is renamed before we can
1774 * lock it. We try again just in case we can find a new one of the same
1775 * name.
1777 for (;;)
1779 ScanKeyData scanKey;
1780 SysScanDesc scan;
1781 HeapTuple tuple;
1782 Oid dbOid;
1785 * there's no syscache for database-indexed-by-name, so must do it the
1786 * hard way
1788 ScanKeyInit(&scanKey,
1789 Anum_pg_database_datname,
1790 BTEqualStrategyNumber, F_NAMEEQ,
1791 CStringGetDatum(name));
1793 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1794 NULL, 1, &scanKey);
1796 tuple = systable_getnext(scan);
1798 if (!HeapTupleIsValid(tuple))
1800 /* definitely no database of that name */
1801 systable_endscan(scan);
1802 break;
1805 dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
1807 systable_endscan(scan);
1810 * Now that we have a database OID, we can try to lock the DB.
1812 if (lockmode != NoLock)
1813 LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1816 * And now, re-fetch the tuple by OID. If it's still there and still
1817 * the same name, we win; else, drop the lock and loop back to try
1818 * again.
1820 tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
1821 if (HeapTupleIsValid(tuple))
1823 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1825 if (strcmp(name, NameStr(dbform->datname)) == 0)
1827 /* oid of the database */
1828 if (dbIdP)
1829 *dbIdP = dbOid;
1830 /* oid of the owner */
1831 if (ownerIdP)
1832 *ownerIdP = dbform->datdba;
1833 /* character encoding */
1834 if (encodingP)
1835 *encodingP = dbform->encoding;
1836 /* allowed as template? */
1837 if (dbIsTemplateP)
1838 *dbIsTemplateP = dbform->datistemplate;
1839 /* allowing connections? */
1840 if (dbAllowConnP)
1841 *dbAllowConnP = dbform->datallowconn;
1842 /* last system OID used in database */
1843 if (dbLastSysOidP)
1844 *dbLastSysOidP = dbform->datlastsysoid;
1845 /* limit of frozen XIDs */
1846 if (dbFrozenXidP)
1847 *dbFrozenXidP = dbform->datfrozenxid;
1848 /* minimum MultiXactId */
1849 if (dbMinMultiP)
1850 *dbMinMultiP = dbform->datminmxid;
1851 /* default tablespace for this database */
1852 if (dbTablespace)
1853 *dbTablespace = dbform->dattablespace;
1854 /* default locale settings for this database */
1855 if (dbCollate)
1856 *dbCollate = pstrdup(NameStr(dbform->datcollate));
1857 if (dbCtype)
1858 *dbCtype = pstrdup(NameStr(dbform->datctype));
1859 ReleaseSysCache(tuple);
1860 result = true;
1861 break;
1863 /* can only get here if it was just renamed */
1864 ReleaseSysCache(tuple);
1867 if (lockmode != NoLock)
1868 UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1871 table_close(relation, AccessShareLock);
1873 return result;
1876 /* Check if current user has createdb privileges */
1877 static bool
1878 have_createdb_privilege(void)
1880 bool result = false;
1881 HeapTuple utup;
1883 /* Superusers can always do everything */
1884 if (superuser())
1885 return true;
1887 utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
1888 if (HeapTupleIsValid(utup))
1890 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1891 ReleaseSysCache(utup);
1893 return result;
1897 * Remove tablespace directories
1899 * We don't know what tablespaces db_id is using, so iterate through all
1900 * tablespaces removing <tablespace>/db_id
1902 static void
1903 remove_dbtablespaces(Oid db_id)
1905 Relation rel;
1906 TableScanDesc scan;
1907 HeapTuple tuple;
1908 List *ltblspc = NIL;
1909 ListCell *cell;
1910 int ntblspc;
1911 int i;
1912 Oid *tablespace_ids;
1914 rel = table_open(TableSpaceRelationId, AccessShareLock);
1915 scan = table_beginscan_catalog(rel, 0, NULL);
1916 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1918 Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
1919 Oid dsttablespace = spcform->oid;
1920 char *dstpath;
1921 struct stat st;
1923 /* Don't mess with the global tablespace */
1924 if (dsttablespace == GLOBALTABLESPACE_OID)
1925 continue;
1927 dstpath = GetDatabasePath(db_id, dsttablespace);
1929 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1931 /* Assume we can ignore it */
1932 pfree(dstpath);
1933 continue;
1936 if (!rmtree(dstpath, true))
1937 ereport(WARNING,
1938 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1939 dstpath)));
1941 ltblspc = lappend_oid(ltblspc, dsttablespace);
1942 pfree(dstpath);
1945 ntblspc = list_length(ltblspc);
1946 if (ntblspc == 0)
1948 table_endscan(scan);
1949 table_close(rel, AccessShareLock);
1950 return;
1953 tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
1954 i = 0;
1955 foreach(cell, ltblspc)
1956 tablespace_ids[i++] = lfirst_oid(cell);
1958 /* Record the filesystem change in XLOG */
1960 xl_dbase_drop_rec xlrec;
1962 xlrec.db_id = db_id;
1963 xlrec.ntablespaces = ntblspc;
1965 XLogBeginInsert();
1966 XLogRegisterData((char *) &xlrec, MinSizeOfDbaseDropRec);
1967 XLogRegisterData((char *) tablespace_ids, ntblspc * sizeof(Oid));
1969 (void) XLogInsert(RM_DBASE_ID,
1970 XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1973 list_free(ltblspc);
1974 pfree(tablespace_ids);
1976 table_endscan(scan);
1977 table_close(rel, AccessShareLock);
1981 * Check for existing files that conflict with a proposed new DB OID;
1982 * return true if there are any
1984 * If there were a subdirectory in any tablespace matching the proposed new
1985 * OID, we'd get a create failure due to the duplicate name ... and then we'd
1986 * try to remove that already-existing subdirectory during the cleanup in
1987 * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
1988 * instead we make this extra check before settling on the OID of the new
1989 * database. This exactly parallels what GetNewRelFileNode() does for table
1990 * relfilenode values.
1992 static bool
1993 check_db_file_conflict(Oid db_id)
1995 bool result = false;
1996 Relation rel;
1997 TableScanDesc scan;
1998 HeapTuple tuple;
2000 rel = table_open(TableSpaceRelationId, AccessShareLock);
2001 scan = table_beginscan_catalog(rel, 0, NULL);
2002 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2004 Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
2005 Oid dsttablespace = spcform->oid;
2006 char *dstpath;
2007 struct stat st;
2009 /* Don't mess with the global tablespace */
2010 if (dsttablespace == GLOBALTABLESPACE_OID)
2011 continue;
2013 dstpath = GetDatabasePath(db_id, dsttablespace);
2015 if (lstat(dstpath, &st) == 0)
2017 /* Found a conflicting file (or directory, whatever) */
2018 pfree(dstpath);
2019 result = true;
2020 break;
2023 pfree(dstpath);
2026 table_endscan(scan);
2027 table_close(rel, AccessShareLock);
2029 return result;
2033 * Issue a suitable errdetail message for a busy database
2035 static int
2036 errdetail_busy_db(int notherbackends, int npreparedxacts)
2038 if (notherbackends > 0 && npreparedxacts > 0)
2041 * We don't deal with singular versus plural here, since gettext
2042 * doesn't support multiple plurals in one string.
2044 errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
2045 notherbackends, npreparedxacts);
2046 else if (notherbackends > 0)
2047 errdetail_plural("There is %d other session using the database.",
2048 "There are %d other sessions using the database.",
2049 notherbackends,
2050 notherbackends);
2051 else
2052 errdetail_plural("There is %d prepared transaction using the database.",
2053 "There are %d prepared transactions using the database.",
2054 npreparedxacts,
2055 npreparedxacts);
2056 return 0; /* just to keep ereport macro happy */
2060 * get_database_oid - given a database name, look up the OID
2062 * If missing_ok is false, throw an error if database name not found. If
2063 * true, just return InvalidOid.
2066 get_database_oid(const char *dbname, bool missing_ok)
2068 Relation pg_database;
2069 ScanKeyData entry[1];
2070 SysScanDesc scan;
2071 HeapTuple dbtuple;
2072 Oid oid;
2075 * There's no syscache for pg_database indexed by name, so we must look
2076 * the hard way.
2078 pg_database = table_open(DatabaseRelationId, AccessShareLock);
2079 ScanKeyInit(&entry[0],
2080 Anum_pg_database_datname,
2081 BTEqualStrategyNumber, F_NAMEEQ,
2082 CStringGetDatum(dbname));
2083 scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
2084 NULL, 1, entry);
2086 dbtuple = systable_getnext(scan);
2088 /* We assume that there can be at most one matching tuple */
2089 if (HeapTupleIsValid(dbtuple))
2090 oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
2091 else
2092 oid = InvalidOid;
2094 systable_endscan(scan);
2095 table_close(pg_database, AccessShareLock);
2097 if (!OidIsValid(oid) && !missing_ok)
2098 ereport(ERROR,
2099 (errcode(ERRCODE_UNDEFINED_DATABASE),
2100 errmsg("database \"%s\" does not exist",
2101 dbname)));
2103 return oid;
2108 * get_database_name - given a database OID, look up the name
2110 * Returns a palloc'd string, or NULL if no such database.
2112 char *
2113 get_database_name(Oid dbid)
2115 HeapTuple dbtuple;
2116 char *result;
2118 dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2119 if (HeapTupleIsValid(dbtuple))
2121 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
2122 ReleaseSysCache(dbtuple);
2124 else
2125 result = NULL;
2127 return result;
2131 * DATABASE resource manager's routines
2133 void
2134 dbase_redo(XLogReaderState *record)
2136 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
2138 /* Backup blocks are not used in dbase records */
2139 Assert(!XLogRecHasAnyBlockRefs(record));
2141 if (info == XLOG_DBASE_CREATE)
2143 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
2144 char *src_path;
2145 char *dst_path;
2146 struct stat st;
2148 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
2149 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2152 * Our theory for replaying a CREATE is to forcibly drop the target
2153 * subdirectory if present, then re-copy the source data. This may be
2154 * more work than needed, but it is simple to implement.
2156 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
2158 if (!rmtree(dst_path, true))
2159 /* If this failed, copydir() below is going to error. */
2160 ereport(WARNING,
2161 (errmsg("some useless files may be left behind in old database directory \"%s\"",
2162 dst_path)));
2166 * Force dirty buffers out to disk, to ensure source database is
2167 * up-to-date for the copy.
2169 FlushDatabaseBuffers(xlrec->src_db_id);
2172 * Copy this subdirectory to the new location
2174 * We don't need to copy subdirectories
2176 copydir(src_path, dst_path, false);
2178 else if (info == XLOG_DBASE_DROP)
2180 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
2181 char *dst_path;
2182 int i;
2184 if (InHotStandby)
2187 * Lock database while we resolve conflicts to ensure that
2188 * InitPostgres() cannot fully re-execute concurrently. This
2189 * avoids backends re-connecting automatically to same database,
2190 * which can happen in some cases.
2192 * This will lock out walsenders trying to connect to db-specific
2193 * slots for logical decoding too, so it's safe for us to drop
2194 * slots.
2196 LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2197 ResolveRecoveryConflictWithDatabase(xlrec->db_id);
2200 /* Drop any database-specific replication slots */
2201 ReplicationSlotsDropDBSlots(xlrec->db_id);
2203 /* Drop pages for this database that are in the shared buffer cache */
2204 DropDatabaseBuffers(xlrec->db_id);
2206 /* Also, clean out any fsync requests that might be pending in md.c */
2207 ForgetDatabaseSyncRequests(xlrec->db_id);
2209 /* Clean out the xlog relcache too */
2210 XLogDropDatabase(xlrec->db_id);
2212 for (i = 0; i < xlrec->ntablespaces; i++)
2214 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
2216 /* And remove the physical files */
2217 if (!rmtree(dst_path, true))
2218 ereport(WARNING,
2219 (errmsg("some useless files may be left behind in old database directory \"%s\"",
2220 dst_path)));
2221 pfree(dst_path);
2224 if (InHotStandby)
2227 * Release locks prior to commit. XXX There is a race condition
2228 * here that may allow backends to reconnect, but the window for
2229 * this is small because the gap between here and commit is mostly
2230 * fairly small and it is unlikely that people will be dropping
2231 * databases that we are trying to connect to anyway.
2233 UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2236 else
2237 elog(PANIC, "dbase_redo: unknown op code %u", info);