and expire_date < now();
# This will be run before executing the sql_get_pk_list query (optional)
- sql_before_run =
- select * from somefunction1(%(job_name)s);
+ #sql_before_run =
+ # select * from somefunction1(%(job_name)s);
# This will be run when the DM finishes (optional)
- sql_after_run =
- select * from somefunction2(%(job_name)s);
+ #sql_after_run =
+ # select * from somefunction2(%(job_name)s);
# Determines whether the sql_after_run query will be run in case the pk list query returns no rows
- after_zero_rows = 1
+ #after_zero_rows = 1
# This will be run if the DM crashes (optional)
- sql_on_crash =
- select * from somefunction3(%(job_name)s);
+ #sql_on_crash =
+ # select * from somefunction3(%(job_name)s);
# This may be used to control throttling of the DM (optional)
- sql_throttle =
- select lag>'5 minutes'::interval from pgq.get_consumer_info('failoverconsumer');
+ #sql_throttle =
+ # select lag>'5 minutes'::interval from pgq.get_consumer_info('failoverconsumer');
# materialize query so that transaction should not be open while processing it
#with_hold = 1
def work(self):
self.log.info('Starting..')
self.started = self.lap_time = time.time()
- total_count = 0
+ self.total_count = 0
bres = {}
if self.sql_before:
self.log.debug(rcur.statusmessage)
res = rcur.fetchall()
count, lastitem = self.process_batch(res, mcur, bres)
- total_count += count
+ self.total_count += count
if not self.autocommit:
dbw.commit()
self.stat_put("duration", time.time() - self.fetch_started)
break
if self.commit_delay > 0.0:
time.sleep(self.commit_delay)
-
if self.sql_throttle:
- while self.looping:
- tcur.execute(self.sql_throttle)
- _r = tcur.fetchall()
- assert len(_r)==1 and len(_r[0])==1, "Result of 'throttle' query must be 1 value"
- throttle = _r[0][0]
- if isinstance(throttle, bool):
- tt = float(throttle and 30)
- elif isinstance(throttle, (int, float)):
- tt = float(throttle)
- else:
- self.log.warn("Result of 'throttle' query must be boolean or numeric")
- break
- if tt > 0.0:
- self.log.debug("sleeping %f s", tt)
- time.sleep(tt)
- else:
- break
- self._print_count(total_count, "--- Waiting count: %s duration: %s ---")
- self._print_count(total_count, "--- Running count: %s duration: %s ---")
+ self.throttle(tcur)
+ self._print_count("--- Running count: %s duration: %s ---")
if not self.looping:
self.log.info("Exiting on user request")
if not self.withhold:
dbr.rollback()
self.log.info("--- Total count: %s duration: %s ---",
- total_count, datetime.timedelta(0, round(time.time() - self.started)))
+ self.total_count, datetime.timedelta(0, round(time.time() - self.started)))
- if self.sql_after and (self.after_zero_rows > 0 or total_count > 0):
+ if self.sql_after and (self.after_zero_rows > 0 or self.total_count > 0):
adb = self.get_database("dbafter", autocommit=1)
acur = adb.cursor()
acur.execute(self.sql_after, lastitem)
ccur.execute(self.sql_crash, item)
raise
- def _print_count(self, count, text):
+ def throttle(self, tcur):
+ while self.looping:
+ tcur.execute(self.sql_throttle)
+ _r = tcur.fetchall()
+ assert len(_r) == 1 and len(_r[0]) == 1, "Result of 'throttle' query must be 1 value"
+ throttle = _r[0][0]
+ if isinstance(throttle, bool):
+ tt = float(throttle and 30)
+ elif isinstance(throttle, (int, float)):
+ tt = float(throttle)
+ else:
+ self.log.warn("Result of 'throttle' query must be boolean or numeric")
+ break
+ if tt > 0.0:
+ self.log.debug("sleeping %f s", tt)
+ time.sleep(tt)
+ else:
+ break
+ self._print_count("--- Waiting count: %s duration: %s ---")
+
+ def _print_count(self, text):
if time.time() - self.lap_time > 60.0: # if one minute has passed print running totals
- self.log.info(text, count, datetime.timedelta(0, round(time.time() - self.started)))
+ self.log.info(text, self.total_count, datetime.timedelta(0, round(time.time() - self.started)))
self.lap_time = time.time()
if __name__ == '__main__':