Allow CREATE/ALTER DATABASE to manipulate datistemplate and datallowconn.
authorTom Lane <tgl@sss.pgh.pa.us>
Wed, 2 Jul 2014 00:10:38 +0000 (20:10 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Wed, 2 Jul 2014 00:10:38 +0000 (20:10 -0400)
Historically these database properties could be manipulated only by
manually updating pg_database, which is error-prone and only possible for
superusers.  But there seems no good reason not to allow database owners to
set them for their databases, so invent CREATE/ALTER DATABASE options to do
that.  Adjust a couple of places that were doing it the hard way to use the
commands instead.

Vik Fearing, reviewed by Pavel Stehule

contrib/pg_upgrade/pg_upgrade.c
doc/src/sgml/ref/alter_database.sgml
doc/src/sgml/ref/create_database.sgml
src/backend/commands/dbcommands.c
src/bin/initdb/initdb.c
src/bin/pg_dump/pg_dumpall.c
src/bin/psql/tab-complete.c

index ea1f9f663e40db1db25d91d6a79db7b5215ea7df..b32d81efe711535699d66f7a621e5a07de313b50 100644 (file)
@@ -540,9 +540,8 @@ set_frozenxids(void)
         */
        if (strcmp(datallowconn, "f") == 0)
            PQclear(executeQueryOrDie(conn_template1,
-                                     "UPDATE pg_catalog.pg_database "
-                                     "SET  datallowconn = true "
-                                     "WHERE datname = '%s'", datname));
+                               "ALTER DATABASE %s ALLOW_CONNECTIONS = true",
+                                     quote_identifier(datname)));
 
        conn = connectToServer(&new_cluster, datname);
 
@@ -558,9 +557,8 @@ set_frozenxids(void)
        /* Reset datallowconn flag */
        if (strcmp(datallowconn, "f") == 0)
            PQclear(executeQueryOrDie(conn_template1,
-                                     "UPDATE pg_catalog.pg_database "
-                                     "SET  datallowconn = false "
-                                     "WHERE datname = '%s'", datname));
+                              "ALTER DATABASE %s ALLOW_CONNECTIONS = false",
+                                     quote_identifier(datname)));
    }
 
    PQclear(dbres);
index 23ef75512f14f4e4f90018ebfd009fb86e6c94e5..3724c05e2c0cfd6c1128094d9545747f8a75970c 100644 (file)
@@ -25,6 +25,8 @@ ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> [ [ WITH ] <rep
 
 <phrase>where <replaceable class="PARAMETER">option</replaceable> can be:</phrase>
 
+    IS_TEMPLATE <replaceable class="PARAMETER">istemplate</replaceable>
+    ALLOW_CONNECTIONS <replaceable class="PARAMETER">allowconn</replaceable>
     CONNECTION LIMIT <replaceable class="PARAMETER">connlimit</replaceable>
 
 ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -107,6 +109,26 @@ ALTER DATABASE <replaceable class="PARAMETER">name</replaceable> RESET ALL
      </varlistentry>
 
      <varlistentry>
+       <term><replaceable class="parameter">istemplate</replaceable></term>
+       <listitem>
+        <para>
+         If true, then this database can be cloned by any user with CREATEDB
+         privileges; if false, then only superusers or the owner of the
+         database can clone it.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term><replaceable class="parameter">allowconn</replaceable></term>
+       <listitem>
+        <para>
+         If false then no one can connect to this database.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
       <term><replaceable class="parameter">connlimit</replaceable></term>
       <listitem>
        <para>
index 5af898152079e9e621c3a916ed6c214e03a316d0..9711b1f98e3ebb524b918843099e2e698a112f54 100644 (file)
@@ -28,6 +28,8 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
            [ LC_COLLATE [=] <replaceable class="parameter">lc_collate</replaceable> ]
            [ LC_CTYPE [=] <replaceable class="parameter">lc_ctype</replaceable> ]
            [ TABLESPACE [=] <replaceable class="parameter">tablespace_name</replaceable> ]
+           [ IS_TEMPLATE [=] <replaceable class="parameter">istemplate</replaceable> ]
+           [ ALLOW_CONNECTIONS [=] <replaceable class="parameter">allowconn</replaceable> ]
            [ CONNECTION LIMIT [=] <replaceable class="parameter">connlimit</replaceable> ] ]
 </synopsis>
  </refsynopsisdiv>
@@ -148,6 +150,28 @@ CREATE DATABASE <replaceable class="PARAMETER">name</replaceable>
      </varlistentry>
 
      <varlistentry>
