Speed up ruleutils' name de-duplication code, and fix overlength-name case.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 16 Nov 2015 18:45:17 +0000 (13:45 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 16 Nov 2015 18:45:17 +0000 (13:45 -0500)
Since commit 11e131854f8231a21613f834c40fe9d046926387, ruleutils.c has
attempted to ensure that each RTE in a query or plan tree has a unique
alias name.  However, the code that was added for this could be quite slow,
even as bad as O(N^3) if N identical RTE names must be replaced, as noted
by Jeff Janes.  Improve matters by building a transient hash table within
set_rtable_names.  The hash table in itself reduces the cost of detecting a
duplicate from O(N) to O(1), and we can save another factor of N by storing
the number of de-duplicated names already created for each entry, so that
we don't have to re-try names already created.  This way is probably a bit
slower overall for small range tables, but almost by definition, such cases
should not be a performance problem.

In principle the same problem applies to the column-name-de-duplication
code; but in practice that seems to be less of a problem, first because
N is limited since we don't support extremely wide tables, and second
because duplicate column names within an RTE are fairly rare, so that in
practice the cost is more like O(N^2) not O(N^3).  It would be very much
messier to fix the column-name code, so for now I've left that alone.

An independent problem in the same area was that the de-duplication code
paid no attention to the identifier length limit, and would happily produce
identifiers that were longer than NAMEDATALEN and wouldn't be unique after
truncation to NAMEDATALEN.  This could result in dump/reload failures, or
perhaps even views that silently behaved differently than before.  We can
fix that by shortening the base name as needed.  Fix it for both the
relation and column name cases.

In passing, check for interrupts in set_rtable_names, just in case it's
still slow enough to be an issue.

Back-patch to 9.3 where this code was introduced.

src/backend/utils/adt/ruleutils.c
src/test/regress/expected/create_view.out
src/test/regress/sql/create_view.sql

index 3388f7c17ea92e0326859536c980e51cba7c150d..771b14b48acd820fc99989e3b0a77d799c9f9a41 100644 (file)
@@ -38,6 +38,7 @@
 #include "commands/tablespace.h"
 #include "executor/spi.h"
 #include "funcapi.h"
+#include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
@@ -55,6 +56,7 @@
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/hsearch.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/ruleutils.h"
@@ -267,6 +269,15 @@ typedef struct
 #define deparse_columns_fetch(rangetable_index, dpns) \
    ((deparse_columns *) list_nth((dpns)->rtable_columns, (rangetable_index)-1))
 
+/*
+ * Entry in set_rtable_names' hash table
+ */
+typedef struct
+{
+   char        name[NAMEDATALEN];      /* Hash key --- must be first */
+   int         counter;        /* Largest addition used so far for name */
+} NameHashEntry;
+
 
 /* ----------
  * Global data
@@ -312,8 +323,6 @@ static void print_function_rettype(StringInfo buf, HeapTuple proctup);
 static void print_function_trftypes(StringInfo buf, HeapTuple proctup);
 static void set_rtable_names(deparse_namespace *dpns, List *parent_namespaces,
                 Bitmapset *rels_used);
-static bool refname_is_unique(char *refname, deparse_namespace *dpns,
-                 List *parent_namespaces);
 static void set_deparse_for_query(deparse_namespace *dpns, Query *query,
                      List *parent_namespaces);
 static void set_simple_column_names(deparse_namespace *dpns);
@@ -2676,15 +2685,61 @@ static void
 set_rtable_names(deparse_namespace *dpns, List *parent_namespaces,
                 Bitmapset *rels_used)
 {
+   HASHCTL     hash_ctl;
+   HTAB       *names_hash;
+   NameHashEntry *hentry;
+   bool        found;
+   int         rtindex;
    ListCell   *lc;
-   int         rtindex = 1;
 
    dpns->rtable_names = NIL;
+   /* nothing more to do if empty rtable */
+   if (dpns->rtable == NIL)
+       return;
+
+   /*
+    * We use a hash table to hold known names, so that this process is O(N)
+    * not O(N^2) for N names.
+    */
+   MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+   hash_ctl.keysize = NAMEDATALEN;
+   hash_ctl.entrysize = sizeof(NameHashEntry);
+   hash_ctl.hcxt = CurrentMemoryContext;
+   names_hash = hash_create("set_rtable_names names",
+                            list_length(dpns->rtable),
+                            &hash_ctl,
+                            HASH_ELEM | HASH_CONTEXT);
+   /* Preload the hash table with names appearing in parent_namespaces */
+   foreach(lc, parent_namespaces)
+   {
+       deparse_namespace *olddpns = (deparse_namespace *) lfirst(lc);
+       ListCell   *lc2;
+
+       foreach(lc2, olddpns->rtable_names)
+       {
+           char       *oldname = (char *) lfirst(lc2);
+
+           if (oldname == NULL)
+               continue;
+           hentry = (NameHashEntry *) hash_search(names_hash,
+                                                  oldname,
+                                                  HASH_ENTER,
+                                                  &found);
+           /* we do not complain about duplicate names in parent namespaces */
+           hentry->counter = 0;
+       }
+   }
+
+   /* Now we can scan the rtable */
+   rtindex = 1;
    foreach(lc, dpns->rtable)
    {
        RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc);
        char       *refname;
 
