Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/jobs/queues.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ def self.local(config)
# Returns the name of the generic queue ('cc-generic').
def self.generic
'cc-generic'
end

# Tells whether +queue_name+ names a local queue: any name that starts with
# 'cc-' except the generic queue 'cc-generic'.
#
# The argument is normalised with +to_s+ before comparing, so:
# * +nil+ yields false instead of raising NoMethodError, and
# * a Symbol is compared by its text (otherwise :'cc-generic' would never
#   equal the String 'cc-generic' and would wrongly count as local).
#
# @param queue_name [String, Symbol, nil] the queue name to classify
# @return [Boolean] true only for 'cc-'-prefixed names other than 'cc-generic'
def self.local?(queue_name)
  name = queue_name.to_s
  name.start_with?('cc-') && name != 'cc-generic'
end
end
end
end
21 changes: 21 additions & 0 deletions lib/delayed_job/local_worker_drain_plugin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
require 'jobs/queues'

# Local workers process jobs that require access to local filesystem resources (e.g. buildpack, droplet,
# and package uploads via nginx). The 'delayed_job' gem's default SIGTERM handler calls stop(), which
# exits the worker after the current job finishes - leaving the rest of the queue unprocessed.
# This plugin replaces the gem's SIGTERM handler for local workers so that all remaining jobs in the
# queue are worked off before the worker exits, ensuring no jobs are left dangling when draining.
class LocalWorkerDrainPlugin < Delayed::Plugin
  callbacks do |lifecycle|
    lifecycle.before(:execute) do |worker|
      assigned_queues = worker.class.queues
      # Only a worker bound to exactly one queue, and only when that queue is a
      # local one, gets the draining TERM handler; every other worker is left alone.
      sole_local_queue = assigned_queues.length == 1 &&
                         VCAP::CloudController::Jobs::Queues.local?(assigned_queues.first)
      next unless sole_local_queue

      trap('TERM') do
        # The message is emitted from a separate thread rather than inline in the
        # handler (presumably because logging is not safe inside a trap context).
        Thread.new { worker.say 'Exiting...' }
        # Ask the worker to exit once its queue is complete instead of stopping
        # right after the current job.
        worker.class.exit_on_complete = true
      end
    end
  end
end

# Append the plugin to Delayed::Worker's plugin list. This runs at load time,
# so requiring this file is what registers the plugin.
Delayed::Worker.plugins << LocalWorkerDrainPlugin
1 change: 1 addition & 0 deletions lib/tasks/jobs.rake
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'delayed_job/quit_trap'
require 'delayed_job/local_worker_drain_plugin'
require 'delayed_job/delayed_worker'

namespace :jobs do
Expand Down
19 changes: 19 additions & 0 deletions spec/unit/jobs/queues_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,25 @@ module Jobs
expect(Queues.generic).to eq('cc-generic')
end
end

describe '.local?' do
  it 'returns true for a local queue name' do
    outcome = Queues.local?('cc-some-host')

    expect(outcome).to be(true)
  end

  it 'returns true for a local queue name with index' do
    outcome = Queues.local?('cc-cloud_controller_ng-0')

    expect(outcome).to be(true)
  end

  it 'returns false for cc-generic' do
    outcome = Queues.local?('cc-generic')

    expect(outcome).to be(false)
  end

  it 'returns false for named clock queues' do
    # Queue names without the 'cc-' prefix must never be treated as local.
    %w[app_usage_events pending_builds].each do |clock_queue|
      expect(Queues.local?(clock_queue)).to be(false)
    end
  end
end
end
end
end
76 changes: 76 additions & 0 deletions spec/unit/lib/delayed_job/local_worker_drain_plugin_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
require 'spec_helper'
require 'delayed_job/local_worker_drain_plugin'
require 'jobs/queues'

# Verifies that LocalWorkerDrainPlugin changes TERM handling only for workers
# that serve a single local queue, and leaves generic / clock-queue workers alone.
RSpec.describe LocalWorkerDrainPlugin do
  let(:worker) { Delayed::Worker.new }

  before do
    # Remember the global queue configuration so it can be restored afterwards.
    @original_queues = Delayed::Worker.queues
    Delayed::Worker.exit_on_complete = false
    allow(worker).to receive(:reload!)
    allow(worker).to receive(:say)
  end

  after do
    Delayed::Worker.queues = @original_queues
    Delayed::Worker.exit_on_complete = false
  end

  describe 'TERM signal handling' do
    context 'when the worker is processing a local queue' do
      # This context runs worker.start without stubbing `trap`, so real,
      # process-wide signal handlers get installed inside the RSpec process
      # (the plugin's TERM handler, and presumably the worker's own QUIT/INT/TERM
      # handlers - the sibling contexts stub exactly those three). Save the
      # handlers that were active before the example and put them back afterwards
      # so they do not leak into the rest of the suite.
      around do |example|
        saved_handlers = %w[QUIT INT TERM].to_h { |signal| [signal, Signal.trap(signal, 'DEFAULT')] }
        begin
          example.run
        ensure
          # Signal.trap may report nil for a handler it cannot describe; passing nil
          # back would mean "ignore the signal", so fall back to the default action.
          saved_handlers.each { |signal, handler| Signal.trap(signal, handler || 'DEFAULT') }
        end
      end

      before do
        Delayed::Worker.queues = ['cc-api_worker.cloud_controller_ng.0.1']
        Delayed::Worker.sleep_delay = 0
      end

      after { Delayed::Worker.sleep_delay = Delayed::Worker::DEFAULT_SLEEP_DELAY }

      it 'works off all remaining jobs in the queue before exiting' do
        # Thread-safe queue used both to count work_off invocations and to let the
        # example block until the worker loop is running.
        work_off_calls = Queue.new
        allow(worker).to receive(:work_off) do
          work_off_calls.push(:called)
          # Report "one job succeeded" until three calls are queued, then "nothing left".
          work_off_calls.size < 3 ? [1, 0] : [0, 0]
        end

        worker_thread = Thread.new { worker.start }
        work_off_calls.pop # wait until worker has started
        Process.kill('TERM', Process.pid)
        worker_thread.join(5)

        expect(worker_thread.alive?).to be(false)
        expect(work_off_calls.size).to eq(3)
      end
    end

    context 'when the worker is processing the generic queue' do
      before do
        Delayed::Worker.queues = ['cc-generic']
        # Stub the worker's own trap calls; the TERM handler is yielded immediately
        # so its effect can be observed without sending a real signal.
        allow(worker).to receive(:trap).with('QUIT')
        allow(worker).to receive(:trap).with('INT')
        allow(worker).to receive(:trap).with('TERM').and_yield
        allow(worker).to receive_messages(work_off: [0, 0], stop?: true)
      end

      it 'does not set exit_on_complete' do
        worker.start
        expect(Delayed::Worker.exit_on_complete).to be(false)
      end
    end

    context 'when the worker is processing a named clock queue' do
      before do
        Delayed::Worker.queues = ['app_usage_events']
        # Same stubbing as for the generic queue: no real handlers are installed.
        allow(worker).to receive(:trap).with('QUIT')
        allow(worker).to receive(:trap).with('INT')
        allow(worker).to receive(:trap).with('TERM').and_yield
        allow(worker).to receive_messages(work_off: [0, 0], stop?: true)
      end

      it 'does not set exit_on_complete' do
        worker.start
        expect(Delayed::Worker.exit_on_complete).to be(false)
      end
    end
  end
end
Loading