diff --git a/app/jobs/queues.rb b/app/jobs/queues.rb
index cc7df33c60f..2fa144fa1a3 100644
--- a/app/jobs/queues.rb
+++ b/app/jobs/queues.rb
@@ -12,6 +12,14 @@ def self.local(config)
       def self.generic
         'cc-generic'
       end
+
+      # Tells whether +queue_name+ names a local queue: it carries the 'cc-' prefix but is not
+      # the generic queue. The name is coerced with #to_s, so symbols are treated like strings
+      # and nil is simply not local.
+      def self.local?(queue_name)
+        name = queue_name.to_s
+        name.start_with?('cc-') && name != generic
+      end
     end
   end
 end
diff --git a/lib/delayed_job/local_worker_drain_plugin.rb b/lib/delayed_job/local_worker_drain_plugin.rb
new file mode 100644
index 00000000000..1b951cfcac5
--- /dev/null
+++ b/lib/delayed_job/local_worker_drain_plugin.rb
@@ -0,0 +1,24 @@
+require 'jobs/queues'
+
+# Local workers process jobs that require access to local filesystem resources (e.g. buildpack, droplet,
+# and package uploads via nginx). The 'delayed_job' gem's default SIGTERM handler calls stop(), which
+# exits the worker after the current job finishes - leaving the rest of the queue unprocessed.
+# This plugin replaces the gem's SIGTERM handler for local workers so that all remaining jobs in the
+# queue are worked off before the worker exits, ensuring no jobs are left dangling when draining.
+class LocalWorkerDrainPlugin < Delayed::Plugin
+  callbacks do |lifecycle|
+    lifecycle.before(:execute) do |worker|
+      # Only a worker dedicated to exactly one local queue gets the draining handler.
+      next unless worker.class.queues.length == 1 && VCAP::CloudController::Jobs::Queues.local?(worker.class.queues.first)
+
+      trap('TERM') do
+        # Log from a separate thread rather than directly inside the signal handler.
+        Thread.new { worker.say 'Exiting...' }
+        # Do not stop the worker here; flag it to exit once the queue has been worked off.
+        worker.class.exit_on_complete = true
+      end
+    end
+  end
+end
+
+Delayed::Worker.plugins << LocalWorkerDrainPlugin
diff --git a/lib/tasks/jobs.rake b/lib/tasks/jobs.rake
index df59d9a48eb..c408bacec2e 100644
--- a/lib/tasks/jobs.rake
+++ b/lib/tasks/jobs.rake
@@ -1,4 +1,5 @@
 require 'delayed_job/quit_trap'
+require 'delayed_job/local_worker_drain_plugin'
 require 'delayed_job/delayed_worker'
 
 namespace :jobs do
diff --git a/spec/unit/jobs/queues_spec.rb b/spec/unit/jobs/queues_spec.rb
index ded3e5c6c7e..cafe2e34ab0 100644
--- a/spec/unit/jobs/queues_spec.rb
+++ b/spec/unit/jobs/queues_spec.rb
@@ -38,6 +38,31 @@ module Jobs
           expect(Queues.generic).to eq('cc-generic')
         end
       end
+
+      describe '.local?' do
+        it 'returns true for a local queue name' do
+          expect(Queues.local?('cc-some-host')).to be(true)
+        end
+
+        it 'returns true for a local queue name with index' do
+          expect(Queues.local?('cc-cloud_controller_ng-0')).to be(true)
+        end
+
+        it 'returns false for cc-generic' do
+          expect(Queues.local?('cc-generic')).to be(false)
+        end
+
+        it 'returns false for named clock queues' do
+          expect(Queues.local?('app_usage_events')).to be(false)
+          expect(Queues.local?('pending_builds')).to be(false)
+        end
+
+        it 'treats symbol names like strings and nil as not local' do
+          expect(Queues.local?(:'cc-some-host')).to be(true)
+          expect(Queues.local?(:'cc-generic')).to be(false)
+          expect(Queues.local?(nil)).to be(false)
+        end
+      end
     end
   end
 end
