diff --git a/dnsworker/lib/dnsworker/base_worker.rb b/dnsworker/lib/dnsworker/base_worker.rb index 2a3e5d8..05cab11 100644 --- a/dnsworker/lib/dnsworker/base_worker.rb +++ b/dnsworker/lib/dnsworker/base_worker.rb @@ -1,137 +1,145 @@ require 'json' require 'mysql2' require 'open3' module DNSWorker::BaseWorker class JobFailed < StandardError; end class Retry < StandardError; end class CmdFailed < StandardError; end attr_reader :client attr_reader :cfg attr_reader :dry_run attr_accessor :working # Start consuming jobs. # # Handles reconnects. def work(opts = {}) @dry_run = opts[:dry_run] register_signals opts[:once] ? run : watch end # Graceful stop the worker. # # If no job is running stops immediately. def stop if working @stop = true else exit end end private def stop? @stop end def system(cmd) p [:cmd, cmd] Kernel.system(cmd) end def register_signals trap('INT') { stop } trap('TERM') { stop } end def watch loop do procline('watching') break if stop? run sleep(cfg['timeout']) end end def run self.working = true - jobs = client.query('select * from jobs where status in (0, 2) order by id asc').to_a + + begin + jobs = client.query('select * from jobs where status in (0, 2) order by id asc').to_a + rescue Mysql2::Error + self.working = false + $stderr.puts "Can't query MySQL, exiting working state" + return + end + jobs.group_by { |j| j[:domain_id] }.each { |domain_id, jobs| process_jobs(domain_id, jobs) } self.working = false end def process_jobs(domain_id, jobs) catch(:stop_process) do jobs.each { |job| if job[:status] == 2 # Stop on processing zone on failed jobs id, jtype, jargs_raw = job.values_at(:id, :job_type, :args) job_info = "jobid=#{id} '#{jtype}:#{jargs_raw}'" $stderr.puts "Not processing domain=#{domain_id} because a failed job exists #{job_info}" throw :stop_process end dispatch(job) } end end def procline(line) $0 = "dnsworker-#{line}" end def initialize(mysql_cfg) my_cfg = mysql_cfg.merge(reconnect: true, symbolize_keys: true) @client = Mysql2::Client.new(my_cfg) end def dispatch(job) id, jtype, jargs_raw = job.values_at(:id, :job_type, :args) jargs = JSON.parse(jargs_raw, symbolize_names: true) job_info = "jobid=#{id} '#{jtype}:#{jargs_raw}'" procline("working on #{job_info}") $stderr.puts "working on #{job_info}" send(jtype, jargs) unless dry_run mark_done(id) rescue Retry mark_retry(id) throw :stop_process rescue CmdFailed mark_fail(id) throw :stop_process rescue JobFailed mark_fail(id) throw :stop_process end def mark_done(id) $stderr.puts :done client.query("update jobs set status=1 where id=#{id}") unless dry_run end def mark_fail(id) $stderr.puts :fail client.query("update jobs set status=2 where id=#{id}") unless dry_run end def mark_retry(id) $stderr.puts :retry client.query("update jobs set retries = retries + 1 where id=#{id}") unless dry_run end def cmd(command) out, err, status = Open3.capture3(command) raise CmdFailed if !status.success? [out, err] end end