Dump public schema ownership and security labels.
authorNoah Misch <noah@leadboat.com>
Tue, 29 Jun 2021 01:34:55 +0000 (18:34 -0700)
committerNoah Misch <noah@leadboat.com>
Tue, 29 Jun 2021 01:34:55 +0000 (18:34 -0700)
As a side effect, this corrects dumps of public schema ACLs in databases
where the DBA dropped and recreated that schema.

Reviewed by Asif Rehman.

Discussion: https://postgr.es/m/20201229134924.GA1431748@rfd.leadboat.com

src/bin/pg_dump/dumputils.c
src/bin/pg_dump/dumputils.h
src/bin/pg_dump/pg_backup_archiver.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/pg_dump/t/002_pg_dump.pl
src/test/modules/test_pg_dump/t/001_base.pl

index 60d306e7c3a32c3de1ca0ba2f94be7c1c2e0fd74..ea67e52a3f47ee1a9b3db3d1c1bb734d7e03576b 100644 (file)
@@ -725,6 +725,7 @@ void
 buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
                PQExpBuffer init_acl_subquery, PQExpBuffer init_racl_subquery,
                const char *acl_column, const char *acl_owner,
+               const char *initprivs_expr,
                const char *obj_kind, bool binary_upgrade)
 {
    /*
@@ -765,23 +766,25 @@ buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
                      "WITH ORDINALITY AS perm(acl,row_n) "
                      "WHERE NOT EXISTS ( "
                      "SELECT 1 FROM "
-                     "pg_catalog.unnest(coalesce(pip.initprivs,pg_catalog.acldefault(%s,%s))) "
+                     "pg_catalog.unnest(coalesce(%s,pg_catalog.acldefault(%s,%s))) "
                      "AS init(init_acl) WHERE acl = init_acl)) as foo)",
                      acl_column,
                      obj_kind,
                      acl_owner,
+                     initprivs_expr,
                      obj_kind,
                      acl_owner);
 
    printfPQExpBuffer(racl_subquery,
                      "(SELECT pg_catalog.array_agg(acl ORDER BY row_n) FROM "
                      "(SELECT acl, row_n FROM "
-                     "pg_catalog.unnest(coalesce(pip.initprivs,pg_catalog.acldefault(%s,%s))) "
+                     "pg_catalog.unnest(coalesce(%s,pg_catalog.acldefault(%s,%s))) "
                      "WITH ORDINALITY AS initp(acl,row_n) "
                      "WHERE NOT EXISTS ( "
                      "SELECT 1 FROM "
                      "pg_catalog.unnest(coalesce(%s,pg_catalog.acldefault(%s,%s))) "
                      "AS permp(orig_acl) WHERE acl = orig_acl)) as foo)",
+                     initprivs_expr,
                      obj_kind,
                      acl_owner,
                      acl_column,
@@ -807,12 +810,13 @@ buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
        printfPQExpBuffer(init_acl_subquery,
                          "CASE WHEN privtype = 'e' THEN "
                          "(SELECT pg_catalog.array_agg(acl ORDER BY row_n) FROM "
-                         "(SELECT acl, row_n FROM pg_catalog.unnest(pip.initprivs) "
+                         "(SELECT acl, row_n FROM pg_catalog.unnest(%s) "
                          "WITH ORDINALITY AS initp(acl,row_n) "
                          "WHERE NOT EXISTS ( "
                          "SELECT 1 FROM "
                          "pg_catalog.unnest(pg_catalog.acldefault(%s,%s)) "
                          "AS privm(orig_acl) WHERE acl = orig_acl)) as foo) END",
+                         initprivs_expr,
                          obj_kind,
                          acl_owner);
 
@@ -823,10 +827,11 @@ buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
                          "pg_catalog.unnest(pg_catalog.acldefault(%s,%s)) "
                          "WITH ORDINALITY AS privp(acl,row_n) "
                          "WHERE NOT EXISTS ( "
-                         "SELECT 1 FROM pg_catalog.unnest(pip.initprivs) "
+                         "SELECT 1 FROM pg_catalog.unnest(%s) "
                          "AS initp(init_acl) WHERE acl = init_acl)) as foo) END",
                          obj_kind,
-                         acl_owner);
+                         acl_owner,
+                         initprivs_expr);
    }
    else
    {
index 6e97da7487ef192f75db8671cce6ef87a5ac1bb5..f5465f19aee35d34cad43e88a224dd30d367b1d4 100644 (file)
@@ -54,6 +54,7 @@ extern void emitShSecLabels(PGconn *conn, PGresult *res,
 extern void buildACLQueries(PQExpBuffer acl_subquery, PQExpBuffer racl_subquery,
                            PQExpBuffer init_acl_subquery, PQExpBuffer init_racl_subquery,
                            const char *acl_column, const char *acl_owner,
+                           const char *initprivs_expr,
                            const char *obj_kind, bool binary_upgrade);
 
 extern bool variable_is_guc_list_quote(const char *name);
index 24cc09625581704481a9378e8dbb09f00338f559..ee06dc68221eff65439faae76dbe4acb3dae33f7 100644 (file)
@@ -3538,10 +3538,12 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
     * Actually print the definition.
     *
     * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
-    * versions put into CREATE SCHEMA.  We have to do this when --no-owner
-    * mode is selected.  This is ugly, but I see no other good way ...
+    * versions put into CREATE SCHEMA.  Don't mutate the variant for schema
+    * "public" that is a comment.  We have to do this when --no-owner mode is
+    * selected.  This is ugly, but I see no other good way ...
     */
-   if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
+   if (ropt->noOwner &&
+       strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
    {
        ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
    }
@@ -3553,11 +3555,16 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
 
    /*
     * If we aren't using SET SESSION AUTH to determine ownership, we must
-    * instead issue an ALTER OWNER command.  We assume that anything without
-    * a DROP command is not a separately ownable object.  All the categories
-    * with DROP commands must appear in one list or the other.
+    * instead issue an ALTER OWNER command.  Schema "public" is special; when
+    * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
+    * when using SET SESSION for all other objects.  We assume that anything
+    * without a DROP command is not a separately ownable object.  All the
+    * categories with DROP commands must appear in one list or the other.
     */
-   if (!ropt->noOwner && !ropt->use_setsessauth &&
+   if (!ropt->noOwner &&
+       (!ropt->use_setsessauth ||
+        (strcmp(te->desc, "SCHEMA") == 0 &&
+         strncmp(te->defn, "--", 2) == 0)) &&
        te->owner && strlen(te->owner) > 0 &&
        te->dropStmt && strlen(te->dropStmt) > 0)
    {
index 8f53cc7c3b87132ef271c2304bbe65fe24a74938..3078d9af11a1ff728f4ff27d853f498e08e0a6fe 100644 (file)
@@ -44,6 +44,7 @@
 #include "catalog/pg_aggregate_d.h"
 #include "catalog/pg_am_d.h"
 #include "catalog/pg_attribute_d.h"
+#include "catalog/pg_authid_d.h"
 #include "catalog/pg_cast_d.h"
 #include "catalog/pg_class_d.h"
 #include "catalog/pg_default_acl_d.h"
@@ -1598,6 +1599,13 @@ checkExtensionMembership(DumpableObject *dobj, Archive *fout)
 static void
 selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout)
 {
+   /*
+    * DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
+    * and (for --clean) a DROP SCHEMA statement.  (In the absence of
+    * DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
+    */
+   nsinfo->create = true;
+
    /*
     * If specific tables are being dumped, do not dump any complete
     * namespaces. If specific namespaces are being dumped, dump just those
@@ -1633,10 +1641,15 @@ selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout)
         * no-mans-land between being a system object and a user object.  We
         * don't want to dump creation or comment commands for it, because
         * that complicates matters for non-superuser use of pg_dump.  But we
-        * should dump any ACL changes that have occurred for it, and of
-        * course we should dump contained objects.
+        * should dump any ownership changes, security labels, and ACL
+        * changes, and of course we should dump contained objects.  pg_dump
+        * ties ownership to DUMP_COMPONENT_DEFINITION.  Hence, unless the
+        * owner is the default, dump a "definition" bearing just a comment.
         */
-       nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
+       nsinfo->create = false;
+       nsinfo->dobj.dump = DUMP_COMPONENT_ALL & ~DUMP_COMPONENT_COMMENT;
+       if (nsinfo->nspowner == BOOTSTRAP_SUPERUSERID)
+           nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
        nsinfo->dobj.dump_contains = DUMP_COMPONENT_ALL;
    }
    else
@@ -3428,8 +3441,8 @@ getBlobs(Archive *fout)
        PQExpBuffer init_racl_subquery = createPQExpBuffer();
 
        buildACLQueries(acl_subquery, racl_subquery, init_acl_subquery,
-                       init_racl_subquery, "l.lomacl", "l.lomowner", "'L'",
-                       dopt->binary_upgrade);
+                       init_racl_subquery, "l.lomacl", "l.lomowner",
+                       "pip.initprivs", "'L'", dopt->binary_upgrade);
 
        appendPQExpBuffer(blobQry,
                          "SELECT l.oid, (%s l.lomowner) AS rolname, "
@@ -4830,6 +4843,7 @@ getNamespaces(Archive *fout, int *numNamespaces)
    int         i_tableoid;
    int         i_oid;
    int         i_nspname;
+   int         i_nspowner;
    int         i_rolname;
    int         i_nspacl;
    int         i_rnspacl;
@@ -4849,11 +4863,27 @@ getNamespaces(Archive *fout, int *numNamespaces)
        PQExpBuffer init_acl_subquery = createPQExpBuffer();
        PQExpBuffer init_racl_subquery = createPQExpBuffer();
 
+       /*
+        * Bypass pg_init_privs.initprivs for the public schema.  Dropping and
+        * recreating the schema detaches it from its pg_init_privs row, but
+        * an empty destination database starts with this ACL nonetheless.
+        * Also, we support dump/reload of public schema ownership changes.
+        * ALTER SCHEMA OWNER filters nspacl through aclnewowner(), but
+        * initprivs continues to reflect the initial owner (the bootstrap
+        * superuser).  Hence, synthesize the value that nspacl will have
+        * after the restore's ALTER SCHEMA OWNER.
+        */
        buildACLQueries(acl_subquery, racl_subquery, init_acl_subquery,
-                       init_racl_subquery, "n.nspacl", "n.nspowner", "'n'",
-                       dopt->binary_upgrade);
+                       init_racl_subquery, "n.nspacl", "n.nspowner",
+                       "CASE WHEN n.nspname = 'public' THEN array["
+                       "  format('%s=UC/%s', "
+                       "         n.nspowner::regrole, n.nspowner::regrole),"
+                       "  format('=UC/%s', n.nspowner::regrole)]::aclitem[] "
+                       "ELSE pip.initprivs END",
+                       "'n'", dopt->binary_upgrade);
 
        appendPQExpBuffer(query, "SELECT n.tableoid, n.oid, n.nspname, "
+                         "n.nspowner, "
                          "(%s nspowner) AS rolname, "
                          "%s as nspacl, "
                          "%s as rnspacl, "
@@ -4878,7 +4908,7 @@ getNamespaces(Archive *fout, int *numNamespaces)
        destroyPQExpBuffer(init_racl_subquery);
    }
    else
-       appendPQExpBuffer(query, "SELECT tableoid, oid, nspname, "
+       appendPQExpBuffer(query, "SELECT tableoid, oid, nspname, nspowner, "
                          "(%s nspowner) AS rolname, "
                          "nspacl, NULL as rnspacl, "
                          "NULL AS initnspacl, NULL as initrnspacl "
@@ -4894,6 +4924,7 @@ getNamespaces(Archive *fout, int *numNamespaces)
    i_tableoid = PQfnumber(res, "tableoid");
    i_oid = PQfnumber(res, "oid");
    i_nspname = PQfnumber(res, "nspname");
+   i_nspowner = PQfnumber(res, "nspowner");
    i_rolname = PQfnumber(res, "rolname");
    i_nspacl = PQfnumber(res, "nspacl");
    i_rnspacl = PQfnumber(res, "rnspacl");
@@ -4907,6 +4938,7 @@ getNamespaces(Archive *fout, int *numNamespaces)
        nsinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
        AssignDumpId(&nsinfo[i].dobj);
        nsinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_nspname));
+       nsinfo[i].nspowner = atooid(PQgetvalue(res, i, i_nspowner));
        nsinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
        nsinfo[i].nspacl = pg_strdup(PQgetvalue(res, i, i_nspacl));
        nsinfo[i].rnspacl = pg_strdup(PQgetvalue(res, i, i_rnspacl));
@@ -5098,8 +5130,8 @@ getTypes(Archive *fout, int *numTypes)
        PQExpBuffer initracl_subquery = createPQExpBuffer();
 
        buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
-                       initracl_subquery, "t.typacl", "t.typowner", "'T'",
-                       dopt->binary_upgrade);
+                       initracl_subquery, "t.typacl", "t.typowner",
+                       "pip.initprivs", "'T'", dopt->binary_upgrade);
 
        appendPQExpBuffer(query, "SELECT t.tableoid, t.oid, t.typname, "
                          "t.typnamespace, "
@@ -5800,8 +5832,8 @@ getAggregates(Archive *fout, int *numAggs)
        const char *agg_check;
 
        buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
-                       initracl_subquery, "p.proacl", "p.proowner", "'f'",
-                       dopt->binary_upgrade);
+                       initracl_subquery, "p.proacl", "p.proowner",
+                       "pip.initprivs", "'f'", dopt->binary_upgrade);
 
        agg_check = (fout->remoteVersion >= 110000 ? "p.prokind = 'a'"
                     : "p.proisagg");
