Page Menu
Home
GRNET
Search
Configure Global Search
Log In
Files
F324386
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Mon, Nov 25, 7:43 PM
Size
3 KB
Mime Type
text/x-diff
Expires
Wed, Nov 27, 7:43 PM (1 d, 21 h)
Engine
blob
Format
Raw Data
Handle
156205
Attached To
rWEBDNS WebDNS (edet4)
View Options
diff --git a/dnsworker/lib/dnsworker/base_worker.rb b/dnsworker/lib/dnsworker/base_worker.rb
index 2a3e5d8..05cab11 100644
--- a/dnsworker/lib/dnsworker/base_worker.rb
+++ b/dnsworker/lib/dnsworker/base_worker.rb
@@ -1,137 +1,145 @@
require 'json'
require 'mysql2'
require 'open3'
module DNSWorker::BaseWorker
# Signals that a job could not be completed. #dispatch reacts by marking
# the job as failed and aborting the remaining jobs of the same domain.
class JobFailed < StandardError
end

# Signals that a job should be attempted again later. #dispatch reacts by
# bumping the job's retry counter and aborting the remaining jobs of the
# same domain.
class Retry < StandardError
end

# Raised by #cmd when an external command exits unsuccessfully. #dispatch
# handles it the same way as JobFailed.
class CmdFailed < StandardError
end
# MySQL connection used for every job query; created in #initialize.
attr_reader :client
# Worker configuration. Only cfg['timeout'] (the pause between polling
# rounds in #watch) is read in this file, and nothing here assigns it --
# presumably the including class provides it; verify against the includer.
attr_reader :cfg
# Value of opts[:dry_run] given to #work. When truthy, job handlers are not
# invoked and no job updates are written to the database.
attr_reader :dry_run
# True while #run is fetching/processing jobs. #stop consults it to decide
# between deferring the shutdown and exiting immediately.
attr_accessor :working
# Entry point: start consuming jobs.
#
# Installs the INT/TERM handlers first, then either performs a single
# polling round or keeps polling until a stop is requested.
#
# opts - Hash of options:
#        :dry_run - remembered in @dry_run and exposed through #dry_run.
#        :once    - when truthy, call #run a single time instead of #watch.
#
# Returns whatever #run or #watch returns.
def work(opts = {})
  @dry_run = opts[:dry_run]
  register_signals
  return run if opts[:once]

  watch
end
# Gracefully stop the worker.
#
# When no job is being processed (#working is falsy) the process exits
# right away. Otherwise only the @stop flag is raised, and #watch leaves
# its loop the next time it checks #stop?.
def stop
  exit unless working

  @stop = true
end
private
# Whether a deferred shutdown has been requested.
#
# Returns the raw @stop flag: true once #stop was called while a job was
# running, nil before that.
def stop?
  @stop
end
# Tracing wrapper around Kernel.system.
#
# Prints the command as `[:cmd, "<command>"]` on standard output before
# running it, so every shell-out made by the worker is visible in its log.
#
# cmd - the command line to execute.
#
# Returns the result of Kernel.system(cmd).
def system(cmd)
  trace = [:cmd, cmd]
  p(trace)
  Kernel.system(cmd)
end
# Route SIGINT and SIGTERM to #stop so the worker can shut down gracefully.
def register_signals
  graceful_shutdown = proc { stop }
  Signal.trap('INT', graceful_shutdown)
  Signal.trap('TERM', graceful_shutdown)
end
# Poll for jobs until a stop is requested.
#
# Each round sets the process title to "watching", leaves the loop if
# #stop? is truthy, and otherwise performs one #run followed by a pause of
# cfg['timeout'] (handed to sleep unchanged).
def watch
  loop do
    procline('watching')
    break if stop?

    run
    pause = cfg['timeout']
    sleep(pause)
  end
end
# Perform one polling round.
#
# Loads every job whose status is 0 or 2 (2 is the value #mark_fail
# writes; 0 is presumably "not yet processed" -- confirm against the
# schema), groups the rows by :domain_id while keeping the ascending id
# order of the query, and hands each group to #process_jobs.
#
# The worker is flagged as #working for the whole round. The flag is reset
# in an ensure clause so that it is cleared on every exit path: normal
# completion, the early return after a MySQL error, and any unexpected
# exception raised while processing jobs.
#
# Returns false after a completed round and nil when the job query failed.
def run
  self.working = true

  begin
    pending = client.query('select * from jobs where status in (0, 2) order by id asc').to_a
  rescue Mysql2::Error
    # The database is unreachable; give up on this round and let the
    # caller try again on the next one.
    $stderr.puts "Can't query MySQL, exiting working state"
    return
  end

  pending.group_by { |job| job[:domain_id] }.each do |domain_id, domain_jobs|
    process_jobs(domain_id, domain_jobs)
  end

  # Keep the falsy return value callers have always received.
  false
