scripts/data_maintainer.py: added dynamic throttling support (via sql)
authormartinko <gamato@users.sf.net>
Thu, 3 Oct 2013 09:19:53 +0000 (11:19 +0200)
committermartinko <gamato@users.sf.net>
Thu, 3 Oct 2013 09:19:53 +0000 (11:19 +0200)
scripts/data_maintainer.py [changed mode: 0644->0755]

old mode 100644 (file)
new mode 100755 (executable)
index 40a5f5a..fee7d42
@@ -15,6 +15,7 @@ Config template::
     dbbefore        = dbname=destdb_test
     dbafter         = dbname=destdb_test
     dbcrash         = dbname=destdb_test
+    dbthrottle      = dbname=queuedb_test
 
     sql_get_pk_list =
         select username
@@ -44,6 +45,10 @@ Config template::
     sql_on_crash =
         select * from somefunction3(%(job_name)s);
 
+    # This may be used to control throttling of the DM (optional)
+    sql_throttle =
+        select lag>'5 minutes'::interval from pgq.get_consumer_info('failoverconsumer');
+
     # materialize query so that transaction should not be open while processing it
     #with_hold       = 1
 
@@ -102,6 +107,9 @@ class DataMaintainer (skytools.DBScript):
         # query to be run if the process crashes
         self.sql_crash = self.cf.get("sql_on_crash", "")
 
+        # query for checking if / how much to throttle
+        self.sql_throttle = self.cf.get("sql_throttle", "")
+
         # how many records to fetch at once
         self.fetchcnt = self.cf.getint("fetchcnt", 100)
         self.fetchcnt = self.cf.getint("fetch_count", self.fetchcnt)
@@ -117,7 +125,7 @@ class DataMaintainer (skytools.DBScript):
 
     def work(self):
         self.log.info('Starting..')
-        started = lap_time = time.time()
+        self.started = self.lap_time = time.time()
         total_count = 0
         bres = {}
 
@@ -130,6 +138,10 @@ class DataMaintainer (skytools.DBScript):
                 assert len(res)==1, "Result of a 'before' query must be 1 row"
                 bres = res[0].copy()
 
+        if self.sql_throttle:
+            dbt = self.get_database("dbthrottle", autocommit=1)
+            tcur = dbt.cursor()
+
         if self.autocommit:
             self.log.info("Autocommit after each modify")
             dbw = self.get_database("dbwrite", autocommit=1)
@@ -160,23 +172,40 @@ class DataMaintainer (skytools.DBScript):
                 dbw.commit()
             self.stat_put("duration", time.time() - self.fetch_started)
             self.send_stats()
-            if len(res) < self.fetchcnt:
-                break
-            if not self.looping:
-                self.log.info("Exiting on user request")
+            if len(res) < self.fetchcnt or not self.looping:
                 break
             if self.commit_delay > 0.0:
                 time.sleep(self.commit_delay)
-            if time.time() - lap_time > 60.0: # if one minute has passed print running totals
-                self.log.info("--- Running count: %s duration: %s ---",
-                        total_count, datetime.timedelta(0, round(time.time() - started)))
-                lap_time = time.time()
+
+            if self.sql_throttle:
+                while self.looping:
+                    tcur.execute(self.sql_throttle)
+                    _r = tcur.fetchall()
+                    assert len(_r)==1 and len(_r[0])==1, "Result of 'throttle' query must be 1 value"
+                    throttle = _r[0][0]
+                    if isinstance(throttle, bool):
+                        tt = float(throttle and 30)
+                    elif isinstance(throttle, (int, float)):
+                        tt = float(throttle)
+                    else:
+                        self.log.warn("Result of 'throttle' query must be boolean or numeric")
+                        break
+                    if tt > 0.0:
+                        self.log.debug("sleeping %f s", tt)
+                        time.sleep(tt)
+                    else:
+                        break
+                    self._print_count(total_count, "--- Waiting count: %s duration: %s ---")
+            self._print_count(total_count, "--- Running count: %s duration: %s ---")
+
+        if not self.looping:
+            self.log.info("Exiting on user request")
 
         rcur.execute("CLOSE data_maint_cur")
         if not self.withhold:
             dbr.rollback()
         self.log.info("--- Total count: %s duration: %s ---",
-                total_count, datetime.timedelta(0, round(time.time() - started)))
+                total_count, datetime.timedelta(0, round(time.time() - self.started)))
 
         if self.sql_after and (self.after_zero_rows > 0 or total_count > 0):
             adb = self.get_database("dbafter", autocommit=1)
@@ -219,6 +248,11 @@ class DataMaintainer (skytools.DBScript):
                 ccur.execute(self.sql_crash, item)
             raise
 
+    def _print_count(self, count, text):
+        if time.time() - self.lap_time > 60.0: # if one minute has passed print running totals
+            self.log.info(text, count, datetime.timedelta(0, round(time.time() - self.started)))
+            self.lap_time = time.time()
+
 if __name__ == '__main__':
     script = DataMaintainer(sys.argv[1:])
     script.start()