Page MenuHomeGRNET

No OneTemporary

File Metadata

Created
Mon, Nov 25, 7:43 PM
diff --git a/dnsworker/lib/dnsworker/base_worker.rb b/dnsworker/lib/dnsworker/base_worker.rb
index 2a3e5d8..05cab11 100644
--- a/dnsworker/lib/dnsworker/base_worker.rb
+++ b/dnsworker/lib/dnsworker/base_worker.rb
@@ -1,137 +1,145 @@
require 'json'
require 'mysql2'
require 'open3'
module DNSWorker::BaseWorker
class JobFailed < StandardError; end
class Retry < StandardError; end
class CmdFailed < StandardError; end
attr_reader :client
attr_reader :cfg
attr_reader :dry_run
attr_accessor :working
# Start consuming jobs.
#
# Handles reconnects.
def work(opts = {})
@dry_run = opts[:dry_run]
register_signals
opts[:once] ? run : watch
end
# Graceful stop the worker.
#
# If no job is running stops immediately.
def stop
if working
@stop = true
else
exit
end
end
private
def stop?
@stop
end
def system(cmd)
p [:cmd, cmd]
Kernel.system(cmd)
end
def register_signals
trap('INT') { stop }
trap('TERM') { stop }
end
def watch
loop do
procline('watching')
break if stop?
run
sleep(cfg['timeout'])
end
end
def run
self.working = true
- jobs = client.query('select * from jobs where status in (0, 2) order by id asc').to_a
+
+ begin
+ jobs = client.query('select * from jobs where status in (0, 2) order by id asc').to_a
+ rescue Mysql2::Error
+ self.working = false
+ $stderr.puts "Can't query MySQL, exiting working state"
+ return
+ end
+
jobs.group_by { |j| j[:domain_id] }.each { |domain_id, jobs|
process_jobs(domain_id, jobs)
}
self.working = false
end
def process_jobs(domain_id, jobs)
catch(:stop_process) do
jobs.each { |job|
if job[:status] == 2 # Stop on processing zone on failed jobs
id, jtype, jargs_raw = job.values_at(:id, :job_type, :args)
job_info = "jobid=#{id} '#{jtype}:#{jargs_raw}'"
$stderr.puts "Not processing domain=#{domain_id} because a failed job exists #{job_info}"
throw :stop_process
end
dispatch(job)
}
end
end
def procline(line)
$0 = "dnsworker-#{line}"
end
def initialize(mysql_cfg)
my_cfg = mysql_cfg.merge(reconnect: true, symbolize_keys: true)
@client = Mysql2::Client.new(my_cfg)
end
def dispatch(job)
id, jtype, jargs_raw = job.values_at(:id, :job_type, :args)
jargs = JSON.parse(jargs_raw, symbolize_names: true)
job_info = "jobid=#{id} '#{jtype}:#{jargs_raw}'"
procline("working on #{job_info}")
$stderr.puts "working on #{job_info}"
send(jtype, jargs) unless dry_run
mark_done(id)
rescue Retry
mark_retry(id)
throw :stop_process
rescue CmdFailed
mark_fail(id)
throw :stop_process
rescue JobFailed
mark_fail(id)
throw :stop_process
end
def mark_done(id)
$stderr.puts :done
client.query("update jobs set status=1 where id=#{id}") unless dry_run
end
def mark_fail(id)
$stderr.puts :fail
client.query("update jobs set status=2 where id=#{id}") unless dry_run
end
def mark_retry(id)
$stderr.puts :retry
client.query("update jobs set retries = retries + 1 where id=#{id}") unless dry_run
end
def cmd(command)
out, err, status = Open3.capture3(command)
raise CmdFailed if !status.success?
[out, err]
end
end

Event Timeline