ensure
  self.working = false
end
# Dispatch the jobs of a single domain in the given order.
#
# Processing of the domain is abandoned as soon as a job with status 2
# (failed) is met: the reason is logged to stderr and the remaining jobs
# are left untouched. #dispatch may also abandon the domain by throwing
# :stop_process itself.
#
# domain_id - identifier of the domain the jobs belong to (used for logging).
# jobs      - job rows; each is read through the :status, :id, :job_type
#             and :args keys.
#
# Returns jobs when every job was dispatched, nil when processing stopped
# early.
def process_jobs(domain_id, jobs)
  catch(:stop_process) do
    jobs.each do |job|
      unless job[:status] == 2
        dispatch(job)
        next
      end

      # An earlier failure blocks the whole zone until it is resolved.
      id, jtype, jargs_raw = job.values_at(:id, :job_type, :args)
      job_info = "jobid=#{id} '#{jtype}:#{jargs_raw}'"
      $stderr.puts "Not processing domain=#{domain_id} because a failed job exists #{job_info}"
      throw :stop_process
    end
  end
end
# Set the process title ($0) to "dnsworker-<line>" so the worker's current
# activity is visible in process listings.
#
# Returns the new title.
def procline(line)
  title = "dnsworker-#{line}"
  $0 = title
end
# Open the MySQL connection used by the worker.
#
# mysql_cfg - Hash of Mysql2::Client options. :reconnect and
#             :symbolize_keys are always forced to true, overriding any
#             value given for those keys; the caller's hash is not mutated.
def initialize(mysql_cfg)
  forced_options = { reconnect: true, symbolize_keys: true }
  @client = Mysql2::Client.new(mysql_cfg.merge(forced_options))
end
# Execute a single job row and record its outcome.
#
# The row's :args value is parsed as JSON (with symbolized names) and passed
# to the method named by :job_type. In dry-run mode the handler is skipped,
# but the job still goes through #mark_done.
#
# Outcomes:
#   handler returns -> #mark_done
#   Retry           -> #mark_retry, then throw :stop_process
#   CmdFailed or
#   JobFailed       -> #mark_fail, then throw :stop_process
#
# Any other exception propagates to the caller.
#
# NOTE(review): the handler name comes straight from the jobs table and is
# invoked with `send`, which also reaches private methods -- confirm that
# only trusted code can insert job rows.
# NOTE(review): a row with malformed JSON in :args raises out of this method
# (no rescue clause matches a parse error) -- confirm that is intended.
def dispatch(job)
  job_id, handler, raw_args = job.values_at(:id, :job_type, :args)
  handler_args = JSON.parse(raw_args, symbolize_names: true)
  label = "jobid=#{job_id} '#{handler}:#{raw_args}'"

  procline("working on #{label}")
  $stderr.puts "working on #{label}"

  send(handler, handler_args) unless dry_run
  mark_done(job_id)
rescue Retry
  mark_retry(job_id)
  throw :stop_process
rescue CmdFailed, JobFailed
  mark_fail(job_id)
  throw :stop_process
end
# Record a successfully processed job.
#
# Always logs "done" to stderr; sets the job's status to 1 unless running
# in dry-run mode.
#
# id - the job's id, interpolated as-is into the SQL statement.
def mark_done(id)
  $stderr.puts :done
  return if dry_run

  client.query("update jobs set status=1 where id=#{id}")
end
# Record a failed job.
#
# Always logs "fail" to stderr; sets the job's status to 2 unless running
# in dry-run mode.
#
# id - the job's id, interpolated as-is into the SQL statement.
def mark_fail(id)
  $stderr.puts :fail
  return if dry_run

  client.query("update jobs set status=2 where id=#{id}")
end
# Record that a job asked to be retried.
#
# Always logs "retry" to stderr; increments the job's retries counter
# unless running in dry-run mode. The job's status is left unchanged.
#
# id - the job's id, interpolated as-is into the SQL statement.
def mark_retry(id)
  $stderr.puts :retry
  return if dry_run

  client.query("update jobs set retries = retries + 1 where id=#{id}")
end
# Run an external command and capture its output.
#
# command - the command line handed to Open3.capture3.
#
# Returns a two-element Array [stdout, stderr] when the command succeeds.
# Raises CmdFailed when the command does not exit successfully. The
# exception message carries the exit status, the command and its stderr
# so the cause of the failure is not lost.
def cmd(command)
  out, err, status = Open3.capture3(command)
  return [out, err] if status.success?

  # exitstatus is nil when the process was killed by a signal, hence inspect.
  raise CmdFailed, "command failed (exit status #{status.exitstatus.inspect}): #{command}\n#{err}"
end
end
Event Timeline
Log In to Comment