dbbefore = dbname=destdb_test
dbafter = dbname=destdb_test
dbcrash = dbname=destdb_test
+ dbthrottle = dbname=queuedb_test
sql_get_pk_list =
select username
sql_on_crash =
select * from somefunction3(%(job_name)s);
+ # This may be used to control throttling of the DM (optional)
+ sql_throttle =
+ select lag>'5 minutes'::interval from pgq.get_consumer_info('failoverconsumer');
+
# materialize query so that transaction should not be open while processing it
#with_hold = 1
# query to be run if the process crashes
self.sql_crash = self.cf.get("sql_on_crash", "")
+ # query for checking if / how much to throttle
+ self.sql_throttle = self.cf.get("sql_throttle", "")
+
# how many records to fetch at once
self.fetchcnt = self.cf.getint("fetchcnt", 100)
self.fetchcnt = self.cf.getint("fetch_count", self.fetchcnt)
def work(self):
self.log.info('Starting..')
- started = lap_time = time.time()
+ self.started = self.lap_time = time.time()
total_count = 0
bres = {}
assert len(res)==1, "Result of a 'before' query must be 1 row"
bres = res[0].copy()
+ if self.sql_throttle:
+ dbt = self.get_database("dbthrottle", autocommit=1)
+ tcur = dbt.cursor()
+
if self.autocommit:
self.log.info("Autocommit after each modify")
dbw = self.get_database("dbwrite", autocommit=1)
dbw.commit()
self.stat_put("duration", time.time() - self.fetch_started)
self.send_stats()
- if len(res) < self.fetchcnt:
- break
- if not self.looping:
- self.log.info("Exiting on user request")
+ if len(res) < self.fetchcnt or not self.looping:
break
if self.commit_delay > 0.0:
time.sleep(self.commit_delay)
- if time.time() - lap_time > 60.0: # if one minute has passed print running totals
- self.log.info("--- Running count: %s duration: %s ---",
- total_count, datetime.timedelta(0, round(time.time() - started)))
- lap_time = time.time()
+
+ if self.sql_throttle:
+ while self.looping:
+ tcur.execute(self.sql_throttle)
+ _r = tcur.fetchall()
+ assert len(_r)==1 and len(_r[0])==1, "Result of 'throttle' query must be 1 value"
+ throttle = _r[0][0]
+ if isinstance(throttle, bool):
+ tt = float(throttle and 30)
+ elif isinstance(throttle, (int, float)):
+ tt = float(throttle)
+ else:
+ self.log.warn("Result of 'throttle' query must be boolean or numeric")
+ break
+ if tt > 0.0:
+ self.log.debug("sleeping %f s", tt)
+ time.sleep(tt)
+ else:
+ break
+ self._print_count(total_count, "--- Waiting count: %s duration: %s ---")
+ self._print_count(total_count, "--- Running count: %s duration: %s ---")
+
+ if not self.looping:
+ self.log.info("Exiting on user request")
rcur.execute("CLOSE data_maint_cur")
if not self.withhold:
dbr.rollback()
self.log.info("--- Total count: %s duration: %s ---",
- total_count, datetime.timedelta(0, round(time.time() - started)))
+ total_count, datetime.timedelta(0, round(time.time() - self.started)))
if self.sql_after and (self.after_zero_rows > 0 or total_count > 0):
adb = self.get_database("dbafter", autocommit=1)
ccur.execute(self.sql_crash, item)
raise
+ def _print_count(self, count, text):
+ if time.time() - self.lap_time > 60.0: # if one minute has passed print running totals
+ self.log.info(text, count, datetime.timedelta(0, round(time.time() - self.started)))
+ self.lap_time = time.time()
+
if __name__ == '__main__':
script = DataMaintainer(sys.argv[1:])
script.start()