+       <term><replaceable class="parameter">istemplate</replaceable></term>
+       <listitem>
+        <para>
+         If true, then this database can be cloned by any user with CREATEDB
+         privileges; if false (the default), then only superusers or the owner
+         of the database can clone it.
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term><replaceable class="parameter">allowconn</replaceable></term>
+       <listitem>
+        <para>
+         If false then no one can connect to this database.  The default is
+         true, allowing connections (except as restricted by other mechanisms,
+         such as <literal>GRANT</>/<literal>REVOKE CONNECT</>).
+        </para>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
       <term><replaceable class="parameter">connlimit</replaceable></term>
       <listitem>
        <para>
index dd92aff89dc7848aeca8a06142f46d8372304630..f480be884506efe602ce3b78d8f95c4a4e17021d 100644 (file)
@@ -123,6 +123,8 @@ createdb(const CreatedbStmt *stmt)
    DefElem    *dencoding = NULL;
    DefElem    *dcollate = NULL;
    DefElem    *dctype = NULL;
+   DefElem    *distemplate = NULL;
+   DefElem    *dallowconnections = NULL;
    DefElem    *dconnlimit = NULL;
    char       *dbname = stmt->dbname;
    char       *dbowner = NULL;
@@ -131,6 +133,8 @@ createdb(const CreatedbStmt *stmt)
    char       *dbctype = NULL;
    char       *canonname;
    int         encoding = -1;
+   bool        dbistemplate = false;
+   bool        dballowconnections = true;
    int         dbconnlimit = -1;
    int         notherbackends;
    int         npreparedxacts;
@@ -189,6 +193,22 @@ createdb(const CreatedbStmt *stmt)
                         errmsg("conflicting or redundant options")));
            dctype = defel;
        }
+       else if (strcmp(defel->defname, "is_template") == 0)
+       {
+           if (distemplate)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+           distemplate = defel;
+       }
+       else if (strcmp(defel->defname, "allow_connections") == 0)
+       {
+           if (dallowconnections)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+           dallowconnections = defel;
+       }
        else if (strcmp(defel->defname, "connection_limit") == 0)
        {
            if (dconnlimit)
@@ -244,7 +264,10 @@ createdb(const CreatedbStmt *stmt)
        dbcollate = defGetString(dcollate);
    if (dctype && dctype->arg)
        dbctype = defGetString(dctype);
-
+   if (distemplate && distemplate->arg)
+       dbistemplate = defGetBoolean(distemplate);
+   if (dallowconnections && dallowconnections->arg)
+       dballowconnections = defGetBoolean(dallowconnections);
    if (dconnlimit && dconnlimit->arg)
    {
        dbconnlimit = defGetInt32(dconnlimit);
@@ -487,8 +510,8 @@ createdb(const CreatedbStmt *stmt)
        DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
    new_record[Anum_pg_database_datctype - 1] =
        DirectFunctionCall1(namein, CStringGetDatum(dbctype));
-   new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
-   new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
+   new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
+   new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
    new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
@@ -1328,7 +1351,11 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
    ScanKeyData scankey;
    SysScanDesc scan;
    ListCell   *option;
-   int         connlimit = -1;
+   bool        dbistemplate = false;
+   bool        dballowconnections = true;
+   int         dbconnlimit = -1;
+   DefElem    *distemplate = NULL;
+   DefElem    *dallowconnections = NULL;
    DefElem    *dconnlimit = NULL;
    DefElem    *dtablespace = NULL;
    Datum       new_record[Natts_pg_database];
@@ -1340,7 +1367,23 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
    {
        DefElem    *defel = (DefElem *) lfirst(option);
 
-       if (strcmp(defel->defname, "connection_limit") == 0)
+       if (strcmp(defel->defname, "is_template") == 0)
+       {
+           if (distemplate)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+           distemplate = defel;
+       }
+       else if (strcmp(defel->defname, "allow_connections") == 0)
+       {
+           if (dallowconnections)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+           dallowconnections = defel;
+       }
+       else if (strcmp(defel->defname, "connection_limit") == 0)
        {
            if (dconnlimit)
                ereport(ERROR,
@@ -1380,13 +1423,17 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
        return InvalidOid;
    }
 
+   if (distemplate && distemplate->arg)
+       dbistemplate = defGetBoolean(distemplate);
+   if (dallowconnections && dallowconnections->arg)
+       dballowconnections = defGetBoolean(dallowconnections);
    if (dconnlimit && dconnlimit->arg)
    {
-       connlimit = defGetInt32(dconnlimit);
-       if (connlimit < -1)
+       dbconnlimit = defGetInt32(dconnlimit);
+       if (dbconnlimit < -1)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                    errmsg("invalid connection limit: %d", connlimit)));
+                    errmsg("invalid connection limit: %d", dbconnlimit)));
    }
 
    /*
@@ -1413,6 +1460,17 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
        aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
                       stmt->dbname);
 
+   /*
+    * In order to avoid getting locked out and having to go through
+    * standalone mode, we refuse to disallow connections to the database
+    * we're currently connected to.  Lockout can still happen with concurrent
+    * sessions but the likeliness of that is not high enough to worry about.
+    */
+   if (!dballowconnections && dboid == MyDatabaseId)
+       ereport(ERROR,
+               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                errmsg("cannot disallow connections for current database")));
+
    /*
     * Build an updated tuple, perusing the information just obtained
     */