+       /* Just in case this takes an unreasonable amount of time ... */
+       CHECK_FOR_INTERRUPTS();
+
        if (rels_used && !bms_is_member(rtindex, rels_used))
        {
            /* Ignore unreferenced RTE */
@@ -2712,56 +2767,62 @@ set_rtable_names(deparse_namespace *dpns, List *parent_namespaces,
        }
 
        /*
-        * If the selected name isn't unique, append digits to make it so
+        * If the selected name isn't unique, append digits to make it so, and
+        * make a new hash entry for it once we've got a unique name.  For a
+        * very long input name, we might have to truncate to stay within
+        * NAMEDATALEN.
         */
-       if (refname &&
-           !refname_is_unique(refname, dpns, parent_namespaces))
+       if (refname)
        {
-           char       *modname = (char *) palloc(strlen(refname) + 32);
-           int         i = 0;
+           hentry = (NameHashEntry *) hash_search(names_hash,
+                                                  refname,
+                                                  HASH_ENTER,
+                                                  &found);
+           if (found)
+           {
+               /* Name already in use, must choose a new one */
+               int         refnamelen = strlen(refname);
+               char       *modname = (char *) palloc(refnamelen + 16);
+               NameHashEntry *hentry2;
 
-           do
+               do
+               {
+                   hentry->counter++;
+                   for (;;)
+                   {
+                       /*
+                        * We avoid using %.*s here because it can misbehave
+                        * if the data is not valid in what libc thinks is the
+                        * prevailing encoding.
+                        */
+                       memcpy(modname, refname, refnamelen);
+                       sprintf(modname + refnamelen, "_%d", hentry->counter);
+                       if (strlen(modname) < NAMEDATALEN)
+                           break;
+                       /* drop chars from refname to keep all the digits */
+                       refnamelen = pg_mbcliplen(refname, refnamelen,
+                                                 refnamelen - 1);
+                   }
+                   hentry2 = (NameHashEntry *) hash_search(names_hash,
+                                                           modname,
+                                                           HASH_ENTER,
+                                                           &found);
+               } while (found);
+               hentry2->counter = 0;   /* init new hash entry */
+               refname = modname;
+           }
+           else
            {
-               sprintf(modname, "%s_%d", refname, ++i);
-           } while (!refname_is_unique(modname, dpns, parent_namespaces));
-           refname = modname;
+               /* Name not previously used, need only initialize hentry */
+               hentry->counter = 0;
+           }
        }
 
        dpns->rtable_names = lappend(dpns->rtable_names, refname);
        rtindex++;
    }
