'D': skytools.mk_delete_sql}
if self.data:
curs.execute("\n".join(mk_sql[op](row, self.table, self.pkeys)
- for op, row in self.data))
+ for op, row in self.data))
class BaseBulkCollectingLoader(BaseLoader):
sql = "select " + func + " (%s, %s, %s)"
self.log.debug("func: %s, args: %s", func, args)
curs.execute(sql, args)
- res = []
- for row in curs.fetchall():
- res.append(row[0])
+ res = [row[0] for row in curs.fetchall()]
if res:
self.log.info("Dropped tables: %s", ", ".join(res))
class RetriableEvent(Event):
- """Event which can be retryed
+ """Event which can be retried
Consumer is supposed to tag them after processing.
"""
"""Tag event for retry. (internal)"""
cx.execute("select pgq.event_retry(%s, %s, %s)",
[batch_id, ev_id, retry_time])
-
raise Exception('process_local_event not implemented')
def is_batch_done(self):
- """Helper function to keep track of last successful batch
- in external database.
+ """Helper function to keep track of last successful batch in external database.
"""
local_tick = self.load_local_tick()
prev_tick, cur_tick, local_tick))
def set_batch_done(self):
- """Helper function to set last successful batch
- in external database.
+ """Helper function to set last successful batch in external database.
"""
tick_id = self.batch_info['tick_id']
self.save_local_tick(tick_id)