Skip to content

Commit

Permalink
Stop queuing up heartbeat threads
Browse files Browse the repository at this point in the history
Fix #338
  • Loading branch information
Pablo Cantero committed Mar 24, 2017
1 parent 075c866 commit ddc901a
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ def initialize(fetcher, polling_strategy)

@heartbeat = Concurrent::TimerTask.new(run_now: true,
execution_interval: HEARTBEAT_INTERVAL,
timeout_interval: 60) { dispatch }
timeout_interval: 60) { @pool.post { dispatch } if @dispatching.false? }

Concurrent::TimerTask.new(execution_interval: 1) do
Shoryuken.logger.info "Threads: #{Thread.list.size}"
end.execute

@pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count)
end
Expand Down Expand Up @@ -60,16 +64,19 @@ def processor_done(queue)

def dispatch
return if @done.true?
return unless @dispatching.make_true

return if ready.zero?
return unless (queue = @polling_strategy.next_queue)
begin
return unless @dispatching.make_true

return if ready.zero?
return unless (queue = @polling_strategy.next_queue)

logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }
logger.debug { "Ready: #{ready}, Busy: #{busy}, Active Queues: #{@polling_strategy.active_queues}" }

batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
ensure
@dispatching.make_false
batched_queue?(queue) ? dispatch_batch(queue) : dispatch_single_messages(queue)
ensure
@dispatching.make_false
end
end

def busy
Expand Down

0 comments on commit ddc901a

Please sign in to comment.