londiste.handlers.dispatch: improved ignoring of events aiming at obsolete partitions
authormartinko <gamato@users.sf.net>
Fri, 3 Oct 2014 11:36:49 +0000 (13:36 +0200)
committermartinko <gamato@users.sf.net>
Fri, 3 Oct 2014 11:36:49 +0000 (13:36 +0200)
Partition is never created if it is already known to be obsolete (and so to be dropped).

python/londiste/handlers/dispatch.py
sql/londiste/functions/londiste.drop_obsolete_partitions.sql
sql/londiste/functions/londiste.is_obsolete_partition.sql [new file with mode: 0644]
sql/londiste/functions/londiste.list_obsolete_partitions.sql [new file with mode: 0644]
sql/londiste/structure/grants.ini

index 90d01bbb6bebc4e425b336867ed6c45cb18f8fd1..68e170838259776a6290cd9d65e930f7a5133e40 100644 (file)
@@ -849,8 +849,13 @@ class Dispatcher (ShardHandler):
         else if part function present in db, call it
         else clone master table"""
         curs = self.dst_curs
+        if (self.conf.ignore_old_events and self.conf.retention_period and
+                self.is_obsolete_partition (dst, self.conf.retention_period, self.conf.period)):
+            self.ignored_tables.add(dst)
+            return
         if skytools.exists_table(curs, dst):
             return
+
         dst = quote_fqident(dst)
         vals = {'dest': dst,
                 'part': dst,
@@ -926,6 +931,20 @@ class Dispatcher (ShardHandler):
             self.log.info("Dropped tables: %s", ", ".join(res))
         return res
 
+    def is_obsolete_partition (self, partition_table, retention_period, partition_period):
+        """ Test partition name of partition-by-date parent table.
+        """
+        curs = self.dst_curs
+        func = "londiste.is_obsolete_partition"
+        args = [partition_table, retention_period, partition_period]
+        sql = "select " + func + " (%s, %s, %s)"
+        self.log.debug("func: %s, args: %s", func, args)
+        curs.execute(sql, args)
+        res = curs.fetchone()[0]
+        if res:
+            self.log.info("Ignored table: %s", partition_table)
+        return res
+
     def get_copy_condition(self, src_curs, dst_curs):
         """ Prepare where condition for copy and replay filtering.
         """
index 4346724ec1a62739181744113808fcde5ccdf896..1718fa73c5e34a2750593ff8a33d6aef6b78d68a 100644 (file)
@@ -21,40 +21,10 @@ as $$
 --    Names of partitions dropped
 -------------------------------------------------------------------------------
 declare
-    _schema text not null := lower (split_part (i_parent_table, '.', 1));
-    _table  text not null := lower (split_part (i_parent_table, '.', 2));
-    _part   text;
-    _expr   text;
-    _dfmt   text;
+    _part text;
 begin
-    if i_partition_period in ('year', 'yearly') then
-        _expr := '_[0-9]{4}';
-        _dfmt := '_YYYY';
-    elsif i_partition_period in ('month', 'monthly') then
-        _expr := '_[0-9]{4}_[0-9]{2}';
-        _dfmt := '_YYYY_MM';
-    elsif i_partition_period in ('day', 'daily') then
-        _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}';
-        _dfmt := '_YYYY_MM_DD';
-    elsif i_partition_period in ('hour', 'hourly') then
-        _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}_[0-9]{2}';
-        _dfmt := '_YYYY_MM_DD_HH24';
-    else
-        raise exception 'not supported i_partition_period: %', i_partition_period;
-    end if;
-
-    if length (_table) = 0 then
-        _table := _schema;
-        _schema := 'public';
-    end if;
-
     for _part in
-        select quote_ident (t.schemaname) ||'.'|| quote_ident (t.tablename)
-          from pg_catalog.pg_tables t
-         where t.schemaname = _schema
-           and t.tablename ~ ('^'|| _table || _expr ||'$')
-           and t.tablename < _table || to_char (now() - i_retention_period, _dfmt)
-         order by 1
+        select londiste.list_obsolete_partitions (i_parent_table, i_retention_period, i_partition_period)
     loop
         execute 'drop table '|| _part;
         return next _part;
diff --git a/sql/londiste/functions/londiste.is_obsolete_partition.sql b/sql/londiste/functions/londiste.is_obsolete_partition.sql
new file mode 100644 (file)
index 0000000..d7baea9
--- /dev/null
@@ -0,0 +1,56 @@
+
+create or replace function londiste.is_obsolete_partition
+(
+    in i_partition_table text,
+    in i_retention_period interval,
+    in i_partition_period text
+)
+    returns boolean
+as $$
+-------------------------------------------------------------------------------
+--  Function: londiste.is_obsolete_partition(3)
+--
+--    Test partition name of partition-by-date parent table.
+--
+--  Parameters:
+--    i_partition_table     Partition table name we want to check
+--    i_retention_period    How long to keep partitions around
+--    i_partition_period    One of: year, month, day, hour
+--
+--  Returns:
+--    True if partition is too old, false if it is not,
+--    null if its name does not match expected pattern.
+-------------------------------------------------------------------------------
+declare
+    _expr text;
+    _dfmt text;
+    _base text;
+begin
+    if i_partition_period in ('year', 'yearly') then
+        _expr := '_[0-9]{4}';
+        _dfmt := '_YYYY';
+    elsif i_partition_period in ('month', 'monthly') then
+        _expr := '_[0-9]{4}_[0-9]{2}';
+        _dfmt := '_YYYY_MM';
+    elsif i_partition_period in ('day', 'daily') then
+        _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}';
+        _dfmt := '_YYYY_MM_DD';
+    elsif i_partition_period in ('hour', 'hourly') then
+        _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}_[0-9]{2}';
+        _dfmt := '_YYYY_MM_DD_HH24';
+    else
+        raise exception 'not supported i_partition_period: %', i_partition_period;
+    end if;
+
+    _expr = '^(.+)' || _expr || '$';
+    _base = substring (i_partition_table from _expr);
+
+    if _base is null then
+        return null;
+    elsif i_partition_table < _base || to_char (now() - i_retention_period, _dfmt) then
+        return true;
+    else
+        return false;
+    end if;
+end;
+$$ language plpgsql;
diff --git a/sql/londiste/functions/londiste.list_obsolete_partitions.sql b/sql/londiste/functions/londiste.list_obsolete_partitions.sql
new file mode 100644 (file)
index 0000000..517f6bc
--- /dev/null
@@ -0,0 +1,62 @@
+
+create or replace function londiste.list_obsolete_partitions
+(
+    in i_parent_table text,
+    in i_retention_period interval,
+    in i_partition_period text
+)
+    returns setof text
+as $$
+-------------------------------------------------------------------------------
+--  Function: londiste.list_obsolete_partitions(3)
+--
+--    List obsolete partitions of partition-by-date parent table.
+--
+--  Parameters:
+--    i_parent_table        Master table from which partitions are inherited
+--    i_retention_period    How long to keep partitions around
+--    i_partition_period    One of: year, month, day, hour
+--
+--  Returns:
+--    Names of partitions to be dropped
+-------------------------------------------------------------------------------
+declare
+    _schema text not null := lower (split_part (i_parent_table, '.', 1));
+    _table  text not null := lower (split_part (i_parent_table, '.', 2));
+    _part   text;
+    _expr   text;
+    _dfmt   text;
+begin
+    if i_partition_period in ('year', 'yearly') then
+        _expr := '_[0-9]{4}';
+        _dfmt := '_YYYY';
+    elsif i_partition_period in ('month', 'monthly') then
+        _expr := '_[0-9]{4}_[0-9]{2}';
+        _dfmt := '_YYYY_MM';
+    elsif i_partition_period in ('day', 'daily') then
+        _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}';
+        _dfmt := '_YYYY_MM_DD';
+    elsif i_partition_period in ('hour', 'hourly') then
+        _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}_[0-9]{2}';
+        _dfmt := '_YYYY_MM_DD_HH24';
+    else
+        raise exception 'not supported i_partition_period: %', i_partition_period;
+    end if;
+
+    if length (_table) = 0 then
+        _table := _schema;
+        _schema := 'public';
+    end if;
+
+    for _part in
+        select quote_ident (t.schemaname) ||'.'|| quote_ident (t.tablename)
+          from pg_catalog.pg_tables t
+         where t.schemaname = _schema
+           and t.tablename ~ ('^'|| _table || _expr ||'$')
+           and t.tablename < _table || to_char (now() - i_retention_period, _dfmt)
+         order by 1
+    loop
+        return next _part;
+    end loop;
+end;
+$$ language plpgsql;
index ca2a37650faa776a5f7000a7b0a2824f8a9f6912..80d83bc395fd2e2dcab04fa6d357124c5e5162e4 100644 (file)
@@ -29,12 +29,14 @@ londiste_writer = execute
 on.functions = %(londiste_local_fns)s, %(londiste_internal_fns)s
 londiste_writer = execute
 
+
 [5.seqs]
 londiste_writer = usage
 on.sequences =
        londiste.table_info_nr_seq,
        londiste.seq_info_nr_seq
 
+
 [6.maint]
 pgq_admin = execute
 on.functions = londiste.periodic_maintenance()
@@ -96,7 +98,8 @@ londiste_local_fns =
        londiste.drop_table_triggers(text, text),
        londiste.table_info_trigger(),
        londiste.create_partition(text, text, text, text, timestamptz, text),
+       londiste.is_obsolete_partition (text, interval, text),
+       londiste.list_obsolete_partitions (text, interval, text),
        londiste.drop_obsolete_partitions (text, interval, text),
        londiste.create_trigger(text,text,text[],text,text)
 
-