scripts/data_maintainer.py: cleaned up
authormartinko <gamato@users.sf.net>
Mon, 7 Oct 2013 11:32:42 +0000 (13:32 +0200)
committermartinko <gamato@users.sf.net>
Mon, 7 Oct 2013 11:32:42 +0000 (13:32 +0200)
scripts/data_maintainer.py

index fee7d42d15655916df91b815319d0f5dbd03f4c1..c1ecd42a096167ee4afd383924d9dbbacb6c158e 100755 (executable)
@@ -31,23 +31,23 @@ Config template::
         and expire_date < now();
 
     # This will be run before executing the sql_get_pk_list query (optional)
-    sql_before_run =
-        select * from somefunction1(%(job_name)s);
+    #sql_before_run =
+    #    select * from somefunction1(%(job_name)s);
 
     # This will be run when the DM finishes (optional)
-    sql_after_run =
-        select * from somefunction2(%(job_name)s);
+    #sql_after_run =
+    #    select * from somefunction2(%(job_name)s);
 
     # Determines whether the sql_after_run query will be run in case the pk list query returns no rows
-    after_zero_rows = 1
+    #after_zero_rows = 1
 
     # This will be run if the DM crashes (optional)
-    sql_on_crash =
-        select * from somefunction3(%(job_name)s);
+    #sql_on_crash =
+    #    select * from somefunction3(%(job_name)s);
 
     # This may be used to control throttling of the DM (optional)
-    sql_throttle =
-        select lag>'5 minutes'::interval from pgq.get_consumer_info('failoverconsumer');
+    #sql_throttle =
+    #    select lag>'5 minutes'::interval from pgq.get_consumer_info('failoverconsumer');
 
     # materialize query so that transaction should not be open while processing it
     #with_hold       = 1
@@ -126,7 +126,7 @@ class DataMaintainer (skytools.DBScript):
     def work(self):
         self.log.info('Starting..')
         self.started = self.lap_time = time.time()
-        total_count = 0
+        self.total_count = 0
         bres = {}
 
         if self.sql_before:
@@ -167,7 +167,7 @@ class DataMaintainer (skytools.DBScript):
             self.log.debug(rcur.statusmessage)
             res = rcur.fetchall()
             count, lastitem = self.process_batch(res, mcur, bres)
-            total_count += count
+            self.total_count += count
             if not self.autocommit:
                 dbw.commit()
             self.stat_put("duration", time.time() - self.fetch_started)
@@ -176,27 +176,9 @@ class DataMaintainer (skytools.DBScript):
                 break
             if self.commit_delay > 0.0:
                 time.sleep(self.commit_delay)
-
             if self.sql_throttle:
-                while self.looping:
-                    tcur.execute(self.sql_throttle)
-                    _r = tcur.fetchall()
-                    assert len(_r)==1 and len(_r[0])==1, "Result of 'throttle' query must be 1 value"
-                    throttle = _r[0][0]
-                    if isinstance(throttle, bool):
-                        tt = float(throttle and 30)
-                    elif isinstance(throttle, (int, float)):
-                        tt = float(throttle)
-                    else:
-                        self.log.warn("Result of 'throttle' query must be boolean or numeric")
-                        break
-                    if tt > 0.0:
-                        self.log.debug("sleeping %f s", tt)
-                        time.sleep(tt)
-                    else:
-                        break
-                    self._print_count(total_count, "--- Waiting count: %s duration: %s ---")
-            self._print_count(total_count, "--- Running count: %s duration: %s ---")
+                self.throttle(tcur)
+            self._print_count("--- Running count: %s duration: %s ---")
 
         if not self.looping:
             self.log.info("Exiting on user request")
@@ -205,9 +187,9 @@ class DataMaintainer (skytools.DBScript):
         if not self.withhold:
             dbr.rollback()
         self.log.info("--- Total count: %s duration: %s ---",
-                total_count, datetime.timedelta(0, round(time.time() - self.started)))
+                self.total_count, datetime.timedelta(0, round(time.time() - self.started)))
 
-        if self.sql_after and (self.after_zero_rows > 0 or total_count > 0):
+        if self.sql_after and (self.after_zero_rows > 0 or self.total_count > 0):
             adb = self.get_database("dbafter", autocommit=1)
             acur = adb.cursor()
             acur.execute(self.sql_after, lastitem)
@@ -248,9 +230,29 @@ class DataMaintainer (skytools.DBScript):
                 ccur.execute(self.sql_crash, item)
             raise
 
-    def _print_count(self, count, text):
+    def throttle(self, tcur):
+        while self.looping:
+            tcur.execute(self.sql_throttle)
+            _r = tcur.fetchall()
+            assert len(_r) == 1 and len(_r[0]) == 1, "Result of 'throttle' query must be 1 value"
+            throttle = _r[0][0]
+            if isinstance(throttle, bool):
+                tt = float(throttle and 30)
+            elif isinstance(throttle, (int, float)):
+                tt = float(throttle)
+            else:
+                self.log.warn("Result of 'throttle' query must be boolean or numeric")
+                break
+            if tt > 0.0:
+                self.log.debug("sleeping %f s", tt)
+                time.sleep(tt)
+            else:
+                break
+            self._print_count("--- Waiting count: %s duration: %s ---")
+
+    def _print_count(self, text):
         if time.time() - self.lap_time > 60.0: # if one minute has passed print running totals
-            self.log.info(text, count, datetime.timedelta(0, round(time.time() - self.started)))
+            self.log.info(text, self.total_count, datetime.timedelta(0, round(time.time() - self.started)))
             self.lap_time = time.time()
 
 if __name__ == '__main__':