diff --git a/spec/unit/lib/delayed_job/local_worker_drain_plugin_spec.rb b/spec/unit/lib/delayed_job/local_worker_drain_plugin_spec.rb
new file mode 100644
index 00000000000..42f396c99e5
--- /dev/null
+++ b/spec/unit/lib/delayed_job/local_worker_drain_plugin_spec.rb
@@ -0,0 +1,87 @@
+require 'spec_helper'
+require 'delayed_job/local_worker_drain_plugin'
+require 'jobs/queues'
+
+RSpec.describe LocalWorkerDrainPlugin do
+  let(:worker) { Delayed::Worker.new }
+
+  before do
+    @original_queues = Delayed::Worker.queues
+    Delayed::Worker.exit_on_complete = false
+    allow(worker).to receive(:reload!)
+    allow(worker).to receive(:say)
+  end
+
+  after do
+    Delayed::Worker.queues = @original_queues
+    Delayed::Worker.exit_on_complete = false
+  end
+
+  describe 'TERM signal handling' do
+    context 'when the worker is processing a local queue' do
+      # This example runs worker.start unstubbed, which installs real process-wide signal handlers.
+      # Remember the previous handlers and reinstate them so they do not leak into later examples.
+      around do |example|
+        previous_handlers = %w[TERM INT QUIT].to_h { |signal| [signal, Signal.trap(signal, 'DEFAULT')] }
+        example.run
+      ensure
+        previous_handlers&.each { |signal, handler| Signal.trap(signal, handler || 'DEFAULT') }
+      end
+
+      before do
+        Delayed::Worker.queues = ['cc-api_worker.cloud_controller_ng.0.1']
+        Delayed::Worker.sleep_delay = 0
+      end
+
+      after { Delayed::Worker.sleep_delay = Delayed::Worker::DEFAULT_SLEEP_DELAY }
+
+      it 'works off all remaining jobs in the queue before exiting' do
+        work_off_calls = Queue.new
+        allow(worker).to receive(:work_off) do
+          work_off_calls.push(:called)
+          work_off_calls.size < 3 ? [1, 0] : [0, 0]
+        end
+
+        worker_thread = Thread.new { worker.start }
+        work_off_calls.pop # wait until worker has started
+        Process.kill('TERM', Process.pid)
+        worker_thread.join(5)
+
+        expect(worker_thread.alive?).to be(false)
+        expect(work_off_calls.size).to eq(3)
+      end
+    end
+
+    context 'when the worker is processing the generic queue' do
+      before do
+        # Stub the worker's own trap calls so no real handlers are installed; the TERM stub runs its block at once.
+        Delayed::Worker.queues = ['cc-generic']
+        allow(worker).to receive(:trap).with('QUIT')
+        allow(worker).to receive(:trap).with('INT')
+        allow(worker).to receive(:trap).with('TERM').and_yield
+        allow(worker).to receive_messages(work_off: [0, 0], stop?: true)
+      end
+
+      it 'does not set exit_on_complete' do
+        worker.start
+        expect(Delayed::Worker.exit_on_complete).to be(false)
+      end
+    end
+
+    context 'when the worker is processing a named clock queue' do
+      before do
+        # Stub the worker's own trap calls so no real handlers are installed; the TERM stub runs its block at once.
+        Delayed::Worker.queues = ['app_usage_events']
+        allow(worker).to receive(:trap).with('QUIT')
+        allow(worker).to receive(:trap).with('INT')
+        allow(worker).to receive(:trap).with('TERM').and_yield
+        allow(worker).to receive_messages(work_off: [0, 0], stop?: true)
+      end
+
+      it 'does not set exit_on_complete' do
+        worker.start
+        expect(Delayed::Worker.exit_on_complete).to be(false)
+      end
+    end
+  end
+end