Partition is never created if it is already known to be obsolete (and so to be dropped).
else if part function present in db, call it
else clone master table"""
curs = self.dst_curs
+ if (self.conf.ignore_old_events and self.conf.retention_period and
+ self.is_obsolete_partition (dst, self.conf.retention_period, self.conf.period)):
+ self.ignored_tables.add(dst)
+ return
if skytools.exists_table(curs, dst):
return
+
dst = quote_fqident(dst)
vals = {'dest': dst,
'part': dst,
self.log.info("Dropped tables: %s", ", ".join(res))
return res
+ def is_obsolete_partition (self, partition_table, retention_period, partition_period):
+ """ Test partition name of partition-by-date parent table.
+ """
+ curs = self.dst_curs
+ func = "londiste.is_obsolete_partition"
+ args = [partition_table, retention_period, partition_period]
+ sql = "select " + func + " (%s, %s, %s)"
+ self.log.debug("func: %s, args: %s", func, args)
+ curs.execute(sql, args)
+ res = curs.fetchone()[0]
+ if res:
+ self.log.info("Ignored table: %s", partition_table)
+ return res
+
def get_copy_condition(self, src_curs, dst_curs):
""" Prepare where condition for copy and replay filtering.
"""
-- Names of partitions dropped
-------------------------------------------------------------------------------
declare
- _schema text not null := lower (split_part (i_parent_table, '.', 1));
- _table text not null := lower (split_part (i_parent_table, '.', 2));
- _part text;
- _expr text;
- _dfmt text;
+ _part text;
begin
- if i_partition_period in ('year', 'yearly') then
- _expr := '_[0-9]{4}';
- _dfmt := '_YYYY';
- elsif i_partition_period in ('month', 'monthly') then
- _expr := '_[0-9]{4}_[0-9]{2}';
- _dfmt := '_YYYY_MM';
- elsif i_partition_period in ('day', 'daily') then
- _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}';
- _dfmt := '_YYYY_MM_DD';
- elsif i_partition_period in ('hour', 'hourly') then
- _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}_[0-9]{2}';
- _dfmt := '_YYYY_MM_DD_HH24';
- else
- raise exception 'not supported i_partition_period: %', i_partition_period;
- end if;
-
- if length (_table) = 0 then
- _table := _schema;
- _schema := 'public';
- end if;
-
for _part in
- select quote_ident (t.schemaname) ||'.'|| quote_ident (t.tablename)
- from pg_catalog.pg_tables t
- where t.schemaname = _schema
- and t.tablename ~ ('^'|| _table || _expr ||'$')
- and t.tablename < _table || to_char (now() - i_retention_period, _dfmt)
- order by 1
+ select londiste.list_obsolete_partitions (i_parent_table, i_retention_period, i_partition_period)
loop
execute 'drop table '|| _part;
return next _part;
--- /dev/null
+
+create or replace function londiste.is_obsolete_partition
+(
+ in i_partition_table text,
+ in i_retention_period interval,
+ in i_partition_period text
+)
+ returns boolean
+as $$
+-------------------------------------------------------------------------------
+-- Function: londiste.is_obsolete_partition(3)
+--
+-- Test partition name of partition-by-date parent table.
+--
+-- Parameters:
+-- i_partition_table Partition table name we want to check
+-- i_retention_period How long to keep partitions around
+-- i_partition_period One of: year, month, day, hour
+--
+-- Returns:
+-- True if partition is too old, false if it is not,
+-- null if its name does not match expected pattern.
+-------------------------------------------------------------------------------
+declare
+ _expr text;
+ _dfmt text;
+ _base text;
+begin
+ if i_partition_period in ('year', 'yearly') then
+ _expr := '_[0-9]{4}';
+ _dfmt := '_YYYY';
+ elsif i_partition_period in ('month', 'monthly') then
+ _expr := '_[0-9]{4}_[0-9]{2}';
+ _dfmt := '_YYYY_MM';
+ elsif i_partition_period in ('day', 'daily') then
+ _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}';
+ _dfmt := '_YYYY_MM_DD';
+ elsif i_partition_period in ('hour', 'hourly') then
+ _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}_[0-9]{2}';
+ _dfmt := '_YYYY_MM_DD_HH24';
+ else
+ raise exception 'not supported i_partition_period: %', i_partition_period;
+ end if;
+
+ _expr = '^(.+)' || _expr || '$';
+ _base = substring (i_partition_table from _expr);
+
+ if _base is null then
+ return null;
+ elsif i_partition_table < _base || to_char (now() - i_retention_period, _dfmt) then
+ return true;
+ else
+ return false;
+ end if;
+end;
+$$ language plpgsql;
--- /dev/null
+
+create or replace function londiste.list_obsolete_partitions
+(
+ in i_parent_table text,
+ in i_retention_period interval,
+ in i_partition_period text
+)
+ returns setof text
+as $$
+-------------------------------------------------------------------------------
+-- Function: londiste.list_obsolete_partitions(3)
+--
+-- List obsolete partitions of partition-by-date parent table.
+--
+-- Parameters:
+-- i_parent_table Master table from which partitions are inherited
+-- i_retention_period How long to keep partitions around
+-- i_partition_period One of: year, month, day, hour
+--
+-- Returns:
+-- Names of partitions to be dropped
+-------------------------------------------------------------------------------
+declare
+ _schema text not null := lower (split_part (i_parent_table, '.', 1));
+ _table text not null := lower (split_part (i_parent_table, '.', 2));
+ _part text;
+ _expr text;
+ _dfmt text;
+begin
+ if i_partition_period in ('year', 'yearly') then
+ _expr := '_[0-9]{4}';
+ _dfmt := '_YYYY';
+ elsif i_partition_period in ('month', 'monthly') then
+ _expr := '_[0-9]{4}_[0-9]{2}';
+ _dfmt := '_YYYY_MM';
+ elsif i_partition_period in ('day', 'daily') then
+ _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}';
+ _dfmt := '_YYYY_MM_DD';
+ elsif i_partition_period in ('hour', 'hourly') then
+ _expr := '_[0-9]{4}_[0-9]{2}_[0-9]{2}_[0-9]{2}';
+ _dfmt := '_YYYY_MM_DD_HH24';
+ else
+ raise exception 'not supported i_partition_period: %', i_partition_period;
+ end if;
+
+ if length (_table) = 0 then
+ _table := _schema;
+ _schema := 'public';
+ end if;
+
+ for _part in
+ select quote_ident (t.schemaname) ||'.'|| quote_ident (t.tablename)
+ from pg_catalog.pg_tables t
+ where t.schemaname = _schema
+ and t.tablename ~ ('^'|| _table || _expr ||'$')
+ and t.tablename < _table || to_char (now() - i_retention_period, _dfmt)
+ order by 1
+ loop
+ return next _part;
+ end loop;
+end;
+$$ language plpgsql;
on.functions = %(londiste_local_fns)s, %(londiste_internal_fns)s
londiste_writer = execute
+
[5.seqs]
londiste_writer = usage
on.sequences =
londiste.table_info_nr_seq,
londiste.seq_info_nr_seq
+
[6.maint]
pgq_admin = execute
on.functions = londiste.periodic_maintenance()
londiste.drop_table_triggers(text, text),
londiste.table_info_trigger(),
londiste.create_partition(text, text, text, text, timestamptz, text),
+ londiste.is_obsolete_partition (text, interval, text),
+ londiste.list_obsolete_partitions (text, interval, text),
londiste.drop_obsolete_partitions (text, interval, text),
londiste.create_trigger(text,text,text[],text,text)
-