#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_authid.h"
+#include "catalog/pg_collation.h"
#include "catalog/pg_database.h"
#include "catalog/pg_db_role_setting.h"
#include "catalog/pg_subscription.h"
Oid *dbIdP, Oid *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
- Oid *dbTablespace, char **dbCollate, char **dbCtype);
+ Oid *dbTablespace, char **dbCollate, char **dbCtype,
+ char **dbCollversion);
static bool have_createdb_privilege(void);
static void remove_dbtablespaces(Oid db_id);
static bool check_db_file_conflict(Oid db_id);
int src_encoding = -1;
char *src_collate = NULL;
char *src_ctype = NULL;
+ char *src_collversion = NULL;
bool src_istemplate;
bool src_allowconn;
TransactionId src_frozenxid = InvalidTransactionId;
DefElem *distemplate = NULL;
DefElem *dallowconnections = NULL;
DefElem *dconnlimit = NULL;
+ DefElem *dcollversion = NULL;
char *dbname = stmt->dbname;
char *dbowner = NULL;
const char *dbtemplate = NULL;
bool dbistemplate = false;
bool dballowconnections = true;
int dbconnlimit = -1;
+ char *dbcollversion = NULL;
int notherbackends;
int npreparedxacts;
createdb_failure_params fparms;
errorConflictingDefElem(defel, pstate);
dconnlimit = defel;
}
+ else if (strcmp(defel->defname, "collation_version") == 0)
+ {
+ if (dcollversion)
+ errorConflictingDefElem(defel, pstate);
+ dcollversion = defel;
+ }
else if (strcmp(defel->defname, "location") == 0)
{
ereport(WARNING,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid connection limit: %d", dbconnlimit)));
}
+ if (dcollversion)
+ dbcollversion = defGetString(dcollversion);
/* obtain OID of proposed owner */
if (dbowner)
&src_dboid, &src_owner, &src_encoding,
&src_istemplate, &src_allowconn,
&src_frozenxid, &src_minmxid, &src_deftablespace,
- &src_collate, &src_ctype))
+ &src_collate, &src_ctype, &src_collversion))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("template database \"%s\" does not exist",
errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
}
+ /*
+ * If we got a collation version for the template database, check that it
+ * matches the actual OS collation version. Otherwise error; the user
+ * needs to fix the template database first. Don't complain if a
+ * collation version was specified explicitly as a statement option; that
+ * is used by pg_upgrade to reproduce the old state exactly.
+ *
+ * (If the template database has no collation version, then either the
+ * platform/provider does not support collation versioning, or it's
+ * template0, for which we stipulate that it does not contain
+ * collation-using objects.)
+ */
+ if (src_collversion && !dcollversion)
+ {
+ char *actual_versionstr;
+
+ actual_versionstr = get_collation_actual_version(COLLPROVIDER_LIBC, dbcollate);
+ if (!actual_versionstr)
+ ereport(ERROR,
+ (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
+ dbtemplate)));
+
+ if (strcmp(actual_versionstr, src_collversion) != 0)
+ ereport(ERROR,
+ (errmsg("template database \"%s\" has a collation version mismatch",
+ dbtemplate),
+ errdetail("The template database was created using collation version %s, "
+ "but the operating system provides version %s.",
+ src_collversion, actual_versionstr),
+ errhint("Rebuild all objects in the template database that use the default collation and run "
+ "ALTER DATABASE %s REFRESH COLLATION VERSION, "
+ "or build PostgreSQL with the right library version.",
+ quote_identifier(dbtemplate))));
+ }
+
+ if (dbcollversion == NULL)
+ dbcollversion = src_collversion;
+
+ /*
+ * Normally, we copy the collation version from the template database.
+ * This last resort only applies if the template database does not have a
+ * collation version, which is normally only the case for template0.
+ */
+ if (dbcollversion == NULL)
+ dbcollversion = get_collation_actual_version(COLLPROVIDER_LIBC, dbcollate);
+
/* Resolve default tablespace for new database */
if (dtablespacename && dtablespacename->arg)
{
new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
+ if (dbcollversion)
+ new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
+ else
+ new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
/*
* We deliberately set datacl to default (NULL), rather than copying it
pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
- &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL))
+ &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
{
if (!missing_ok)
{
rel = table_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
- NULL, NULL, NULL, NULL, NULL, NULL, NULL))
+ NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", oldname)));
pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
- NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
+ NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database \"%s\" does not exist", dbname)));
}
+/*
+ * ALTER DATABASE name REFRESH COLLATION VERSION
+ */
+ObjectAddress
+AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
+{
+ Relation rel;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ Oid db_id;
+ HeapTuple tuple;
+ Form_pg_database datForm;
+ ObjectAddress address;
+ Datum datum;
+ bool isnull;
+ char *oldversion;
+ char *newversion;
+
+ rel = table_open(DatabaseRelationId, RowExclusiveLock);
+ ScanKeyInit(&scankey,
+ Anum_pg_database_datname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(stmt->dbname));
+ scan = systable_beginscan(rel, DatabaseNameIndexId, true,
+ NULL, 1, &scankey);
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database \"%s\" does not exist", stmt->dbname)));
+
+ datForm = (Form_pg_database) GETSTRUCT(tuple);
+ db_id = datForm->oid;
+
+ if (!pg_database_ownercheck(db_id, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
+ stmt->dbname);
+
+ datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
+ oldversion = isnull ? NULL : TextDatumGetCString(datum);
+
+ datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
+ Assert(!isnull);
+ newversion = get_collation_actual_version(COLLPROVIDER_LIBC, TextDatumGetCString(datum));
+
+ /* cannot change from NULL to non-NULL or vice versa */
+ if ((!oldversion && newversion) || (oldversion && !newversion))
+ elog(ERROR, "invalid collation version change");
+ else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
+ {
+ bool nulls[Natts_pg_database] = {0};
+ bool replaces[Natts_pg_database] = {0};
+ Datum values[Natts_pg_database] = {0};
+
+ ereport(NOTICE,
+ (errmsg("changing version from %s to %s",
+ oldversion, newversion)));
+
+ values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
+ replaces[Anum_pg_database_datcollversion - 1] = true;
+
+ tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
+ values, nulls, replaces);
+ CatalogTupleUpdate(rel, &tuple->t_self, tuple);
+ heap_freetuple(tuple);
+ }
+ else
+ ereport(NOTICE,
+ (errmsg("version has not changed")));
+
+ InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
+
+ ObjectAddressSet(address, DatabaseRelationId, db_id);
+
+ systable_endscan(scan);
+
+ table_close(rel, NoLock);
+
+ return address;
+}
+
+
/*
* ALTER DATABASE name SET ...
*/
}
+Datum
+pg_database_collation_actual_version(PG_FUNCTION_ARGS)
+{
+ Oid dbid = PG_GETARG_OID(0);
+ HeapTuple tp;
+ Datum datum;
+ bool isnull;
+ char *version;
+
+ tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
+ if (!HeapTupleIsValid(tp))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("database with OID %u does not exist", dbid)));
+
+ datum = SysCacheGetAttr(DATABASEOID, tp, Anum_pg_database_datcollate, &isnull);
+ Assert(!isnull);
+ version = get_collation_actual_version(COLLPROVIDER_LIBC, TextDatumGetCString(datum));
+
+ ReleaseSysCache(tp);
+
+ if (version)
+ PG_RETURN_TEXT_P(cstring_to_text(version));
+ else
+ PG_RETURN_NULL();
+}
+
+
/*
* Helper functions
*/
Oid *dbIdP, Oid *ownerIdP,
int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
- Oid *dbTablespace, char **dbCollate, char **dbCtype)
+ Oid *dbTablespace, char **dbCollate, char **dbCtype,
+ char **dbCollversion)
{
bool result = false;
Relation relation;
Assert(!isnull);
*dbCtype = TextDatumGetCString(datum);
}
+ if (dbCollversion)
+ {
+ datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
+ if (isnull)
+ *dbCollversion = NULL;
+ else
+ *dbCollversion = TextDatumGetCString(datum);
+ }
ReleaseSysCache(tuple);
result = true;
break;