-}
-
-/*
- * refname_is_unique: is refname distinct from all already-chosen RTE names?
- */
-static bool
-refname_is_unique(char *refname, deparse_namespace *dpns,
-                 List *parent_namespaces)
-{
-   ListCell   *lc;
 
-   foreach(lc, dpns->rtable_names)
-   {
-       char       *oldname = (char *) lfirst(lc);
-
-       if (oldname && strcmp(oldname, refname) == 0)
-           return false;
-   }
-   foreach(lc, parent_namespaces)
-   {
-       deparse_namespace *olddpns = (deparse_namespace *) lfirst(lc);
-       ListCell   *lc2;
-
-       foreach(lc2, olddpns->rtable_names)
-       {
-           char       *oldname = (char *) lfirst(lc2);
-
-           if (oldname && strcmp(oldname, refname) == 0)
-               return false;
-       }
-   }
-   return true;
+   hash_destroy(names_hash);
 }
 
 /*
@@ -3589,16 +3650,34 @@ make_colname_unique(char *colname, deparse_namespace *dpns,
                    deparse_columns *colinfo)
 {
    /*
-    * If the selected name isn't unique, append digits to make it so
+    * If the selected name isn't unique, append digits to make it so.  For a
+    * very long input name, we might have to truncate to stay within
+    * NAMEDATALEN.
     */
    if (!colname_is_unique(colname, dpns, colinfo))
    {
-       char       *modname = (char *) palloc(strlen(colname) + 32);
+       int         colnamelen = strlen(colname);
+       char       *modname = (char *) palloc(colnamelen + 16);
        int         i = 0;
 
        do
        {
-           sprintf(modname, "%s_%d", colname, ++i);
+           i++;
+           for (;;)
+           {
+               /*
+                * We avoid using %.*s here because it can misbehave if the
+                * data is not valid in what libc thinks is the prevailing
+                * encoding.
+                */
+               memcpy(modname, colname, colnamelen);
+               sprintf(modname + colnamelen, "_%d", i);
+               if (strlen(modname) < NAMEDATALEN)
+                   break;
+               /* drop chars from colname to keep all the digits */
+               colnamelen = pg_mbcliplen(colname, colnamelen,
+                                         colnamelen - 1);
+           }
        } while (!colname_is_unique(modname, dpns, colinfo));
        colname = modname;
    }
index 3e4d424ecadaf6e086b7caad086432f76fd2e220..ae50c9460796392580a61b82512afa7ba9a9df08 100644 (file)
@@ -1475,6 +1475,33 @@ select * from int8_tbl i where i.* in (values(i.*::int8_tbl));
  4567890123456789 | -4567890123456789
 (5 rows)
 
+-- check unique-ification of overlength names
+create view tt18v as
+  select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxy
+  union all
+  select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxz;
+NOTICE:  identifier "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxy" will be truncated to "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+NOTICE:  identifier "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxz" will be truncated to "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+select pg_get_viewdef('tt18v', true);
+                                  pg_get_viewdef                                   
+-----------------------------------------------------------------------------------
+  SELECT xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q1,      +
+     xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q2           +
+    FROM int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +
+ UNION ALL                                                                        +
+  SELECT xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q1,      +
+     xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.q2           +
+    FROM int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx;
+(1 row)
+
+explain (costs off) select * from tt18v;
+                                         QUERY PLAN                                         
+--------------------------------------------------------------------------------------------
+ Append
+   ->  Seq Scan on int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+   ->  Seq Scan on int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx_1
+(3 rows)
+
 -- clean up all the random objects we made above
 set client_min_messages = warning;
 DROP SCHEMA temp_view_test CASCADE;
index bcee90d6146921b4d9f0d3d8035f21202e09f067..58d361df64988562c1ed028cf16cd84d46d411ee 100644 (file)
@@ -487,6 +487,15 @@ select * from tt17v;
 select pg_get_viewdef('tt17v', true);
 select * from int8_tbl i where i.* in (values(i.*::int8_tbl));
 
+-- check unique-ification of overlength names
+
+create view tt18v as
+  select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxy
+  union all
+  select * from int8_tbl xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxz;
+select pg_get_viewdef('tt18v', true);
+explain (costs off) select * from tt18v;
+
 -- clean up all the random objects we made above
 set client_min_messages = warning;
 DROP SCHEMA temp_view_test CASCADE;