From 0c6df7be4f195fbd0fc477dc89a43f1b4d4f950c Mon Sep 17 00:00:00 2001 From: Sven Krieger <37476281+svkrieger@users.noreply.github.com> Date: Tue, 30 Jun 2026 08:31:13 +0200 Subject: [PATCH] Drain local workers by working off remaining jobs on SIGTERM Local workers handle jobs that require local filesystem access (package, droplet, and buildpack uploads). When a local worker receives SIGTERM during drain, the default delayed_job handler calls stop(), which exits after the current job but leaves remaining queued jobs unprocessed until the worker restarts. This adds LocalWorkerDrainPlugin, which replaces the SIGTERM handler for local workers only. Instead of stopping, the worker sets exit_on_complete so it finishes all remaining jobs in the queue before exiting, avoiding dangling PROCESSING_UPLOAD/PENDING states across restarts. The plugin is a no-op for generic workers and named clock queues, which retain the default SIGTERM behaviour. --- app/jobs/queues.rb | 4 + lib/delayed_job/local_worker_drain_plugin.rb | 21 +++++ lib/tasks/jobs.rake | 1 + spec/unit/jobs/queues_spec.rb | 19 +++++ .../local_worker_drain_plugin_spec.rb | 76 +++++++++++++++++++ 5 files changed, 121 insertions(+) create mode 100644 lib/delayed_job/local_worker_drain_plugin.rb create mode 100644 spec/unit/lib/delayed_job/local_worker_drain_plugin_spec.rb diff --git a/app/jobs/queues.rb b/app/jobs/queues.rb index cc7df33c60f..2fa144fa1a3 100644 --- a/app/jobs/queues.rb +++ b/app/jobs/queues.rb @@ -12,6 +12,10 @@ def self.local(config) def self.generic 'cc-generic' end + + def self.local?(queue_name) + queue_name.start_with?('cc-') && queue_name != 'cc-generic' + end end end end diff --git a/lib/delayed_job/local_worker_drain_plugin.rb b/lib/delayed_job/local_worker_drain_plugin.rb new file mode 100644 index 00000000000..1b951cfcac5 --- /dev/null +++ b/lib/delayed_job/local_worker_drain_plugin.rb @@ -0,0 +1,21 @@ +require 'jobs/queues' + +# Local workers process jobs that require access to local filesystem resources (e.g. buildpack, droplet, +# and package uploads via nginx). The 'delayed_job' gem's default SIGTERM handler calls stop(), which +# exits the worker after the current job finishes - leaving the rest of the queue unprocessed. +# This plugin replaces the gem's SIGTERM handler for local workers so that all remaining jobs in the +# queue are worked off before the worker exits, ensuring no jobs are left dangling when draining. +class LocalWorkerDrainPlugin < Delayed::Plugin + callbacks do |lifecycle| + lifecycle.before(:execute) do |worker| + next unless worker.class.queues.length == 1 && VCAP::CloudController::Jobs::Queues.local?(worker.class.queues.first) + + trap('TERM') do + Thread.new { worker.say 'Exiting...' } + worker.class.exit_on_complete = true + end + end + end +end + +Delayed::Worker.plugins << LocalWorkerDrainPlugin diff --git a/lib/tasks/jobs.rake b/lib/tasks/jobs.rake index df59d9a48eb..c408bacec2e 100644 --- a/lib/tasks/jobs.rake +++ b/lib/tasks/jobs.rake @@ -1,4 +1,5 @@ require 'delayed_job/quit_trap' +require 'delayed_job/local_worker_drain_plugin' require 'delayed_job/delayed_worker' namespace :jobs do diff --git a/spec/unit/jobs/queues_spec.rb b/spec/unit/jobs/queues_spec.rb index ded3e5c6c7e..cafe2e34ab0 100644 --- a/spec/unit/jobs/queues_spec.rb +++ b/spec/unit/jobs/queues_spec.rb @@ -38,6 +38,25 @@ module Jobs expect(Queues.generic).to eq('cc-generic') end end + + describe '.local?' do + it 'returns true for a local queue name' do + expect(Queues.local?('cc-some-host')).to be(true) + end + + it 'returns true for a local queue name with index' do + expect(Queues.local?('cc-cloud_controller_ng-0')).to be(true) + end + + it 'returns false for cc-generic' do + expect(Queues.local?('cc-generic')).to be(false) + end + + it 'returns false for named clock queues' do + expect(Queues.local?('app_usage_events')).to be(false) + expect(Queues.local?('pending_builds')).to be(false) + end + end end end end diff --git a/spec/unit/lib/delayed_job/local_worker_drain_plugin_spec.rb b/spec/unit/lib/delayed_job/local_worker_drain_plugin_spec.rb new file mode 100644 index 00000000000..42f396c99e5 --- /dev/null +++ b/spec/unit/lib/delayed_job/local_worker_drain_plugin_spec.rb @@ -0,0 +1,76 @@ +require 'spec_helper' +require 'delayed_job/local_worker_drain_plugin' +require 'jobs/queues' + +RSpec.describe LocalWorkerDrainPlugin do + let(:worker) { Delayed::Worker.new } + + before do + @original_queues = Delayed::Worker.queues + Delayed::Worker.exit_on_complete = false + allow(worker).to receive(:reload!) + allow(worker).to receive(:say) + end + + after do + Delayed::Worker.queues = @original_queues + Delayed::Worker.exit_on_complete = false + end + + describe 'TERM signal handling' do + context 'when the worker is processing a local queue' do + before do + Delayed::Worker.queues = ['cc-api_worker.cloud_controller_ng.0.1'] + Delayed::Worker.sleep_delay = 0 + end + + after { Delayed::Worker.sleep_delay = Delayed::Worker::DEFAULT_SLEEP_DELAY } + + it 'works off all remaining jobs in the queue before exiting' do + work_off_calls = Queue.new + allow(worker).to receive(:work_off) do + work_off_calls.push(:called) + work_off_calls.size < 3 ? [1, 0] : [0, 0] + end + + worker_thread = Thread.new { worker.start } + work_off_calls.pop # wait until worker has started + Process.kill('TERM', Process.pid) + worker_thread.join(5) + + expect(worker_thread.alive?).to be(false) + expect(work_off_calls.size).to eq(3) + end + end + + context 'when the worker is processing the generic queue' do + before do + Delayed::Worker.queues = ['cc-generic'] + allow(worker).to receive(:trap).with('QUIT') + allow(worker).to receive(:trap).with('INT') + allow(worker).to receive(:trap).with('TERM').and_yield + allow(worker).to receive_messages(work_off: [0, 0], stop?: true) + end + + it 'does not set exit_on_complete' do + worker.start + expect(Delayed::Worker.exit_on_complete).to be(false) + end + end + + context 'when the worker is processing a named clock queue' do + before do + Delayed::Worker.queues = ['app_usage_events'] + allow(worker).to receive(:trap).with('QUIT') + allow(worker).to receive(:trap).with('INT') + allow(worker).to receive(:trap).with('TERM').and_yield + allow(worker).to receive_messages(work_off: [0, 0], stop?: true) + end + + it 'does not set exit_on_complete' do + worker.start + expect(Delayed::Worker.exit_on_complete).to be(false) + end + end + end +end