@@ -6013,8 +6045,8 @@ getFuncs(Archive *fout, int *numFuncs)
        const char *not_agg_check;
 
        buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
-                       initracl_subquery, "p.proacl", "p.proowner", "'f'",
-                       dopt->binary_upgrade);
+                       initracl_subquery, "p.proacl", "p.proowner",
+                       "pip.initprivs", "'f'", dopt->binary_upgrade);
 
        not_agg_check = (fout->remoteVersion >= 110000 ? "p.prokind <> 'a'"
                         : "NOT p.proisagg");
@@ -6310,13 +6342,14 @@ getTables(Archive *fout, int *numTables)
 
        buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
                        initracl_subquery, "c.relacl", "c.relowner",
+                       "pip.initprivs",
                        "CASE WHEN c.relkind = " CppAsString2(RELKIND_SEQUENCE)
                        " THEN 's' ELSE 'r' END::\"char\"",
                        dopt->binary_upgrade);
 
        buildACLQueries(attacl_subquery, attracl_subquery, attinitacl_subquery,
-                       attinitracl_subquery, "at.attacl", "c.relowner", "'c'",
-                       dopt->binary_upgrade);
+                       attinitracl_subquery, "at.attacl", "c.relowner",
+                       "pip.initprivs", "'c'", dopt->binary_upgrade);
 
        appendPQExpBuffer(query,
                          "SELECT c.tableoid, c.oid, c.relname, "
@@ -8241,8 +8274,8 @@ getProcLangs(Archive *fout, int *numProcLangs)
        PQExpBuffer initracl_subquery = createPQExpBuffer();
 
        buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
-                       initracl_subquery, "l.lanacl", "l.lanowner", "'l'",
-                       dopt->binary_upgrade);
+                       initracl_subquery, "l.lanacl", "l.lanowner",
+                       "pip.initprivs", "'l'", dopt->binary_upgrade);
 
        /* pg_language has a laninline column */
        appendPQExpBuffer(query, "SELECT l.tableoid, l.oid, "
@@ -9432,8 +9465,8 @@ getForeignDataWrappers(Archive *fout, int *numForeignDataWrappers)
        PQExpBuffer initracl_subquery = createPQExpBuffer();
 
        buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
-                       initracl_subquery, "f.fdwacl", "f.fdwowner", "'F'",
-                       dopt->binary_upgrade);
+                       initracl_subquery, "f.fdwacl", "f.fdwowner",
+                       "pip.initprivs", "'F'", dopt->binary_upgrade);
 
        appendPQExpBuffer(query, "SELECT f.tableoid, f.oid, f.fdwname, "
                          "(%s f.fdwowner) AS rolname, "
@@ -9599,8 +9632,8 @@ getForeignServers(Archive *fout, int *numForeignServers)
        PQExpBuffer initracl_subquery = createPQExpBuffer();
 
        buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
-                       initracl_subquery, "f.srvacl", "f.srvowner", "'S'",
-                       dopt->binary_upgrade);
+                       initracl_subquery, "f.srvacl", "f.srvowner",
+                       "pip.initprivs", "'S'", dopt->binary_upgrade);
 
        appendPQExpBuffer(query, "SELECT f.tableoid, f.oid, f.srvname, "
                          "(%s f.srvowner) AS rolname, "
@@ -9746,6 +9779,7 @@ getDefaultACLs(Archive *fout, int *numDefaultACLs)
 
        buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
                        initracl_subquery, "defaclacl", "defaclrole",
+                       "pip.initprivs",
                        "CASE WHEN defaclobjtype = 'S' THEN 's' ELSE defaclobjtype END::\"char\"",
                        dopt->binary_upgrade);
 
