Skip to content

Commit

Permalink
TimerTask: kill the execution when it exceeds the task timeout and ad…
Browse files Browse the repository at this point in the history
…d timer_task timeout specs

Before this commit when a TimerTask task exceeded the timeout the job
kept running. That could lead to thread leaking. Related to ruby-concurrency#639

This PR changes the TimerTask executor to be a SingleThreadExecutor. So
whenever the task doesn't finish in time, the execution thread is killed
(in this case the pool is killed)

The spec ensures that this new behavior is working correctly. It fails
on master. If the main job isn't killed after the timeout, the
latch.wait(2) would raise, since the main task sleeps for 5 seconds.

Closes ruby-concurrency#639
  • Loading branch information
rubemz committed Jul 13, 2018
1 parent aedc0cd commit c772e76
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 19 deletions.
26 changes: 22 additions & 4 deletions lib/concurrent/timer_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ def ns_initialize(opts, &task)
self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
@run_now = opts[:now] || opts[:run_now]
@executor = Concurrent::SafeTaskExecutor.new(task)
@task = task
@executor = Concurrent::RubySingleThreadExecutor.new()
@running = Concurrent::AtomicBoolean.new(false)
@value = nil

Expand Down Expand Up @@ -309,13 +310,26 @@ def schedule_next_task(interval = execution_interval)
def execute_task(completion)
return nil unless @running.true?
ScheduledTask.execute(timeout_interval, args: [completion], &method(:timeout_task))
_success, value, reason = @executor.execute(self)
@thread_completed = Concurrent::Event.new
@value = @reason = nil

@executor.post do
begin
@value = @task.call(self)
rescue Exception => ex
@reason = ex
ensure
@thread_completed.set
end
end

@thread_completed.wait

if completion.try?
self.value = value
schedule_next_task
time = Time.now
observers.notify_observers do
[time, self.value, reason]
[time, self.value, @reason]
end
end
nil
Expand All @@ -325,6 +339,10 @@ def execute_task(completion)
def timeout_task(completion)
return unless @running.true?
if completion.try?
@executor.kill
@executor.wait_for_termination
@executor = Concurrent::RubySingleThreadExecutor.new()
@thread_completed.set
self.value = value
schedule_next_task
observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
Expand Down
56 changes: 41 additions & 15 deletions spec/concurrent/timer_task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@
module Concurrent

RSpec.describe TimerTask do
let(:observer) do
Class.new do
attr_reader :time
attr_reader :value
attr_reader :ex
attr_accessor :latch
define_method(:initialize) { @latch = CountDownLatch.new(1) }
define_method(:update) do |time, value, ex|
@time = time
@value = value
@ex = ex
@latch.count_down
end
end.new
end

context :dereferenceable do

Expand Down Expand Up @@ -212,26 +227,37 @@ def trigger_observable(observable)
expect(expected).to eq subject
subject.kill
end
end

context 'observation' do
context "timeout" do
it 'should not timeout' do
subject = TimerTask.new(execution: 0.1, timeout: 1) { sleep(0.5); 42 }
subject.add_observer(observer)
subject.execute
observer.latch.wait(2)
expect(observer.ex).to be_nil
expect(observer.value).to eq(42)
subject.kill
end

let(:observer) do
Class.new do
attr_reader :time
attr_reader :value
attr_reader :ex
attr_reader :latch
define_method(:initialize) { @latch = CountDownLatch.new(1) }
define_method(:update) do |time, value, ex|
@time = time
@value = value
@ex = ex
@latch.count_down
it 'times out and kills the current task' do
observer.latch = CountDownLatch.new(2)
subject = TimerTask.new(execution: 0.1, timeout: 0.5, run_now: true) do
sleep(5) if observer.latch.count == 2
42
end
end.new
subject.add_observer(observer)

subject.execute
observer.latch.wait(2)

expect(observer.ex).to be_nil
expect(observer.value).to eq(42)
subject.kill
end
end
end

context 'observation' do
it 'notifies all observers on success' do
subject = TimerTask.new(execution: 0.1) { 42 }
subject.add_observer(observer)
Expand Down

0 comments on commit c772e76

Please sign in to comment.