Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * dbcommands.c
4 : * Database management commands (create/drop database).
5 : *
6 : * Note: database creation/destruction commands use exclusive locks on
7 : * the database objects (as expressed by LockSharedObject()) to avoid
8 : * stepping on each other's toes. Formerly we used table-level locks
9 : * on pg_database, but that's too coarse-grained.
10 : *
11 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/commands/dbcommands.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <fcntl.h>
23 : #include <unistd.h>
24 : #include <sys/stat.h>
25 :
26 : #include "access/genam.h"
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/xact.h"
30 : #include "access/xloginsert.h"
31 : #include "access/xlogutils.h"
32 : #include "catalog/catalog.h"
33 : #include "catalog/dependency.h"
34 : #include "catalog/indexing.h"
35 : #include "catalog/objectaccess.h"
36 : #include "catalog/pg_authid.h"
37 : #include "catalog/pg_database.h"
38 : #include "catalog/pg_db_role_setting.h"
39 : #include "catalog/pg_subscription.h"
40 : #include "catalog/pg_tablespace.h"
41 : #include "commands/comment.h"
42 : #include "commands/dbcommands.h"
43 : #include "commands/dbcommands_xlog.h"
44 : #include "commands/defrem.h"
45 : #include "commands/seclabel.h"
46 : #include "commands/tablespace.h"
47 : #include "mb/pg_wchar.h"
48 : #include "miscadmin.h"
49 : #include "pgstat.h"
50 : #include "postmaster/bgwriter.h"
51 : #include "replication/slot.h"
52 : #include "storage/copydir.h"
53 : #include "storage/fd.h"
54 : #include "storage/lmgr.h"
55 : #include "storage/ipc.h"
56 : #include "storage/procarray.h"
57 : #include "storage/smgr.h"
58 : #include "utils/acl.h"
59 : #include "utils/builtins.h"
60 : #include "utils/fmgroids.h"
61 : #include "utils/pg_locale.h"
62 : #include "utils/snapmgr.h"
63 : #include "utils/syscache.h"
64 : #include "utils/tqual.h"
65 :
66 :
67 : typedef struct
68 : {
69 : Oid src_dboid; /* source (template) DB */
70 : Oid dest_dboid; /* DB we are trying to create */
71 : } createdb_failure_params;
72 :
73 : typedef struct
74 : {
75 : Oid dest_dboid; /* DB we are trying to move */
76 : Oid dest_tsoid; /* tablespace we are trying to move to */
77 : } movedb_failure_params;
78 :
79 : /* non-export function prototypes */
80 : static void createdb_failure_callback(int code, Datum arg);
81 : static void movedb(const char *dbname, const char *tblspcname);
82 : static void movedb_failure_callback(int code, Datum arg);
83 : static bool get_db_info(const char *name, LOCKMODE lockmode,
84 : Oid *dbIdP, Oid *ownerIdP,
85 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
86 : Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
87 : MultiXactId *dbMinMultiP,
88 : Oid *dbTablespace, char **dbCollate, char **dbCtype);
89 : static bool have_createdb_privilege(void);
90 : static void remove_dbtablespaces(Oid db_id);
91 : static bool check_db_file_conflict(Oid db_id);
92 : static int errdetail_busy_db(int notherbackends, int npreparedxacts);
93 :
94 :
95 : /*
96 : * CREATE DATABASE
97 : */
98 : Oid
99 3 : createdb(ParseState *pstate, const CreatedbStmt *stmt)
100 : {
101 : HeapScanDesc scan;
102 : Relation rel;
103 : Oid src_dboid;
104 : Oid src_owner;
105 : int src_encoding;
106 : char *src_collate;
107 : char *src_ctype;
108 : bool src_istemplate;
109 : bool src_allowconn;
110 : Oid src_lastsysoid;
111 : TransactionId src_frozenxid;
112 : MultiXactId src_minmxid;
113 : Oid src_deftablespace;
114 : volatile Oid dst_deftablespace;
115 : Relation pg_database_rel;
116 : HeapTuple tuple;
117 : Datum new_record[Natts_pg_database];
118 : bool new_record_nulls[Natts_pg_database];
119 : Oid dboid;
120 : Oid datdba;
121 : ListCell *option;
122 3 : DefElem *dtablespacename = NULL;
123 3 : DefElem *downer = NULL;
124 3 : DefElem *dtemplate = NULL;
125 3 : DefElem *dencoding = NULL;
126 3 : DefElem *dcollate = NULL;
127 3 : DefElem *dctype = NULL;
128 3 : DefElem *distemplate = NULL;
129 3 : DefElem *dallowconnections = NULL;
130 3 : DefElem *dconnlimit = NULL;
131 3 : char *dbname = stmt->dbname;
132 3 : char *dbowner = NULL;
133 3 : const char *dbtemplate = NULL;
134 3 : char *dbcollate = NULL;
135 3 : char *dbctype = NULL;
136 : char *canonname;
137 3 : int encoding = -1;
138 3 : bool dbistemplate = false;
139 3 : bool dballowconnections = true;
140 3 : int dbconnlimit = -1;
141 : int notherbackends;
142 : int npreparedxacts;
143 : createdb_failure_params fparms;
144 :
145 : /* Extract options from the statement node tree */
146 6 : foreach(option, stmt->options)
147 : {
148 3 : DefElem *defel = (DefElem *) lfirst(option);
149 :
150 3 : if (strcmp(defel->defname, "tablespace") == 0)
151 : {
152 0 : if (dtablespacename)
153 0 : ereport(ERROR,
154 : (errcode(ERRCODE_SYNTAX_ERROR),
155 : errmsg("conflicting or redundant options"),
156 : parser_errposition(pstate, defel->location)));
157 0 : dtablespacename = defel;
158 : }
159 3 : else if (strcmp(defel->defname, "owner") == 0)
160 : {
161 0 : if (downer)
162 0 : ereport(ERROR,
163 : (errcode(ERRCODE_SYNTAX_ERROR),
164 : errmsg("conflicting or redundant options"),
165 : parser_errposition(pstate, defel->location)));
166 0 : downer = defel;
167 : }
168 3 : else if (strcmp(defel->defname, "template") == 0)
169 : {
170 1 : if (dtemplate)
171 0 : ereport(ERROR,
172 : (errcode(ERRCODE_SYNTAX_ERROR),
173 : errmsg("conflicting or redundant options"),
174 : parser_errposition(pstate, defel->location)));
175 1 : dtemplate = defel;
176 : }
177 2 : else if (strcmp(defel->defname, "encoding") == 0)
178 : {
179 0 : if (dencoding)
180 0 : ereport(ERROR,
181 : (errcode(ERRCODE_SYNTAX_ERROR),
182 : errmsg("conflicting or redundant options"),
183 : parser_errposition(pstate, defel->location)));
184 0 : dencoding = defel;
185 : }
186 2 : else if (strcmp(defel->defname, "lc_collate") == 0)
187 : {
188 0 : if (dcollate)
189 0 : ereport(ERROR,
190 : (errcode(ERRCODE_SYNTAX_ERROR),
191 : errmsg("conflicting or redundant options"),
192 : parser_errposition(pstate, defel->location)));
193 0 : dcollate = defel;
194 : }
195 2 : else if (strcmp(defel->defname, "lc_ctype") == 0)
196 : {
197 0 : if (dctype)
198 0 : ereport(ERROR,
199 : (errcode(ERRCODE_SYNTAX_ERROR),
200 : errmsg("conflicting or redundant options"),
201 : parser_errposition(pstate, defel->location)));
202 0 : dctype = defel;
203 : }
204 2 : else if (strcmp(defel->defname, "is_template") == 0)
205 : {
206 1 : if (distemplate)
207 0 : ereport(ERROR,
208 : (errcode(ERRCODE_SYNTAX_ERROR),
209 : errmsg("conflicting or redundant options"),
210 : parser_errposition(pstate, defel->location)));
211 1 : distemplate = defel;
212 : }
213 1 : else if (strcmp(defel->defname, "allow_connections") == 0)
214 : {
215 1 : if (dallowconnections)
216 0 : ereport(ERROR,
217 : (errcode(ERRCODE_SYNTAX_ERROR),
218 : errmsg("conflicting or redundant options"),
219 : parser_errposition(pstate, defel->location)));
220 1 : dallowconnections = defel;
221 : }
222 0 : else if (strcmp(defel->defname, "connection_limit") == 0)
223 : {
224 0 : if (dconnlimit)
225 0 : ereport(ERROR,
226 : (errcode(ERRCODE_SYNTAX_ERROR),
227 : errmsg("conflicting or redundant options"),
228 : parser_errposition(pstate, defel->location)));
229 0 : dconnlimit = defel;
230 : }
231 0 : else if (strcmp(defel->defname, "location") == 0)
232 : {
233 0 : ereport(WARNING,
234 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
235 : errmsg("LOCATION is not supported anymore"),
236 : errhint("Consider using tablespaces instead."),
237 : parser_errposition(pstate, defel->location)));
238 : }
239 : else
240 0 : ereport(ERROR,
241 : (errcode(ERRCODE_SYNTAX_ERROR),
242 : errmsg("option \"%s\" not recognized", defel->defname),
243 : parser_errposition(pstate, defel->location)));
244 : }
245 :
246 3 : if (downer && downer->arg)
247 0 : dbowner = defGetString(downer);
248 3 : if (dtemplate && dtemplate->arg)
249 1 : dbtemplate = defGetString(dtemplate);
250 3 : if (dencoding && dencoding->arg)
251 : {
252 : const char *encoding_name;
253 :
254 0 : if (IsA(dencoding->arg, Integer))
255 : {
256 0 : encoding = defGetInt32(dencoding);
257 0 : encoding_name = pg_encoding_to_char(encoding);
258 0 : if (strcmp(encoding_name, "") == 0 ||
259 0 : pg_valid_server_encoding(encoding_name) < 0)
260 0 : ereport(ERROR,
261 : (errcode(ERRCODE_UNDEFINED_OBJECT),
262 : errmsg("%d is not a valid encoding code",
263 : encoding),
264 : parser_errposition(pstate, dencoding->location)));
265 : }
266 : else
267 : {
268 0 : encoding_name = defGetString(dencoding);
269 0 : encoding = pg_valid_server_encoding(encoding_name);
270 0 : if (encoding < 0)
271 0 : ereport(ERROR,
272 : (errcode(ERRCODE_UNDEFINED_OBJECT),
273 : errmsg("%s is not a valid encoding name",
274 : encoding_name),
275 : parser_errposition(pstate, dencoding->location)));
276 : }
277 : }
278 3 : if (dcollate && dcollate->arg)
279 0 : dbcollate = defGetString(dcollate);
280 3 : if (dctype && dctype->arg)
281 0 : dbctype = defGetString(dctype);
282 3 : if (distemplate && distemplate->arg)
283 1 : dbistemplate = defGetBoolean(distemplate);
284 3 : if (dallowconnections && dallowconnections->arg)
285 1 : dballowconnections = defGetBoolean(dallowconnections);
286 3 : if (dconnlimit && dconnlimit->arg)
287 : {
288 0 : dbconnlimit = defGetInt32(dconnlimit);
289 0 : if (dbconnlimit < -1)
290 0 : ereport(ERROR,
291 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
292 : errmsg("invalid connection limit: %d", dbconnlimit)));
293 : }
294 :
295 : /* obtain OID of proposed owner */
296 3 : if (dbowner)
297 0 : datdba = get_role_oid(dbowner, false);
298 : else
299 3 : datdba = GetUserId();
300 :
301 : /*
302 : * To create a database, must have createdb privilege and must be able to
303 : * become the target role (this does not imply that the target role itself
304 : * must have createdb privilege). The latter provision guards against
305 : * "giveaway" attacks. Note that a superuser will always have both of
306 : * these privileges a fortiori.
307 : */
308 3 : if (!have_createdb_privilege())
309 0 : ereport(ERROR,
310 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
311 : errmsg("permission denied to create database")));
312 :
313 3 : check_is_member_of_role(GetUserId(), datdba);
314 :
315 : /*
316 : * Lookup database (template) to be cloned, and obtain share lock on it.
317 : * ShareLock allows two CREATE DATABASEs to work from the same template
318 : * concurrently, while ensuring no one is busy dropping it in parallel
319 : * (which would be Very Bad since we'd likely get an incomplete copy
320 : * without knowing it). This also prevents any new connections from being
321 : * made to the source until we finish copying it, so we can be sure it
322 : * won't change underneath us.
323 : */
324 3 : if (!dbtemplate)
325 2 : dbtemplate = "template1"; /* Default template database name */
326 :
327 3 : if (!get_db_info(dbtemplate, ShareLock,
328 : &src_dboid, &src_owner, &src_encoding,
329 : &src_istemplate, &src_allowconn, &src_lastsysoid,
330 : &src_frozenxid, &src_minmxid, &src_deftablespace,
331 : &src_collate, &src_ctype))
332 0 : ereport(ERROR,
333 : (errcode(ERRCODE_UNDEFINED_DATABASE),
334 : errmsg("template database \"%s\" does not exist",
335 : dbtemplate)));
336 :
337 : /*
338 : * Permission check: to copy a DB that's not marked datistemplate, you
339 : * must be superuser or the owner thereof.
340 : */
341 3 : if (!src_istemplate)
342 : {
343 0 : if (!pg_database_ownercheck(src_dboid, GetUserId()))
344 0 : ereport(ERROR,
345 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
346 : errmsg("permission denied to copy database \"%s\"",
347 : dbtemplate)));
348 : }
349 :
350 : /* If encoding or locales are defaulted, use source's setting */
351 3 : if (encoding < 0)
352 3 : encoding = src_encoding;
353 3 : if (dbcollate == NULL)
354 3 : dbcollate = src_collate;
355 3 : if (dbctype == NULL)
356 3 : dbctype = src_ctype;
357 :
358 : /* Some encodings are client only */
359 3 : if (!PG_VALID_BE_ENCODING(encoding))
360 0 : ereport(ERROR,
361 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
362 : errmsg("invalid server encoding %d", encoding)));
363 :
364 : /* Check that the chosen locales are valid, and get canonical spellings */
365 3 : if (!check_locale(LC_COLLATE, dbcollate, &canonname))
366 0 : ereport(ERROR,
367 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
368 : errmsg("invalid locale name: \"%s\"", dbcollate)));
369 3 : dbcollate = canonname;
370 3 : if (!check_locale(LC_CTYPE, dbctype, &canonname))
371 0 : ereport(ERROR,
372 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
373 : errmsg("invalid locale name: \"%s\"", dbctype)));
374 3 : dbctype = canonname;
375 :
376 3 : check_encoding_locale_matches(encoding, dbcollate, dbctype);
377 :
378 : /*
379 : * Check that the new encoding and locale settings match the source
380 : * database. We insist on this because we simply copy the source data ---
381 : * any non-ASCII data would be wrongly encoded, and any indexes sorted
382 : * according to the source locale would be wrong.
383 : *
384 : * However, we assume that template0 doesn't contain any non-ASCII data
385 : * nor any indexes that depend on collation or ctype, so template0 can be
386 : * used as template for creating a database with any encoding or locale.
387 : */
388 3 : if (strcmp(dbtemplate, "template0") != 0)
389 : {
390 2 : if (encoding != src_encoding)
391 0 : ereport(ERROR,
392 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
393 : errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
394 : pg_encoding_to_char(encoding),
395 : pg_encoding_to_char(src_encoding)),
396 : errhint("Use the same encoding as in the template database, or use template0 as template.")));
397 :
398 2 : if (strcmp(dbcollate, src_collate) != 0)
399 0 : ereport(ERROR,
400 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
401 : errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
402 : dbcollate, src_collate),
403 : errhint("Use the same collation as in the template database, or use template0 as template.")));
404 :
405 2 : if (strcmp(dbctype, src_ctype) != 0)
406 0 : ereport(ERROR,
407 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
408 : errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
409 : dbctype, src_ctype),
410 : errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
411 : }
412 :
413 : /* Resolve default tablespace for new database */
414 3 : if (dtablespacename && dtablespacename->arg)
415 0 : {
416 : char *tablespacename;
417 : AclResult aclresult;
418 :
419 0 : tablespacename = defGetString(dtablespacename);
420 0 : dst_deftablespace = get_tablespace_oid(tablespacename, false);
421 : /* check permissions */
422 0 : aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
423 : ACL_CREATE);
424 0 : if (aclresult != ACLCHECK_OK)
425 0 : aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
426 : tablespacename);
427 :
428 : /* pg_global must never be the default tablespace */
429 0 : if (dst_deftablespace == GLOBALTABLESPACE_OID)
430 0 : ereport(ERROR,
431 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
432 : errmsg("pg_global cannot be used as default tablespace")));
433 :
434 : /*
435 : * If we are trying to change the default tablespace of the template,
436 : * we require that the template not have any files in the new default
437 : * tablespace. This is necessary because otherwise the copied
438 : * database would contain pg_class rows that refer to its default
439 : * tablespace both explicitly (by OID) and implicitly (as zero), which
440 : * would cause problems. For example another CREATE DATABASE using
441 : * the copied database as template, and trying to change its default
442 : * tablespace again, would yield outright incorrect results (it would
443 : * improperly move tables to the new default tablespace that should
444 : * stay in the same tablespace).
445 : */
446 0 : if (dst_deftablespace != src_deftablespace)
447 : {
448 : char *srcpath;
449 : struct stat st;
450 :
451 0 : srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
452 :
453 0 : if (stat(srcpath, &st) == 0 &&
454 0 : S_ISDIR(st.st_mode) &&
455 0 : !directory_is_empty(srcpath))
456 0 : ereport(ERROR,
457 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
458 : errmsg("cannot assign new default tablespace \"%s\"",
459 : tablespacename),
460 : errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
461 : dbtemplate)));
462 0 : pfree(srcpath);
463 : }
464 : }
465 : else
466 : {
467 : /* Use template database's default tablespace */
468 3 : dst_deftablespace = src_deftablespace;
469 : /* Note there is no additional permission check in this path */
470 : }
471 :
472 : /*
473 : * Check for db name conflict. This is just to give a more friendly error
474 : * message than "unique index violation". There's a race condition but
475 : * we're willing to accept the less friendly message in that case.
476 : */
477 3 : if (OidIsValid(get_database_oid(dbname, true)))
478 0 : ereport(ERROR,
479 : (errcode(ERRCODE_DUPLICATE_DATABASE),
480 : errmsg("database \"%s\" already exists", dbname)));
481 :
482 : /*
483 : * The source DB can't have any active backends, except this one
484 : * (exception is to allow CREATE DB while connected to template1).
485 : * Otherwise we might copy inconsistent data.
486 : *
487 : * This should be last among the basic error checks, because it involves
488 : * potential waiting; we may as well throw an error first if we're gonna
489 : * throw one.
490 : */
491 3 : if (CountOtherDBBackends(src_dboid, ¬herbackends, &npreparedxacts))
492 0 : ereport(ERROR,
493 : (errcode(ERRCODE_OBJECT_IN_USE),
494 : errmsg("source database \"%s\" is being accessed by other users",
495 : dbtemplate),
496 : errdetail_busy_db(notherbackends, npreparedxacts)));
497 :
498 : /*
499 : * Select an OID for the new database, checking that it doesn't have a
500 : * filename conflict with anything already existing in the tablespace
501 : * directories.
502 : */
503 3 : pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
504 :
505 : do
506 : {
507 3 : dboid = GetNewOid(pg_database_rel);
508 3 : } while (check_db_file_conflict(dboid));
509 :
510 : /*
511 : * Insert a new tuple into pg_database. This establishes our ownership of
512 : * the new database name (anyone else trying to insert the same name will
513 : * block on the unique index, and fail after we commit).
514 : */
515 :
516 : /* Form tuple */
517 3 : MemSet(new_record, 0, sizeof(new_record));
518 3 : MemSet(new_record_nulls, false, sizeof(new_record_nulls));
519 :
520 3 : new_record[Anum_pg_database_datname - 1] =
521 3 : DirectFunctionCall1(namein, CStringGetDatum(dbname));
522 3 : new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
523 3 : new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
524 3 : new_record[Anum_pg_database_datcollate - 1] =
525 3 : DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
526 3 : new_record[Anum_pg_database_datctype - 1] =
527 3 : DirectFunctionCall1(namein, CStringGetDatum(dbctype));
528 3 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
529 3 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
530 3 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
531 3 : new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
532 3 : new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
533 3 : new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
534 3 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
535 :
536 : /*
537 : * We deliberately set datacl to default (NULL), rather than copying it
538 : * from the template database. Copying it would be a bad idea when the
539 : * owner is not the same as the template's owner.
540 : */
541 3 : new_record_nulls[Anum_pg_database_datacl - 1] = true;
542 :
543 3 : tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
544 : new_record, new_record_nulls);
545 :
546 3 : HeapTupleSetOid(tuple, dboid);
547 :
548 3 : CatalogTupleInsert(pg_database_rel, tuple);
549 :
550 : /*
551 : * Now generate additional catalog entries associated with the new DB
552 : */
553 :
554 : /* Register owner dependency */
555 3 : recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
556 :
557 : /* Create pg_shdepend entries for objects within database */
558 3 : copyTemplateDependencies(src_dboid, dboid);
559 :
560 : /* Post creation hook for new database */
561 3 : InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
562 :
563 : /*
564 : * Force a checkpoint before starting the copy. This will force all dirty
565 : * buffers, including those of unlogged tables, out to disk, to ensure
566 : * source database is up-to-date on disk for the copy.
567 : * FlushDatabaseBuffers() would suffice for that, but we also want to
568 : * process any pending unlink requests. Otherwise, if a checkpoint
569 : * happened while we're copying files, a file might be deleted just when
570 : * we're about to copy it, causing the lstat() call in copydir() to fail
571 : * with ENOENT.
572 : */
573 3 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
574 : | CHECKPOINT_FLUSH_ALL);
575 :
576 : /*
577 : * Once we start copying subdirectories, we need to be able to clean 'em
578 : * up if we fail. Use an ENSURE block to make sure this happens. (This
579 : * is not a 100% solution, because of the possibility of failure during
580 : * transaction commit after we leave this routine, but it should handle
581 : * most scenarios.)
582 : */
583 3 : fparms.src_dboid = src_dboid;
584 3 : fparms.dest_dboid = dboid;
585 3 : PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
586 : PointerGetDatum(&fparms));
587 : {
588 : /*
589 : * Iterate through all tablespaces of the template database, and copy
590 : * each one to the new database.
591 : */
592 3 : rel = heap_open(TableSpaceRelationId, AccessShareLock);
593 3 : scan = heap_beginscan_catalog(rel, 0, NULL);
594 12 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
595 : {
596 6 : Oid srctablespace = HeapTupleGetOid(tuple);
597 : Oid dsttablespace;
598 : char *srcpath;
599 : char *dstpath;
600 : struct stat st;
601 :
602 : /* No need to copy global tablespace */
603 6 : if (srctablespace == GLOBALTABLESPACE_OID)
604 6 : continue;
605 :
606 3 : srcpath = GetDatabasePath(src_dboid, srctablespace);
607 :
608 6 : if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
609 3 : directory_is_empty(srcpath))
610 : {
611 : /* Assume we can ignore it */
612 0 : pfree(srcpath);
613 0 : continue;
614 : }
615 :
616 3 : if (srctablespace == src_deftablespace)
617 3 : dsttablespace = dst_deftablespace;
618 : else
619 0 : dsttablespace = srctablespace;
620 :
621 3 : dstpath = GetDatabasePath(dboid, dsttablespace);
622 :
623 : /*
624 : * Copy this subdirectory to the new location
625 : *
626 : * We don't need to copy subdirectories
627 : */
628 3 : copydir(srcpath, dstpath, false);
629 :
630 : /* Record the filesystem change in XLOG */
631 : {
632 : xl_dbase_create_rec xlrec;
633 :
634 3 : xlrec.db_id = dboid;
635 3 : xlrec.tablespace_id = dsttablespace;
636 3 : xlrec.src_db_id = src_dboid;
637 3 : xlrec.src_tablespace_id = srctablespace;
638 :
639 3 : XLogBeginInsert();
640 3 : XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
641 :
642 3 : (void) XLogInsert(RM_DBASE_ID,
643 : XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
644 : }
645 : }
646 3 : heap_endscan(scan);
647 3 : heap_close(rel, AccessShareLock);
648 :
649 : /*
650 : * We force a checkpoint before committing. This effectively means
651 : * that committed XLOG_DBASE_CREATE operations will never need to be
652 : * replayed (at least not in ordinary crash recovery; we still have to
653 : * make the XLOG entry for the benefit of PITR operations). This
654 : * avoids two nasty scenarios:
655 : *
656 : * #1: When PITR is off, we don't XLOG the contents of newly created
657 : * indexes; therefore the drop-and-recreate-whole-directory behavior
658 : * of DBASE_CREATE replay would lose such indexes.
659 : *
660 : * #2: Since we have to recopy the source database during DBASE_CREATE
661 : * replay, we run the risk of copying changes in it that were
662 : * committed after the original CREATE DATABASE command but before the
663 : * system crash that led to the replay. This is at least unexpected
664 : * and at worst could lead to inconsistencies, eg duplicate table
665 : * names.
666 : *
667 : * (Both of these were real bugs in releases 8.0 through 8.0.3.)
668 : *
669 : * In PITR replay, the first of these isn't an issue, and the second
670 : * is only a risk if the CREATE DATABASE and subsequent template
671 : * database change both occur while a base backup is being taken.
672 : * There doesn't seem to be much we can do about that except document
673 : * it as a limitation.
674 : *
675 : * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
676 : * we can avoid this.
677 : */
678 3 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
679 :
680 : /*
681 : * Close pg_database, but keep lock till commit.
682 : */
683 3 : heap_close(pg_database_rel, NoLock);
684 :
685 : /*
686 : * Force synchronous commit, thus minimizing the window between
687 : * creation of the database files and committal of the transaction. If
688 : * we crash before committing, we'll have a DB that's taking up disk
689 : * space but is not in pg_database, which is not good.
690 : */
691 3 : ForceSyncCommit();
692 : }
693 3 : PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
694 : PointerGetDatum(&fparms));
695 :
696 3 : return dboid;
697 : }
698 :
699 : /*
700 : * Check whether chosen encoding matches chosen locale settings. This
701 : * restriction is necessary because libc's locale-specific code usually
702 : * fails when presented with data in an encoding it's not expecting. We
703 : * allow mismatch in four cases:
704 : *
705 : * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
706 : * which works with any encoding.
707 : *
708 : * 2. locale encoding = -1, which means that we couldn't determine the
709 : * locale's encoding and have to trust the user to get it right.
710 : *
711 : * 3. selected encoding is UTF8 and platform is win32. This is because
712 : * UTF8 is a pseudo codepage that is supported in all locales since it's
713 : * converted to UTF16 before being used.
714 : *
715 : * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
716 : * is risky but we have historically allowed it --- notably, the
717 : * regression tests require it.
718 : *
719 : * Note: if you change this policy, fix initdb to match.
720 : */
721 : void
722 4 : check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
723 : {
724 4 : int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
725 4 : int collate_encoding = pg_get_encoding_from_locale(collate, true);
726 :
727 4 : if (!(ctype_encoding == encoding ||
728 0 : ctype_encoding == PG_SQL_ASCII ||
729 : ctype_encoding == -1 ||
730 : #ifdef WIN32
731 : encoding == PG_UTF8 ||
732 : #endif
733 0 : (encoding == PG_SQL_ASCII && superuser())))
734 0 : ereport(ERROR,
735 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
736 : errmsg("encoding \"%s\" does not match locale \"%s\"",
737 : pg_encoding_to_char(encoding),
738 : ctype),
739 : errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
740 : pg_encoding_to_char(ctype_encoding))));
741 :
742 4 : if (!(collate_encoding == encoding ||
743 0 : collate_encoding == PG_SQL_ASCII ||
744 : collate_encoding == -1 ||
745 : #ifdef WIN32
746 : encoding == PG_UTF8 ||
747 : #endif
748 0 : (encoding == PG_SQL_ASCII && superuser())))
749 0 : ereport(ERROR,
750 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
751 : errmsg("encoding \"%s\" does not match locale \"%s\"",
752 : pg_encoding_to_char(encoding),
753 : collate),
754 : errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
755 : pg_encoding_to_char(collate_encoding))));
756 4 : }
757 :
758 : /* Error cleanup callback for createdb */
759 : static void
760 0 : createdb_failure_callback(int code, Datum arg)
761 : {
762 0 : createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
763 :
764 : /*
765 : * Release lock on source database before doing recursive remove. This is
766 : * not essential but it seems desirable to release the lock as soon as
767 : * possible.
768 : */
769 0 : UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
770 :
771 : /* Throw away any successfully copied subdirectories */
772 0 : remove_dbtablespaces(fparms->dest_dboid);
773 0 : }
774 :
775 :
776 : /*
777 : * DROP DATABASE
778 : */
779 : void
780 0 : dropdb(const char *dbname, bool missing_ok)
781 : {
782 : Oid db_id;
783 : bool db_istemplate;
784 : Relation pgdbrel;
785 : HeapTuple tup;
786 : int notherbackends;
787 : int npreparedxacts;
788 : int nslots,
789 : nslots_active;
790 : int nsubscriptions;
791 :
792 : /*
793 : * Look up the target database's OID, and get exclusive lock on it. We
794 : * need this to ensure that no new backend starts up in the target
795 : * database while we are deleting it (see postinit.c), and that no one is
796 : * using it as a CREATE DATABASE template or trying to delete it for
797 : * themselves.
798 : */
799 0 : pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
800 :
801 0 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
802 : &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
803 : {
804 0 : if (!missing_ok)
805 : {
806 0 : ereport(ERROR,
807 : (errcode(ERRCODE_UNDEFINED_DATABASE),
808 : errmsg("database \"%s\" does not exist", dbname)));
809 : }
810 : else
811 : {
812 : /* Close pg_database, release the lock, since we changed nothing */
813 0 : heap_close(pgdbrel, RowExclusiveLock);
814 0 : ereport(NOTICE,
815 : (errmsg("database \"%s\" does not exist, skipping",
816 : dbname)));
817 0 : return;
818 : }
819 : }
820 :
821 : /*
822 : * Permission checks
823 : */
824 0 : if (!pg_database_ownercheck(db_id, GetUserId()))
825 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
826 : dbname);
827 :
828 : /* DROP hook for the database being removed */
829 0 : InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
830 :
831 : /*
832 : * Disallow dropping a DB that is marked istemplate. This is just to
833 : * prevent people from accidentally dropping template0 or template1; they
834 : * can do so if they're really determined ...
835 : */
836 0 : if (db_istemplate)
837 0 : ereport(ERROR,
838 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
839 : errmsg("cannot drop a template database")));
840 :
841 : /* Obviously can't drop my own database */
842 0 : if (db_id == MyDatabaseId)
843 0 : ereport(ERROR,
844 : (errcode(ERRCODE_OBJECT_IN_USE),
845 : errmsg("cannot drop the currently open database")));
846 :
847 : /*
848 : * Check whether there are active logical slots that refer to the
849 : * to-be-dropped database. The database lock we are holding prevents the
850 : * creation of new slots using the database or existing slots becoming
851 : * active.
852 : */
853 0 : (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
854 0 : if (nslots_active)
855 : {
856 0 : ereport(ERROR,
857 : (errcode(ERRCODE_OBJECT_IN_USE),
858 : errmsg("database \"%s\" is used by an active logical replication slot",
859 : dbname),
860 : errdetail_plural("There is %d active slot",
861 : "There are %d active slots",
862 : nslots_active, nslots_active)));
863 : }
864 :
865 : /*
866 : * Check for other backends in the target database. (Because we hold the
867 : * database lock, no new ones can start after this.)
868 : *
869 : * As in CREATE DATABASE, check this after other error conditions.
870 : */
871 0 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
872 0 : ereport(ERROR,
873 : (errcode(ERRCODE_OBJECT_IN_USE),
874 : errmsg("database \"%s\" is being accessed by other users",
875 : dbname),
876 : errdetail_busy_db(notherbackends, npreparedxacts)));
877 :
878 : /*
879 : * Check if there are subscriptions defined in the target database.
880 : *
881 : * We can't drop them automatically because they might be holding
882 : * resources in other databases/instances.
883 : */
884 0 : if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
885 0 : ereport(ERROR,
886 : (errcode(ERRCODE_OBJECT_IN_USE),
887 : errmsg("database \"%s\" is being used by logical replication subscription",
888 : dbname),
889 : errdetail_plural("There is %d subscription.",
890 : "There are %d subscriptions.",
891 : nsubscriptions, nsubscriptions)));
892 :
893 : /*
894 : * Remove the database's tuple from pg_database.
895 : */
896 0 : tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
897 0 : if (!HeapTupleIsValid(tup))
898 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
899 :
900 0 : CatalogTupleDelete(pgdbrel, &tup->t_self);
901 :
902 0 : ReleaseSysCache(tup);
903 :
904 : /*
905 : * Delete any comments or security labels associated with the database.
906 : */
907 0 : DeleteSharedComments(db_id, DatabaseRelationId);
908 0 : DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
909 :
910 : /*
911 : * Remove settings associated with this database
912 : */
913 0 : DropSetting(db_id, InvalidOid);
914 :
915 : /*
916 : * Remove shared dependency references for the database.
917 : */
918 0 : dropDatabaseDependencies(db_id);
919 :
920 : /*
921 : * Drop db-specific replication slots.
922 : */
923 0 : ReplicationSlotsDropDBSlots(db_id);
924 :
925 : /*
926 : * Drop pages for this database that are in the shared buffer cache. This
927 : * is important to ensure that no remaining backend tries to write out a
928 : * dirty buffer to the dead database later...
929 : */
930 0 : DropDatabaseBuffers(db_id);
931 :
932 : /*
933 : * Tell the stats collector to forget it immediately, too.
934 : */
935 0 : pgstat_drop_database(db_id);
936 :
937 : /*
938 : * Tell checkpointer to forget any pending fsync and unlink requests for
939 : * files in the database; else the fsyncs will fail at next checkpoint, or
940 : * worse, it will delete files that belong to a newly created database
941 : * with the same OID.
942 : */
943 0 : ForgetDatabaseFsyncRequests(db_id);
944 :
945 : /*
946 : * Force a checkpoint to make sure the checkpointer has received the
947 : * message sent by ForgetDatabaseFsyncRequests. On Windows, this also
948 : * ensures that background procs don't hold any open files, which would
949 : * cause rmdir() to fail.
950 : */
951 0 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
952 :
953 : /*
954 : * Remove all tablespace subdirs belonging to the database.
955 : */
956 0 : remove_dbtablespaces(db_id);
957 :
958 : /*
959 : * Close pg_database, but keep lock till commit.
960 : */
961 0 : heap_close(pgdbrel, NoLock);
962 :
963 : /*
964 : * Force synchronous commit, thus minimizing the window between removal of
965 : * the database files and committal of the transaction. If we crash before
966 : * committing, we'll have a DB that's gone on disk but still there
967 : * according to pg_database, which is not good.
968 : */
969 0 : ForceSyncCommit();
970 : }
971 :
972 :
973 : /*
974 : * Rename database
975 : */
976 : ObjectAddress
977 0 : RenameDatabase(const char *oldname, const char *newname)
978 : {
979 : Oid db_id;
980 : HeapTuple newtup;
981 : Relation rel;
982 : int notherbackends;
983 : int npreparedxacts;
984 : ObjectAddress address;
985 :
986 : /*
987 : * Look up the target database's OID, and get exclusive lock on it. We
988 : * need this for the same reasons as DROP DATABASE.
989 : */
990 0 : rel = heap_open(DatabaseRelationId, RowExclusiveLock);
991 :
992 0 : if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
993 : NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
994 0 : ereport(ERROR,
995 : (errcode(ERRCODE_UNDEFINED_DATABASE),
996 : errmsg("database \"%s\" does not exist", oldname)));
997 :
998 : /* must be owner */
999 0 : if (!pg_database_ownercheck(db_id, GetUserId()))
1000 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1001 : oldname);
1002 :
1003 : /* must have createdb rights */
1004 0 : if (!have_createdb_privilege())
1005 0 : ereport(ERROR,
1006 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1007 : errmsg("permission denied to rename database")));
1008 :
1009 : /*
1010 : * Make sure the new name doesn't exist. See notes for same error in
1011 : * CREATE DATABASE.
1012 : */
1013 0 : if (OidIsValid(get_database_oid(newname, true)))
1014 0 : ereport(ERROR,
1015 : (errcode(ERRCODE_DUPLICATE_DATABASE),
1016 : errmsg("database \"%s\" already exists", newname)));
1017 :
1018 : /*
1019 : * XXX Client applications probably store the current database somewhere,
1020 : * so renaming it could cause confusion. On the other hand, there may not
1021 : * be an actual problem besides a little confusion, so think about this
1022 : * and decide.
1023 : */
1024 0 : if (db_id == MyDatabaseId)
1025 0 : ereport(ERROR,
1026 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1027 : errmsg("current database cannot be renamed")));
1028 :
1029 : /*
1030 : * Make sure the database does not have active sessions. This is the same
1031 : * concern as above, but applied to other sessions.
1032 : *
1033 : * As in CREATE DATABASE, check this after other error conditions.
1034 : */
1035 0 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1036 0 : ereport(ERROR,
1037 : (errcode(ERRCODE_OBJECT_IN_USE),
1038 : errmsg("database \"%s\" is being accessed by other users",
1039 : oldname),
1040 : errdetail_busy_db(notherbackends, npreparedxacts)));
1041 :
1042 : /* rename */
1043 0 : newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1044 0 : if (!HeapTupleIsValid(newtup))
1045 0 : elog(ERROR, "cache lookup failed for database %u", db_id);
1046 0 : namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1047 0 : CatalogTupleUpdate(rel, &newtup->t_self, newtup);
1048 :
1049 0 : InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1050 :
1051 0 : ObjectAddressSet(address, DatabaseRelationId, db_id);
1052 :
1053 : /*
1054 : * Close pg_database, but keep lock till commit.
1055 : */
1056 0 : heap_close(rel, NoLock);
1057 :
1058 0 : return address;
1059 : }
1060 :
1061 :
1062 : /*
1063 : * ALTER DATABASE SET TABLESPACE
1064 : */
1065 : static void
1066 0 : movedb(const char *dbname, const char *tblspcname)
1067 : {
1068 : Oid db_id;
1069 : Relation pgdbrel;
1070 : int notherbackends;
1071 : int npreparedxacts;
1072 : HeapTuple oldtuple,
1073 : newtuple;
1074 : Oid src_tblspcoid,
1075 : dst_tblspcoid;
1076 : Datum new_record[Natts_pg_database];
1077 : bool new_record_nulls[Natts_pg_database];
1078 : bool new_record_repl[Natts_pg_database];
1079 : ScanKeyData scankey;
1080 : SysScanDesc sysscan;
1081 : AclResult aclresult;
1082 : char *src_dbpath;
1083 : char *dst_dbpath;
1084 : DIR *dstdir;
1085 : struct dirent *xlde;
1086 : movedb_failure_params fparms;
1087 :
1088 : /*
1089 : * Look up the target database's OID, and get exclusive lock on it. We
1090 : * need this to ensure that no new backend starts up in the database while
1091 : * we are moving it, and that no one is using it as a CREATE DATABASE
1092 : * template or trying to delete it.
1093 : */
1094 0 : pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
1095 :
1096 0 : if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1097 : NULL, NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
1098 0 : ereport(ERROR,
1099 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1100 : errmsg("database \"%s\" does not exist", dbname)));
1101 :
1102 : /*
1103 : * We actually need a session lock, so that the lock will persist across
1104 : * the commit/restart below. (We could almost get away with letting the
1105 : * lock be released at commit, except that someone could try to move
1106 : * relations of the DB back into the old directory while we rmtree() it.)
1107 : */
1108 0 : LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1109 : AccessExclusiveLock);
1110 :
1111 : /*
1112 : * Permission checks
1113 : */
1114 0 : if (!pg_database_ownercheck(db_id, GetUserId()))
1115 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1116 : dbname);
1117 :
1118 : /*
1119 : * Obviously can't move the tables of my own database
1120 : */
1121 0 : if (db_id == MyDatabaseId)
1122 0 : ereport(ERROR,
1123 : (errcode(ERRCODE_OBJECT_IN_USE),
1124 : errmsg("cannot change the tablespace of the currently open database")));
1125 :
1126 : /*
1127 : * Get tablespace's oid
1128 : */
1129 0 : dst_tblspcoid = get_tablespace_oid(tblspcname, false);
1130 :
1131 : /*
1132 : * Permission checks
1133 : */
1134 0 : aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
1135 : ACL_CREATE);
1136 0 : if (aclresult != ACLCHECK_OK)
1137 0 : aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
1138 : tblspcname);
1139 :
1140 : /*
1141 : * pg_global must never be the default tablespace
1142 : */
1143 0 : if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1144 0 : ereport(ERROR,
1145 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1146 : errmsg("pg_global cannot be used as default tablespace")));
1147 :
1148 : /*
1149 : * No-op if same tablespace
1150 : */
1151 0 : if (src_tblspcoid == dst_tblspcoid)
1152 : {
1153 0 : heap_close(pgdbrel, NoLock);
1154 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1155 : AccessExclusiveLock);
1156 0 : return;
1157 : }
1158 :
1159 : /*
1160 : * Check for other backends in the target database. (Because we hold the
1161 : * database lock, no new ones can start after this.)
1162 : *
1163 : * As in CREATE DATABASE, check this after other error conditions.
1164 : */
1165 0 : if (CountOtherDBBackends(db_id, ¬herbackends, &npreparedxacts))
1166 0 : ereport(ERROR,
1167 : (errcode(ERRCODE_OBJECT_IN_USE),
1168 : errmsg("database \"%s\" is being accessed by other users",
1169 : dbname),
1170 : errdetail_busy_db(notherbackends, npreparedxacts)));
1171 :
1172 : /*
1173 : * Get old and new database paths
1174 : */
1175 0 : src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
1176 0 : dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1177 :
1178 : /*
1179 : * Force a checkpoint before proceeding. This will force all dirty
1180 : * buffers, including those of unlogged tables, out to disk, to ensure
1181 : * source database is up-to-date on disk for the copy.
1182 : * FlushDatabaseBuffers() would suffice for that, but we also want to
1183 : * process any pending unlink requests. Otherwise, the check for existing
1184 : * files in the target directory might fail unnecessarily, not to mention
1185 : * that the copy might fail due to source files getting deleted under it.
1186 : * On Windows, this also ensures that background procs don't hold any open
1187 : * files, which would cause rmdir() to fail.
1188 : */
1189 0 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
1190 : | CHECKPOINT_FLUSH_ALL);
1191 :
1192 : /*
1193 : * Now drop all buffers holding data of the target database; they should
1194 : * no longer be dirty so DropDatabaseBuffers is safe.
1195 : *
1196 : * It might seem that we could just let these buffers age out of shared
1197 : * buffers naturally, since they should not get referenced anymore. The
1198 : * problem with that is that if the user later moves the database back to
1199 : * its original tablespace, any still-surviving buffers would appear to
1200 : * contain valid data again --- but they'd be missing any changes made in
1201 : * the database while it was in the new tablespace. In any case, freeing
1202 : * buffers that should never be used again seems worth the cycles.
1203 : *
1204 : * Note: it'd be sufficient to get rid of buffers matching db_id and
1205 : * src_tblspcoid, but bufmgr.c presently provides no API for that.
1206 : */
1207 0 : DropDatabaseBuffers(db_id);
1208 :
1209 : /*
1210 : * Check for existence of files in the target directory, i.e., objects of
1211 : * this database that are already in the target tablespace. We can't
1212 : * allow the move in such a case, because we would need to change those
1213 : * relations' pg_class.reltablespace entries to zero, and we don't have
1214 : * access to the DB's pg_class to do so.
1215 : */
1216 0 : dstdir = AllocateDir(dst_dbpath);
1217 0 : if (dstdir != NULL)
1218 : {
1219 0 : while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
1220 : {
1221 0 : if (strcmp(xlde->d_name, ".") == 0 ||
1222 0 : strcmp(xlde->d_name, "..") == 0)
1223 0 : continue;
1224 :
1225 0 : ereport(ERROR,
1226 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1227 : errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
1228 : dbname, tblspcname),
1229 : errhint("You must move them back to the database's default tablespace before using this command.")));
1230 : }
1231 :
1232 0 : FreeDir(dstdir);
1233 :
1234 : /*
1235 : * The directory exists but is empty. We must remove it before using
1236 : * the copydir function.
1237 : */
1238 0 : if (rmdir(dst_dbpath) != 0)
1239 0 : elog(ERROR, "could not remove directory \"%s\": %m",
1240 : dst_dbpath);
1241 : }
1242 :
1243 : /*
1244 : * Use an ENSURE block to make sure we remove the debris if the copy fails
1245 : * (eg, due to out-of-disk-space). This is not a 100% solution, because
1246 : * of the possibility of failure during transaction commit, but it should
1247 : * handle most scenarios.
1248 : */
1249 0 : fparms.dest_dboid = db_id;
1250 0 : fparms.dest_tsoid = dst_tblspcoid;
1251 0 : PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1252 : PointerGetDatum(&fparms));
1253 : {
1254 : /*
1255 : * Copy files from the old tablespace to the new one
1256 : */
1257 0 : copydir(src_dbpath, dst_dbpath, false);
1258 :
1259 : /*
1260 : * Record the filesystem change in XLOG
1261 : */
1262 : {
1263 : xl_dbase_create_rec xlrec;
1264 :
1265 0 : xlrec.db_id = db_id;
1266 0 : xlrec.tablespace_id = dst_tblspcoid;
1267 0 : xlrec.src_db_id = db_id;
1268 0 : xlrec.src_tablespace_id = src_tblspcoid;
1269 :
1270 0 : XLogBeginInsert();
1271 0 : XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
1272 :
1273 0 : (void) XLogInsert(RM_DBASE_ID,
1274 : XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
1275 : }
1276 :
1277 : /*
1278 : * Update the database's pg_database tuple
1279 : */
1280 0 : ScanKeyInit(&scankey,
1281 : Anum_pg_database_datname,
1282 : BTEqualStrategyNumber, F_NAMEEQ,
1283 : CStringGetDatum(dbname));
1284 0 : sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1285 : NULL, 1, &scankey);
1286 0 : oldtuple = systable_getnext(sysscan);
1287 0 : if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
1288 0 : ereport(ERROR,
1289 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1290 : errmsg("database \"%s\" does not exist", dbname)));
1291 :
1292 0 : MemSet(new_record, 0, sizeof(new_record));
1293 0 : MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1294 0 : MemSet(new_record_repl, false, sizeof(new_record_repl));
1295 :
1296 0 : new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
1297 0 : new_record_repl[Anum_pg_database_dattablespace - 1] = true;
1298 :
1299 0 : newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
1300 : new_record,
1301 : new_record_nulls, new_record_repl);
1302 0 : CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
1303 :
1304 0 : InvokeObjectPostAlterHook(DatabaseRelationId,
1305 : HeapTupleGetOid(newtuple), 0);
1306 :
1307 0 : systable_endscan(sysscan);
1308 :
1309 : /*
1310 : * Force another checkpoint here. As in CREATE DATABASE, this is to
1311 : * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
1312 : * operation, which would cause us to lose any unlogged operations
1313 : * done in the new DB tablespace before the next checkpoint.
1314 : */
1315 0 : RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1316 :
1317 : /*
1318 : * Force synchronous commit, thus minimizing the window between
1319 : * copying the database files and committal of the transaction. If we
1320 : * crash before committing, we'll leave an orphaned set of files on
1321 : * disk, which is not fatal but not good either.
1322 : */
1323 0 : ForceSyncCommit();
1324 :
1325 : /*
1326 : * Close pg_database, but keep lock till commit.
1327 : */
1328 0 : heap_close(pgdbrel, NoLock);
1329 : }
1330 0 : PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1331 : PointerGetDatum(&fparms));
1332 :
1333 : /*
1334 : * Commit the transaction so that the pg_database update is committed. If
1335 : * we crash while removing files, the database won't be corrupt, we'll
1336 : * just leave some orphaned files in the old directory.
1337 : *
1338 : * (This is OK because we know we aren't inside a transaction block.)
1339 : *
1340 : * XXX would it be safe/better to do this inside the ensure block? Not
1341 : * convinced it's a good idea; consider elog just after the transaction
1342 : * really commits.
1343 : */
1344 0 : PopActiveSnapshot();
1345 0 : CommitTransactionCommand();
1346 :
1347 : /* Start new transaction for the remaining work; don't need a snapshot */
1348 0 : StartTransactionCommand();
1349 :
1350 : /*
1351 : * Remove files from the old tablespace
1352 : */
1353 0 : if (!rmtree(src_dbpath, true))
1354 0 : ereport(WARNING,
1355 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
1356 : src_dbpath)));
1357 :
1358 : /*
1359 : * Record the filesystem change in XLOG
1360 : */
1361 : {
1362 : xl_dbase_drop_rec xlrec;
1363 :
1364 0 : xlrec.db_id = db_id;
1365 0 : xlrec.tablespace_id = src_tblspcoid;
1366 :
1367 0 : XLogBeginInsert();
1368 0 : XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1369 :
1370 0 : (void) XLogInsert(RM_DBASE_ID,
1371 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1372 : }
1373 :
1374 : /* Now it's safe to release the database lock */
1375 0 : UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1376 : AccessExclusiveLock);
1377 : }
1378 :
1379 : /* Error cleanup callback for movedb */
1380 : static void
1381 0 : movedb_failure_callback(int code, Datum arg)
1382 : {
1383 0 : movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
1384 : char *dstpath;
1385 :
1386 : /* Get rid of anything we managed to copy to the target directory */
1387 0 : dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
1388 :
1389 0 : (void) rmtree(dstpath, true);
1390 0 : }
1391 :
1392 :
1393 : /*
1394 : * ALTER DATABASE name ...
1395 : */
1396 : Oid
1397 0 : AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
1398 : {
1399 : Relation rel;
1400 : Oid dboid;
1401 : HeapTuple tuple,
1402 : newtuple;
1403 : ScanKeyData scankey;
1404 : SysScanDesc scan;
1405 : ListCell *option;
1406 0 : bool dbistemplate = false;
1407 0 : bool dballowconnections = true;
1408 0 : int dbconnlimit = -1;
1409 0 : DefElem *distemplate = NULL;
1410 0 : DefElem *dallowconnections = NULL;
1411 0 : DefElem *dconnlimit = NULL;
1412 0 : DefElem *dtablespace = NULL;
1413 : Datum new_record[Natts_pg_database];
1414 : bool new_record_nulls[Natts_pg_database];
1415 : bool new_record_repl[Natts_pg_database];
1416 :
1417 : /* Extract options from the statement node tree */
1418 0 : foreach(option, stmt->options)
1419 : {
1420 0 : DefElem *defel = (DefElem *) lfirst(option);
1421 :
1422 0 : if (strcmp(defel->defname, "is_template") == 0)
1423 : {
1424 0 : if (distemplate)
1425 0 : ereport(ERROR,
1426 : (errcode(ERRCODE_SYNTAX_ERROR),
1427 : errmsg("conflicting or redundant options"),
1428 : parser_errposition(pstate, defel->location)));
1429 0 : distemplate = defel;
1430 : }
1431 0 : else if (strcmp(defel->defname, "allow_connections") == 0)
1432 : {
1433 0 : if (dallowconnections)
1434 0 : ereport(ERROR,
1435 : (errcode(ERRCODE_SYNTAX_ERROR),
1436 : errmsg("conflicting or redundant options"),
1437 : parser_errposition(pstate, defel->location)));
1438 0 : dallowconnections = defel;
1439 : }
1440 0 : else if (strcmp(defel->defname, "connection_limit") == 0)
1441 : {
1442 0 : if (dconnlimit)
1443 0 : ereport(ERROR,
1444 : (errcode(ERRCODE_SYNTAX_ERROR),
1445 : errmsg("conflicting or redundant options"),
1446 : parser_errposition(pstate, defel->location)));
1447 0 : dconnlimit = defel;
1448 : }
1449 0 : else if (strcmp(defel->defname, "tablespace") == 0)
1450 : {
1451 0 : if (dtablespace)
1452 0 : ereport(ERROR,
1453 : (errcode(ERRCODE_SYNTAX_ERROR),
1454 : errmsg("conflicting or redundant options"),
1455 : parser_errposition(pstate, defel->location)));
1456 0 : dtablespace = defel;
1457 : }
1458 : else
1459 0 : ereport(ERROR,
1460 : (errcode(ERRCODE_SYNTAX_ERROR),
1461 : errmsg("option \"%s\" not recognized", defel->defname),
1462 : parser_errposition(pstate, defel->location)));
1463 : }
1464 :
1465 0 : if (dtablespace)
1466 : {
1467 : /*
1468 : * While the SET TABLESPACE syntax doesn't allow any other options,
1469 : * somebody could write "WITH TABLESPACE ...". Forbid any other
1470 : * options from being specified in that case.
1471 : */
1472 0 : if (list_length(stmt->options) != 1)
1473 0 : ereport(ERROR,
1474 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1475 : errmsg("option \"%s\" cannot be specified with other options",
1476 : dtablespace->defname),
1477 : parser_errposition(pstate, dtablespace->location)));
1478 : /* this case isn't allowed within a transaction block */
1479 0 : PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
1480 0 : movedb(stmt->dbname, defGetString(dtablespace));
1481 0 : return InvalidOid;
1482 : }
1483 :
1484 0 : if (distemplate && distemplate->arg)
1485 0 : dbistemplate = defGetBoolean(distemplate);
1486 0 : if (dallowconnections && dallowconnections->arg)
1487 0 : dballowconnections = defGetBoolean(dallowconnections);
1488 0 : if (dconnlimit && dconnlimit->arg)
1489 : {
1490 0 : dbconnlimit = defGetInt32(dconnlimit);
1491 0 : if (dbconnlimit < -1)
1492 0 : ereport(ERROR,
1493 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1494 : errmsg("invalid connection limit: %d", dbconnlimit)));
1495 : }
1496 :
1497 : /*
1498 : * Get the old tuple. We don't need a lock on the database per se,
1499 : * because we're not going to do anything that would mess up incoming
1500 : * connections.
1501 : */
1502 0 : rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1503 0 : ScanKeyInit(&scankey,
1504 : Anum_pg_database_datname,
1505 : BTEqualStrategyNumber, F_NAMEEQ,
1506 0 : CStringGetDatum(stmt->dbname));
1507 0 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1508 : NULL, 1, &scankey);
1509 0 : tuple = systable_getnext(scan);
1510 0 : if (!HeapTupleIsValid(tuple))
1511 0 : ereport(ERROR,
1512 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1513 : errmsg("database \"%s\" does not exist", stmt->dbname)));
1514 :
1515 0 : dboid = HeapTupleGetOid(tuple);
1516 :
1517 0 : if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1518 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1519 0 : stmt->dbname);
1520 :
1521 : /*
1522 : * In order to avoid getting locked out and having to go through
1523 : * standalone mode, we refuse to disallow connections to the database
1524 : * we're currently connected to. Lockout can still happen with concurrent
1525 : * sessions but the likeliness of that is not high enough to worry about.
1526 : */
1527 0 : if (!dballowconnections && dboid == MyDatabaseId)
1528 0 : ereport(ERROR,
1529 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1530 : errmsg("cannot disallow connections for current database")));
1531 :
1532 : /*
1533 : * Build an updated tuple, perusing the information just obtained
1534 : */
1535 0 : MemSet(new_record, 0, sizeof(new_record));
1536 0 : MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1537 0 : MemSet(new_record_repl, false, sizeof(new_record_repl));
1538 :
1539 0 : if (distemplate)
1540 : {
1541 0 : new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1542 0 : new_record_repl[Anum_pg_database_datistemplate - 1] = true;
1543 : }
1544 0 : if (dallowconnections)
1545 : {
1546 0 : new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1547 0 : new_record_repl[Anum_pg_database_datallowconn - 1] = true;
1548 : }
1549 0 : if (dconnlimit)
1550 : {
1551 0 : new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1552 0 : new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
1553 : }
1554 :
1555 0 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
1556 : new_record_nulls, new_record_repl);
1557 0 : CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
1558 :
1559 0 : InvokeObjectPostAlterHook(DatabaseRelationId,
1560 : HeapTupleGetOid(newtuple), 0);
1561 :
1562 0 : systable_endscan(scan);
1563 :
1564 : /* Close pg_database, but keep lock till commit */
1565 0 : heap_close(rel, NoLock);
1566 :
1567 0 : return dboid;
1568 : }
1569 :
1570 :
1571 : /*
1572 : * ALTER DATABASE name SET ...
1573 : */
1574 : Oid
1575 6 : AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1576 : {
1577 6 : Oid datid = get_database_oid(stmt->dbname, false);
1578 :
1579 : /*
1580 : * Obtain a lock on the database and make sure it didn't go away in the
1581 : * meantime.
1582 : */
1583 6 : shdepLockAndCheckObject(DatabaseRelationId, datid);
1584 :
1585 6 : if (!pg_database_ownercheck(datid, GetUserId()))
1586 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1587 0 : stmt->dbname);
1588 :
1589 6 : AlterSetting(datid, InvalidOid, stmt->setstmt);
1590 :
1591 6 : UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
1592 :
1593 6 : return datid;
1594 : }
1595 :
1596 :
1597 : /*
1598 : * ALTER DATABASE name OWNER TO newowner
1599 : */
1600 : ObjectAddress
1601 0 : AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1602 : {
1603 : Oid db_id;
1604 : HeapTuple tuple;
1605 : Relation rel;
1606 : ScanKeyData scankey;
1607 : SysScanDesc scan;
1608 : Form_pg_database datForm;
1609 : ObjectAddress address;
1610 :
1611 : /*
1612 : * Get the old tuple. We don't need a lock on the database per se,
1613 : * because we're not going to do anything that would mess up incoming
1614 : * connections.
1615 : */
1616 0 : rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1617 0 : ScanKeyInit(&scankey,
1618 : Anum_pg_database_datname,
1619 : BTEqualStrategyNumber, F_NAMEEQ,
1620 : CStringGetDatum(dbname));
1621 0 : scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1622 : NULL, 1, &scankey);
1623 0 : tuple = systable_getnext(scan);
1624 0 : if (!HeapTupleIsValid(tuple))
1625 0 : ereport(ERROR,
1626 : (errcode(ERRCODE_UNDEFINED_DATABASE),
1627 : errmsg("database \"%s\" does not exist", dbname)));
1628 :
1629 0 : db_id = HeapTupleGetOid(tuple);
1630 0 : datForm = (Form_pg_database) GETSTRUCT(tuple);
1631 :
1632 : /*
1633 : * If the new owner is the same as the existing owner, consider the
1634 : * command to have succeeded. This is to be consistent with other
1635 : * objects.
1636 : */
1637 0 : if (datForm->datdba != newOwnerId)
1638 : {
1639 : Datum repl_val[Natts_pg_database];
1640 : bool repl_null[Natts_pg_database];
1641 : bool repl_repl[Natts_pg_database];
1642 : Acl *newAcl;
1643 : Datum aclDatum;
1644 : bool isNull;
1645 : HeapTuple newtuple;
1646 :
1647 : /* Otherwise, must be owner of the existing object */
1648 0 : if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1649 0 : aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1650 : dbname);
1651 :
1652 : /* Must be able to become new owner */
1653 0 : check_is_member_of_role(GetUserId(), newOwnerId);
1654 :
1655 : /*
1656 : * must have createdb rights
1657 : *
1658 : * NOTE: This is different from other alter-owner checks in that the
1659 : * current user is checked for createdb privileges instead of the
1660 : * destination owner. This is consistent with the CREATE case for
1661 : * databases. Because superusers will always have this right, we need
1662 : * no special case for them.
1663 : */
1664 0 : if (!have_createdb_privilege())
1665 0 : ereport(ERROR,
1666 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1667 : errmsg("permission denied to change owner of database")));
1668 :
1669 0 : memset(repl_null, false, sizeof(repl_null));
1670 0 : memset(repl_repl, false, sizeof(repl_repl));
1671 :
1672 0 : repl_repl[Anum_pg_database_datdba - 1] = true;
1673 0 : repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1674 :
1675 : /*
1676 : * Determine the modified ACL for the new owner. This is only
1677 : * necessary when the ACL is non-null.
1678 : */
1679 0 : aclDatum = heap_getattr(tuple,
1680 : Anum_pg_database_datacl,
1681 : RelationGetDescr(rel),
1682 : &isNull);
1683 0 : if (!isNull)
1684 : {
1685 0 : newAcl = aclnewowner(DatumGetAclP(aclDatum),
1686 : datForm->datdba, newOwnerId);
1687 0 : repl_repl[Anum_pg_database_datacl - 1] = true;
1688 0 : repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1689 : }
1690 :
1691 0 : newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1692 0 : CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
1693 :
1694 0 : heap_freetuple(newtuple);
1695 :
1696 : /* Update owner dependency reference */
1697 0 : changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1698 : newOwnerId);
1699 : }
1700 :
1701 0 : InvokeObjectPostAlterHook(DatabaseRelationId, HeapTupleGetOid(tuple), 0);
1702 :
1703 0 : ObjectAddressSet(address, DatabaseRelationId, db_id);
1704 :
1705 0 : systable_endscan(scan);
1706 :
1707 : /* Close pg_database, but keep lock till commit */
1708 0 : heap_close(rel, NoLock);
1709 :
1710 0 : return address;
1711 : }
1712 :
1713 :
1714 : /*
1715 : * Helper functions
1716 : */
1717 :
1718 : /*
1719 : * Look up info about the database named "name". If the database exists,
1720 : * obtain the specified lock type on it, fill in any of the remaining
1721 : * parameters that aren't NULL, and return TRUE. If no such database,
1722 : * return FALSE.
1723 : */
1724 : static bool
1725 3 : get_db_info(const char *name, LOCKMODE lockmode,
1726 : Oid *dbIdP, Oid *ownerIdP,
1727 : int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1728 : Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1729 : MultiXactId *dbMinMultiP,
1730 : Oid *dbTablespace, char **dbCollate, char **dbCtype)
1731 : {
1732 3 : bool result = false;
1733 : Relation relation;
1734 :
1735 3 : AssertArg(name);
1736 :
1737 : /* Caller may wish to grab a better lock on pg_database beforehand... */
1738 3 : relation = heap_open(DatabaseRelationId, AccessShareLock);
1739 :
1740 : /*
1741 : * Loop covers the rare case where the database is renamed before we can
1742 : * lock it. We try again just in case we can find a new one of the same
1743 : * name.
1744 : */
1745 : for (;;)
1746 : {
1747 : ScanKeyData scanKey;
1748 : SysScanDesc scan;
1749 : HeapTuple tuple;
1750 : Oid dbOid;
1751 :
1752 : /*
1753 : * there's no syscache for database-indexed-by-name, so must do it the
1754 : * hard way
1755 : */
1756 3 : ScanKeyInit(&scanKey,
1757 : Anum_pg_database_datname,
1758 : BTEqualStrategyNumber, F_NAMEEQ,
1759 : CStringGetDatum(name));
1760 :
1761 3 : scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1762 : NULL, 1, &scanKey);
1763 :
1764 3 : tuple = systable_getnext(scan);
1765 :
1766 3 : if (!HeapTupleIsValid(tuple))
1767 : {
1768 : /* definitely no database of that name */
1769 0 : systable_endscan(scan);
1770 0 : break;
1771 : }
1772 :
1773 3 : dbOid = HeapTupleGetOid(tuple);
1774 :
1775 3 : systable_endscan(scan);
1776 :
1777 : /*
1778 : * Now that we have a database OID, we can try to lock the DB.
1779 : */
1780 3 : if (lockmode != NoLock)
1781 3 : LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1782 :
1783 : /*
1784 : * And now, re-fetch the tuple by OID. If it's still there and still
1785 : * the same name, we win; else, drop the lock and loop back to try
1786 : * again.
1787 : */
1788 3 : tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
1789 3 : if (HeapTupleIsValid(tuple))
1790 : {
1791 3 : Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1792 :
1793 3 : if (strcmp(name, NameStr(dbform->datname)) == 0)
1794 : {
1795 : /* oid of the database */
1796 3 : if (dbIdP)
1797 3 : *dbIdP = dbOid;
1798 : /* oid of the owner */
1799 3 : if (ownerIdP)
1800 3 : *ownerIdP = dbform->datdba;
1801 : /* character encoding */
1802 3 : if (encodingP)
1803 3 : *encodingP = dbform->encoding;
1804 : /* allowed as template? */
1805 3 : if (dbIsTemplateP)
1806 3 : *dbIsTemplateP = dbform->datistemplate;
1807 : /* allowing connections? */
1808 3 : if (dbAllowConnP)
1809 3 : *dbAllowConnP = dbform->datallowconn;
1810 : /* last system OID used in database */
1811 3 : if (dbLastSysOidP)
1812 3 : *dbLastSysOidP = dbform->datlastsysoid;
1813 : /* limit of frozen XIDs */
1814 3 : if (dbFrozenXidP)
1815 3 : *dbFrozenXidP = dbform->datfrozenxid;
1816 : /* minimum MultixactId */
1817 3 : if (dbMinMultiP)
1818 3 : *dbMinMultiP = dbform->datminmxid;
1819 : /* default tablespace for this database */
1820 3 : if (dbTablespace)
1821 3 : *dbTablespace = dbform->dattablespace;
1822 : /* default locale settings for this database */
1823 3 : if (dbCollate)
1824 3 : *dbCollate = pstrdup(NameStr(dbform->datcollate));
1825 3 : if (dbCtype)
1826 3 : *dbCtype = pstrdup(NameStr(dbform->datctype));
1827 3 : ReleaseSysCache(tuple);
1828 3 : result = true;
1829 3 : break;
1830 : }
1831 : /* can only get here if it was just renamed */
1832 0 : ReleaseSysCache(tuple);
1833 : }
1834 :
1835 0 : if (lockmode != NoLock)
1836 0 : UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1837 0 : }
1838 :
1839 3 : heap_close(relation, AccessShareLock);
1840 :
1841 3 : return result;
1842 : }
1843 :
1844 : /* Check if current user has createdb privileges */
1845 : static bool
1846 3 : have_createdb_privilege(void)
1847 : {
1848 3 : bool result = false;
1849 : HeapTuple utup;
1850 :
1851 : /* Superusers can always do everything */
1852 3 : if (superuser())
1853 3 : return true;
1854 :
1855 0 : utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
1856 0 : if (HeapTupleIsValid(utup))
1857 : {
1858 0 : result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1859 0 : ReleaseSysCache(utup);
1860 : }
1861 0 : return result;
1862 : }
1863 :
1864 : /*
1865 : * Remove tablespace directories
1866 : *
1867 : * We don't know what tablespaces db_id is using, so iterate through all
1868 : * tablespaces removing <tablespace>/db_id
1869 : */
1870 : static void
1871 0 : remove_dbtablespaces(Oid db_id)
1872 : {
1873 : Relation rel;
1874 : HeapScanDesc scan;
1875 : HeapTuple tuple;
1876 :
1877 0 : rel = heap_open(TableSpaceRelationId, AccessShareLock);
1878 0 : scan = heap_beginscan_catalog(rel, 0, NULL);
1879 0 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1880 : {
1881 0 : Oid dsttablespace = HeapTupleGetOid(tuple);
1882 : char *dstpath;
1883 : struct stat st;
1884 :
1885 : /* Don't mess with the global tablespace */
1886 0 : if (dsttablespace == GLOBALTABLESPACE_OID)
1887 0 : continue;
1888 :
1889 0 : dstpath = GetDatabasePath(db_id, dsttablespace);
1890 :
1891 0 : if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1892 : {
1893 : /* Assume we can ignore it */
1894 0 : pfree(dstpath);
1895 0 : continue;
1896 : }
1897 :
1898 0 : if (!rmtree(dstpath, true))
1899 0 : ereport(WARNING,
1900 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
1901 : dstpath)));
1902 :
1903 : /* Record the filesystem change in XLOG */
1904 : {
1905 : xl_dbase_drop_rec xlrec;
1906 :
1907 0 : xlrec.db_id = db_id;
1908 0 : xlrec.tablespace_id = dsttablespace;
1909 :
1910 0 : XLogBeginInsert();
1911 0 : XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1912 :
1913 0 : (void) XLogInsert(RM_DBASE_ID,
1914 : XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1915 : }
1916 :
1917 0 : pfree(dstpath);
1918 : }
1919 :
1920 0 : heap_endscan(scan);
1921 0 : heap_close(rel, AccessShareLock);
1922 0 : }
1923 :
1924 : /*
1925 : * Check for existing files that conflict with a proposed new DB OID;
1926 : * return TRUE if there are any
1927 : *
1928 : * If there were a subdirectory in any tablespace matching the proposed new
1929 : * OID, we'd get a create failure due to the duplicate name ... and then we'd
1930 : * try to remove that already-existing subdirectory during the cleanup in
1931 : * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
1932 : * instead we make this extra check before settling on the OID of the new
1933 : * database. This exactly parallels what GetNewRelFileNode() does for table
1934 : * relfilenode values.
1935 : */
1936 : static bool
1937 3 : check_db_file_conflict(Oid db_id)
1938 : {
1939 3 : bool result = false;
1940 : Relation rel;
1941 : HeapScanDesc scan;
1942 : HeapTuple tuple;
1943 :
1944 3 : rel = heap_open(TableSpaceRelationId, AccessShareLock);
1945 3 : scan = heap_beginscan_catalog(rel, 0, NULL);
1946 12 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1947 : {
1948 6 : Oid dsttablespace = HeapTupleGetOid(tuple);
1949 : char *dstpath;
1950 : struct stat st;
1951 :
1952 : /* Don't mess with the global tablespace */
1953 6 : if (dsttablespace == GLOBALTABLESPACE_OID)
1954 3 : continue;
1955 :
1956 3 : dstpath = GetDatabasePath(db_id, dsttablespace);
1957 :
1958 3 : if (lstat(dstpath, &st) == 0)
1959 : {
1960 : /* Found a conflicting file (or directory, whatever) */
1961 0 : pfree(dstpath);
1962 0 : result = true;
1963 0 : break;
1964 : }
1965 :
1966 3 : pfree(dstpath);
1967 : }
1968 :
1969 3 : heap_endscan(scan);
1970 3 : heap_close(rel, AccessShareLock);
1971 :
1972 3 : return result;
1973 : }
1974 :
/*
 * errdetail_busy_db
 *		Attach an errdetail explaining why a database is considered busy.
 *
 * notherbackends is the count of other sessions and npreparedxacts the
 * count of prepared transactions.  The wording depends on which of the two
 * counts are positive.  Always returns 0.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	if (notherbackends > 0)
	{
		if (npreparedxacts > 0)
		{
			/*
			 * Two counts in one sentence: no attempt is made at singular
			 * versus plural, because gettext cannot handle more than one
			 * plural form per string.
			 */
			errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
					  notherbackends, npreparedxacts);
		}
		else
		{
			errdetail_plural("There is %d other session using the database.",
							 "There are %d other sessions using the database.",
							 notherbackends,
							 notherbackends);
		}
	}
	else
	{
		errdetail_plural("There is %d prepared transaction using the database.",
						 "There are %d prepared transactions using the database.",
						 npreparedxacts,
						 npreparedxacts);
	}

	return 0;					/* just to keep ereport macro happy */
}
2001 :
2002 : /*
2003 : * get_database_oid - given a database name, look up the OID
2004 : *
2005 : * If missing_ok is false, throw an error if database name not found. If
2006 : * true, just return InvalidOid.
2007 : */
2008 : Oid
2009 19 : get_database_oid(const char *dbname, bool missing_ok)
2010 : {
2011 : Relation pg_database;
2012 : ScanKeyData entry[1];
2013 : SysScanDesc scan;
2014 : HeapTuple dbtuple;
2015 : Oid oid;
2016 :
2017 : /*
2018 : * There's no syscache for pg_database indexed by name, so we must look
2019 : * the hard way.
2020 : */
2021 19 : pg_database = heap_open(DatabaseRelationId, AccessShareLock);
2022 19 : ScanKeyInit(&entry[0],
2023 : Anum_pg_database_datname,
2024 : BTEqualStrategyNumber, F_NAMEEQ,
2025 : CStringGetDatum(dbname));
2026 19 : scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
2027 : NULL, 1, entry);
2028 :
2029 19 : dbtuple = systable_getnext(scan);
2030 :
2031 : /* We assume that there can be at most one matching tuple */
2032 19 : if (HeapTupleIsValid(dbtuple))
2033 15 : oid = HeapTupleGetOid(dbtuple);
2034 : else
2035 4 : oid = InvalidOid;
2036 :
2037 19 : systable_endscan(scan);
2038 19 : heap_close(pg_database, AccessShareLock);
2039 :
2040 19 : if (!OidIsValid(oid) && !missing_ok)
2041 1 : ereport(ERROR,
2042 : (errcode(ERRCODE_UNDEFINED_DATABASE),
2043 : errmsg("database \"%s\" does not exist",
2044 : dbname)));
2045 :
2046 18 : return oid;
2047 : }
2048 :
2049 :
2050 : /*
2051 : * get_database_name - given a database OID, look up the name
2052 : *
2053 : * Returns a palloc'd string, or NULL if no such database.
2054 : */
2055 : char *
2056 272 : get_database_name(Oid dbid)
2057 : {
2058 : HeapTuple dbtuple;
2059 : char *result;
2060 :
2061 272 : dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2062 272 : if (HeapTupleIsValid(dbtuple))
2063 : {
2064 272 : result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
2065 272 : ReleaseSysCache(dbtuple);
2066 : }
2067 : else
2068 0 : result = NULL;
2069 :
2070 272 : return result;
2071 : }
2072 :
2073 : /*
2074 : * DATABASE resource manager's routines
2075 : */
2076 : void
2077 0 : dbase_redo(XLogReaderState *record)
2078 : {
2079 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
2080 :
2081 : /* Backup blocks are not used in dbase records */
2082 0 : Assert(!XLogRecHasAnyBlockRefs(record));
2083 :
2084 0 : if (info == XLOG_DBASE_CREATE)
2085 : {
2086 0 : xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
2087 : char *src_path;
2088 : char *dst_path;
2089 : struct stat st;
2090 :
2091 0 : src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
2092 0 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2093 :
2094 : /*
2095 : * Our theory for replaying a CREATE is to forcibly drop the target
2096 : * subdirectory if present, then re-copy the source data. This may be
2097 : * more work than needed, but it is simple to implement.
2098 : */
2099 0 : if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
2100 : {
2101 0 : if (!rmtree(dst_path, true))
2102 : /* If this failed, copydir() below is going to error. */
2103 0 : ereport(WARNING,
2104 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2105 : dst_path)));
2106 : }
2107 :
2108 : /*
2109 : * Force dirty buffers out to disk, to ensure source database is
2110 : * up-to-date for the copy.
2111 : */
2112 0 : FlushDatabaseBuffers(xlrec->src_db_id);
2113 :
2114 : /*
2115 : * Copy this subdirectory to the new location
2116 : *
2117 : * We don't need to copy subdirectories
2118 : */
2119 0 : copydir(src_path, dst_path, false);
2120 : }
2121 0 : else if (info == XLOG_DBASE_DROP)
2122 : {
2123 0 : xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
2124 : char *dst_path;
2125 :
2126 0 : dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2127 :
2128 0 : if (InHotStandby)
2129 : {
2130 : /*
2131 : * Lock database while we resolve conflicts to ensure that
2132 : * InitPostgres() cannot fully re-execute concurrently. This
2133 : * avoids backends re-connecting automatically to same database,
2134 : * which can happen in some cases.
2135 : *
2136 : * This will lock out walsenders trying to connect to db-specific
2137 : * slots for logical decoding too, so it's safe for us to drop
2138 : * slots.
2139 : */
2140 0 : LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2141 0 : ResolveRecoveryConflictWithDatabase(xlrec->db_id);
2142 : }
2143 :
2144 : /* Drop any database-specific replication slots */
2145 0 : ReplicationSlotsDropDBSlots(xlrec->db_id);
2146 :
2147 : /* Drop pages for this database that are in the shared buffer cache */
2148 0 : DropDatabaseBuffers(xlrec->db_id);
2149 :
2150 : /* Also, clean out any fsync requests that might be pending in md.c */
2151 0 : ForgetDatabaseFsyncRequests(xlrec->db_id);
2152 :
2153 : /* Clean out the xlog relcache too */
2154 0 : XLogDropDatabase(xlrec->db_id);
2155 :
2156 : /* And remove the physical files */
2157 0 : if (!rmtree(dst_path, true))
2158 0 : ereport(WARNING,
2159 : (errmsg("some useless files may be left behind in old database directory \"%s\"",
2160 : dst_path)));
2161 :
2162 0 : if (InHotStandby)
2163 : {
2164 : /*
2165 : * Release locks prior to commit. XXX There is a race condition
2166 : * here that may allow backends to reconnect, but the window for
2167 : * this is small because the gap between here and commit is mostly
2168 : * fairly small and it is unlikely that people will be dropping
2169 : * databases that we are trying to connect to anyway.
2170 : */
2171 0 : UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2172 : }
2173 : }
2174 : else
2175 0 : elog(PANIC, "dbase_redo: unknown op code %u", info);
2176 0 : }
|