@@ -10363,9 +10397,19 @@ dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo)
 
    qnspname = pg_strdup(fmtId(nspinfo->dobj.name));
 
-   appendPQExpBuffer(delq, "DROP SCHEMA %s;\n", qnspname);
-
-   appendPQExpBuffer(q, "CREATE SCHEMA %s;\n", qnspname);
+   if (nspinfo->create)
+   {
+       appendPQExpBuffer(delq, "DROP SCHEMA %s;\n", qnspname);
+       appendPQExpBuffer(q, "CREATE SCHEMA %s;\n", qnspname);
+   }
+   else
+   {
+       /* see selectDumpableNamespace() */
+       appendPQExpBufferStr(delq,
+                            "-- *not* dropping schema, since initdb creates it\n");
+       appendPQExpBufferStr(q,
+                            "-- *not* creating schema, since initdb creates it\n");
+   }
 
    if (dopt->binary_upgrade)
        binary_upgrade_extension_member(q, &nspinfo->dobj,
@@ -15556,8 +15600,8 @@ dumpTable(Archive *fout, const TableInfo *tbinfo)
            PQExpBuffer initracl_subquery = createPQExpBuffer();
 
            buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
-                           initracl_subquery, "at.attacl", "c.relowner", "'c'",
-                           dopt->binary_upgrade);
+                           initracl_subquery, "at.attacl", "c.relowner",
+                           "pip.initprivs", "'c'", dopt->binary_upgrade);
 
            appendPQExpBuffer(query,
                              "SELECT at.attname, "
index 49e1b0a09c4eab05940a2ec08ac5a81f063765e7..ba9bc6ddd210a3e68d8f8f2ed1e1ff0566f459b7 100644 (file)
@@ -142,6 +142,8 @@ typedef struct _dumpableObject
 typedef struct _namespaceInfo
 {
    DumpableObject dobj;
+   bool        create;         /* CREATE SCHEMA, or just set owner? */
+   Oid         nspowner;
    char       *rolname;        /* name of owner, or empty string */
    char       *nspacl;
    char       *rnspacl;
index 244507c97c180706f96df6e0cdf6a36789d3d6e4..bd6314c2893493da8aa1c1621e28112c0b721393 100644 (file)
@@ -128,6 +128,14 @@ my %pgdump_runs = (
            'regress_pg_dump_test',
        ],
    },
+   defaults_public_owner => {
+       database => 'regress_public_owner',
+       dump_cmd => [
+           'pg_dump', '--no-sync', '-f',
+           "$tempdir/defaults_public_owner.sql",
+           'regress_public_owner',
+       ],
+   },
 
    # Do not use --no-sync to give test coverage for data sync.
    defaults_custom_format => {
@@ -620,6 +628,24 @@ my %tests = (
        unlike => { no_owner => 1, },
    },
 
+   'ALTER SCHEMA public OWNER TO' => {
+       # see test "REVOKE CREATE ON SCHEMA public" for causative create_sql
+       regexp => qr/^ALTER SCHEMA public OWNER TO .+;/m,
+       like   => {
+           %full_runs, section_pre_data => 1,
+       },
+       unlike => { no_owner => 1, },
+   },
+
+   'ALTER SCHEMA public OWNER TO (w/o ACL changes)' => {
+       database     => 'regress_public_owner',
+       create_order => 100,
+       create_sql =>
+         'ALTER SCHEMA public OWNER TO "regress_quoted  \"" role";',
+       regexp => qr/^(GRANT|REVOKE)/m,
+       unlike => { defaults_public_owner => 1 },
+   },
+
    'ALTER SEQUENCE test_table_col1_seq' => {
        regexp => qr/^
            \QALTER SEQUENCE dump_test.test_table_col1_seq OWNED BY dump_test.test_table.col1;\E
@@ -955,6 +981,13 @@ my %tests = (
        like => {},
    },
 
+   'COMMENT ON SCHEMA public' => {
+       regexp => qr/^COMMENT ON SCHEMA public IS .+;/m,
+
+       # this shouldn't ever get emitted
+       like => {},
+   },
+
    'COMMENT ON TABLE dump_test.test_table' => {
        create_order => 36,
        create_sql   => 'COMMENT ON TABLE dump_test.test_table
@@ -1385,6 +1418,18 @@ my %tests = (
        },
    },
 
+   'CREATE ROLE regress_quoted...' => {
+       create_order => 1,
+       create_sql   => 'CREATE ROLE "regress_quoted  \"" role";',
+       regexp       => qr/^\QCREATE ROLE "regress_quoted  \"" role";\E/m,
+       like         => {
+           pg_dumpall_dbprivs       => 1,
+           pg_dumpall_exclude       => 1,
+           pg_dumpall_globals       => 1,
+           pg_dumpall_globals_clean => 1,
+       },
+   },
+
    'CREATE ACCESS METHOD gist2' => {
        create_order => 52,
        create_sql =>
@@ -3333,6 +3378,23 @@ my %tests = (
        unlike => { no_privs => 1, },
    },
 
+   # With the exception of the public schema, we don't dump ownership changes
+   # for objects originating at initdb.  Hence, any GRANT or REVOKE affecting
+   # owner privileges for those objects should reference the bootstrap
+   # superuser, not the dump-time owner.
+   'REVOKE EXECUTE ON FUNCTION pg_stat_reset FROM regress_dump_test_role' =>
+     {
+       create_order => 15,
+       create_sql   => '
+           ALTER FUNCTION pg_stat_reset OWNER TO regress_dump_test_role;
+           REVOKE EXECUTE ON FUNCTION pg_stat_reset
+             FROM regress_dump_test_role;',
+       regexp => qr/^[^-].*pg_stat_reset.* regress_dump_test_role/m,
+
+       # this shouldn't ever get emitted
+       like => {},
+     },
+
    'REVOKE SELECT ON TABLE pg_proc FROM public' => {
        create_order => 45,
        create_sql   => 'REVOKE SELECT ON TABLE pg_proc FROM public;',
@@ -3344,9 +3406,13 @@ my %tests = (
 
    'REVOKE CREATE ON SCHEMA public FROM public' => {
        create_order => 16,
-       create_sql   => 'REVOKE CREATE ON SCHEMA public FROM public;',
-       regexp       => qr/^
-           \QREVOKE ALL ON SCHEMA public FROM PUBLIC;\E
+       create_sql   => '
+           REVOKE CREATE ON SCHEMA public FROM public;
+           ALTER SCHEMA public OWNER TO "regress_quoted  \"" role";
+           REVOKE ALL ON SCHEMA public FROM "regress_quoted  \"" role";',
+       regexp => qr/^
+           \QREVOKE ALL ON SCHEMA public FROM "regress_quoted  \"" role";\E
+           \n\QREVOKE ALL ON SCHEMA public FROM PUBLIC;\E
            \n\QGRANT USAGE ON SCHEMA public TO PUBLIC;\E
            /xm,
        like => { %full_runs, section_pre_data => 1, },
@@ -3451,8 +3517,9 @@ if ($collation_check_stderr !~ /ERROR: /)
 # Determine whether build supports LZ4.
 my $supports_lz4 = check_pg_config("#define HAVE_LIBLZ4 1");
 
-# Create a second database for certain tests to work against
+# Create additional databases for mutations of schema public
 $node->psql('postgres', 'create database regress_pg_dump_test;');
+$node->psql('postgres', 'create database regress_public_owner;');
 
 # Start with number of command_fails_like()*2 tests below (each
 # command_fails_like is actually 2 tests)
index 4baca365c64982b56f471c88278e21eb29e1a455..8511da5169ba08fa958655ce3b5c420f89eb5b0b 100644 (file)
@@ -318,6 +318,14 @@ my %tests = (
        like         => { pg_dumpall_globals => 1, },
    },
 
+   'CREATE SCHEMA public' => {
+       regexp => qr/^CREATE SCHEMA public;/m,
+       like   => {
+           extension_schema                  => 1,
+           without_extension_explicit_schema => 1,
+       },
+   },
+
    'CREATE SEQUENCE regress_pg_dump_table_col1_seq' => {
        regexp => qr/^
                     \QCREATE SEQUENCE public.regress_pg_dump_table_col1_seq\E