@@ -1420,9 +1478,19 @@ AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
    MemSet(new_record_nulls, false, sizeof(new_record_nulls));
    MemSet(new_record_repl, false, sizeof(new_record_repl));
 
+   if (distemplate)
+   {
+       new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
+       new_record_repl[Anum_pg_database_datistemplate - 1] = true;
+   }
+   if (dallowconnections)
+   {
+       new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
+       new_record_repl[Anum_pg_database_datallowconn - 1] = true;
+   }
    if (dconnlimit)
    {
-       new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
+       new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
        new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
    }
 
index 5228f1342224efe830e309ec60633ec2c4b1c57f..a25965ce4c9c6338dd8e03895cb71e6e8e1f66cd 100644 (file)
@@ -2288,11 +2288,7 @@ make_template0(void)
    PG_CMD_DECL;
    const char **line;
    static const char *template0_setup[] = {
-       "CREATE DATABASE template0;\n",
-       "UPDATE pg_database SET "
-       "   datistemplate = 't', "
-       "   datallowconn = 'f' "
-       "    WHERE datname = 'template0';\n",
+       "CREATE DATABASE template0 IS_TEMPLATE = true ALLOW_CONNECTIONS = false;\n",
 
        /*
         * We use the OID of template0 to determine lastsysoid
index 0cc4329b1a1add56042198355c6d1ab1a8f96311..9dec6f3b141fbafd18299e6ca76b487759e8e5ca 100644 (file)
@@ -1374,19 +1374,15 @@ dumpCreateDB(PGconn *conn)
                appendPQExpBuffer(buf, " TABLESPACE = %s",
                                  fmtId(dbtablespace));
 
+           if (strcmp(dbistemplate, "t") == 0)
+               appendPQExpBuffer(buf, " IS_TEMPLATE = true");
+
            if (strcmp(dbconnlimit, "-1") != 0)
                appendPQExpBuffer(buf, " CONNECTION LIMIT = %s",
                                  dbconnlimit);
 
            appendPQExpBufferStr(buf, ";\n");
 
-           if (strcmp(dbistemplate, "t") == 0)
-           {
-               appendPQExpBufferStr(buf, "UPDATE pg_catalog.pg_database SET datistemplate = 't' WHERE datname = ");
-               appendStringLiteralConn(buf, dbname, conn);
-               appendPQExpBufferStr(buf, ";\n");
-           }
-
            if (binary_upgrade)
            {
                appendPQExpBufferStr(buf, "-- For binary upgrade, set datfrozenxid.\n");
index be5c3c5f4500bc97d1158a60326021b4c347b2cc..bab03572352d1e5ba9b2b78a298de95065d929c4 100644 (file)
@@ -1021,7 +1021,8 @@ psql_completion(const char *text, int start, int end)
             pg_strcasecmp(prev2_wd, "DATABASE") == 0)
    {
        static const char *const list_ALTERDATABASE[] =
-       {"RESET", "SET", "OWNER TO", "RENAME TO", "CONNECTION LIMIT", NULL};
+       {"RESET", "SET", "OWNER TO", "RENAME TO", "IS_TEMPLATE",
+       "ALLOW_CONNECTIONS", "CONNECTION LIMIT", NULL};
 
        COMPLETE_WITH_LIST(list_ALTERDATABASE);
    }
@@ -2111,8 +2112,8 @@ psql_completion(const char *text, int start, int end)
             pg_strcasecmp(prev2_wd, "DATABASE") == 0)
    {
        static const char *const list_DATABASE[] =
-       {"OWNER", "TEMPLATE", "ENCODING", "TABLESPACE", "CONNECTION LIMIT",
-       NULL};
+       {"OWNER", "TEMPLATE", "ENCODING", "TABLESPACE", "IS_TEMPLATE",
+       "ALLOW_CONNECTIONS", "CONNECTION LIMIT", NULL};
 
        COMPLETE_WITH_LIST(list_DATABASE);
    }