Skip to content

Commit

Permalink
Introduce poll-interval-variance option
Browse files Browse the repository at this point in the history
  • Loading branch information
tomgi committed Oct 28, 2024
1 parent 51403c0 commit 5e50c41
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 63 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ group :test do

gem 'pry'
gem 'pg_examiner', '~> 0.5.2'

gem 'timecop', '~> 0.9.10'
end

gemspec
29 changes: 20 additions & 9 deletions lib/que/command_line_interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ def parse(
default_require_file: RAILS_ENVIRONMENT_FILE
)

options = {}
queues = []
log_level = 'info'
log_internals = false
poll_interval = 5
connection_url = nil
worker_count = nil
worker_priorities = nil
options = {}
queues = []
log_level = 'info'
log_internals = false
poll_interval = 5
poll_interval_variance = 0
connection_url = nil
worker_count = nil
worker_priorities = nil

parser =
OptionParser.new do |opts|
Expand All @@ -50,6 +51,15 @@ def parse(
poll_interval = i
end

opts.on(
'-j',
'--poll-interval-variance [INTERVAL]',
Float,
"Set maximum variance in poll interval, in seconds (default: 0)",
) do |j|
poll_interval_variance = j.to_f
end

opts.on(
'--listen [LISTEN]',
String,
Expand Down Expand Up @@ -232,7 +242,8 @@ def parse(
options[:queues] = queues_hash
end

options[:poll_interval] = poll_interval
options[:poll_interval] = poll_interval
options[:poll_interval_variance] = poll_interval_variance

locker =
begin
Expand Down
56 changes: 31 additions & 25 deletions lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class << self
}

class Locker
attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval
attr_reader :thread, :workers, :job_buffer, :locks, :queues, :poll_interval, :poll_interval_variance

MESSAGE_RESOLVERS = {}
RESULT_RESOLVERS = {}
Expand All @@ -47,22 +47,24 @@ class Locker
RESULT_RESOLVERS[:job_finished] =
-> (messages) { finish_jobs(messages.map{|m| m.fetch(:metajob)}) }

DEFAULT_POLL_INTERVAL = 5.0
DEFAULT_WAIT_PERIOD = 50
DEFAULT_MAXIMUM_BUFFER_SIZE = 8
DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze
DEFAULT_POLL_INTERVAL = 5.0
DEFAULT_POLL_INTERVAL_VARIANCE = 0.0
DEFAULT_WAIT_PERIOD = 50
DEFAULT_MAXIMUM_BUFFER_SIZE = 8
DEFAULT_WORKER_PRIORITIES = [10, 30, 50, nil, nil, nil].freeze

def initialize(
queues: [Que.default_queue],
connection_url: nil,
listen: true,
poll: true,
poll_interval: DEFAULT_POLL_INTERVAL,
wait_period: DEFAULT_WAIT_PERIOD,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
worker_priorities: DEFAULT_WORKER_PRIORITIES,
on_worker_start: nil,
pidfile: nil
queues: [Que.default_queue],
connection_url: nil,
listen: true,
poll: true,
poll_interval: DEFAULT_POLL_INTERVAL,
poll_interval_variance: DEFAULT_POLL_INTERVAL_VARIANCE,
wait_period: DEFAULT_WAIT_PERIOD,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
worker_priorities: DEFAULT_WORKER_PRIORITIES,
on_worker_start: nil,
pidfile: nil
)

# Sanity-check all our arguments, since some users may instantiate Locker
Expand All @@ -71,6 +73,7 @@ def initialize(
Que.assert [TrueClass, FalseClass], poll

Que.assert Numeric, poll_interval
Que.assert Numeric, poll_interval_variance
Que.assert Numeric, wait_period

Que.assert Array, worker_priorities
Expand All @@ -94,20 +97,22 @@ def initialize(

Que.internal_log :locker_instantiate, self do
{
queues: queues,
listen: listen,
poll: poll,
poll_interval: poll_interval,
wait_period: wait_period,
maximum_buffer_size: maximum_buffer_size,
worker_priorities: worker_priorities,
queues: queues,
listen: listen,
poll: poll,
poll_interval: poll_interval,
poll_interval_variance: poll_interval_variance,
wait_period: wait_period,
maximum_buffer_size: maximum_buffer_size,
worker_priorities: worker_priorities,
}
end

# Local cache of which advisory locks are held by this connection.
@locks = Set.new

@poll_interval = poll_interval
@poll_interval_variance = poll_interval_variance

if queues.is_a?(Hash)
@queue_names = queues.keys
Expand Down Expand Up @@ -204,9 +209,10 @@ def initialize(
if poll
@queues.map do |queue_name, interval|
Poller.new(
connection: @connection,
queue: queue_name,
poll_interval: interval,
connection: @connection,
queue: queue_name,
poll_interval: interval,
poll_interval_variance: poll_interval_variance,
)
end
end
Expand Down
50 changes: 31 additions & 19 deletions lib/que/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,25 +116,32 @@ class Poller
:connection,
:queue,
:poll_interval,
:poll_interval_variance,
:last_polled_at,
:last_poll_satisfied
:last_poll_satisfied,
:next_poll_at

def initialize(
connection:,
queue:,
poll_interval:
poll_interval:,
poll_interval_variance:
)
@connection = connection
@queue = queue
@poll_interval = poll_interval
@connection = connection
@queue = queue
@poll_interval = poll_interval
@poll_interval_variance = poll_interval_variance

@last_polled_at = nil
@last_poll_satisfied = nil
@next_poll_at = Time.now

Que.internal_log :poller_instantiate, self do
{
backend_pid: connection.backend_pid,
queue: queue,
poll_interval: poll_interval,
backend_pid: connection.backend_pid,
queue: queue,
poll_interval: poll_interval,
poll_interval_variance: poll_interval_variance,
}
end
end
Expand All @@ -158,31 +165,36 @@ def poll(

@last_polled_at = Time.now
@last_poll_satisfied = poll_satisfied?(priorities, jobs)
@next_poll_at = last_polled_at +
poll_interval +
rand(-poll_interval_variance..poll_interval_variance)

Que.internal_log :poller_polled, self do
{
queue: @queue,
locked: jobs.count,
priorities: priorities,
held_locks: held_locks.to_a,
newly_locked: jobs.map { |key| key.fetch(:id) },
queue: @queue,
locked: jobs.count,
priorities: priorities,
held_locks: held_locks.to_a,
newly_locked: jobs.map { |key| key.fetch(:id) },
last_polled_at: last_polled_at,
last_poll_satisfied: last_poll_satisfied,
next_poll_at: next_poll_at,
}
end

jobs.map! { |job| Metajob.new(job) }
end

def should_poll?
# polling is disabled for this queue
return false if poll_interval.nil?

# Never polled before?
last_poll_satisfied.nil? ||
# Plenty of jobs were available last time?
last_poll_satisfied == true ||
poll_interval_elapsed?
end

def poll_interval_elapsed?
return unless interval = poll_interval
(Time.now - last_polled_at) > interval
# It's due time to poll again regardless of the last poll's results?
next_poll_at < Time.now
end

class << self
Expand Down
1 change: 1 addition & 0 deletions spec/gemfiles/Gemfile-rails-6.0
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ group :test do
gem 'minitest-hooks', '1.4.0'
gem 'pry'
gem 'pg_examiner', '~> 0.5.2'
gem 'timecop', '~> 0.9.10'
end
1 change: 1 addition & 0 deletions spec/gemfiles/Gemfile-rails-6.1
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ group :test do
gem 'minitest-hooks', '1.4.0'
gem 'pry'
gem 'pg_examiner', '~> 0.5.2'
gem 'timecop', '~> 0.9.10'
end
1 change: 1 addition & 0 deletions spec/gemfiles/Gemfile-rails-7.0
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ group :test do
gem 'minitest-hooks', '1.4.0'
gem 'pry'
gem 'pg_examiner', '~> 0.5.2'
gem 'timecop', '~> 0.9.10'
end
1 change: 1 addition & 0 deletions spec/gemfiles/Gemfile-rails-7.1
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ group :test do
gem 'minitest-hooks', '1.4.0'
gem 'pry'
gem 'pg_examiner', '~> 0.5.2'
gem 'timecop', '~> 0.9.10'
end
24 changes: 17 additions & 7 deletions spec/que/command_line_interface_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def write_file
def assert_locker_instantiated(
worker_priorities: [10, 30, 50, nil, nil, nil],
poll_interval: 5,
poll_interval_variance: 0.0,
listen: true,
wait_period: 50,
queues: ['default'],
Expand All @@ -199,13 +200,14 @@ def assert_locker_instantiated(

locker_instantiate = locker_instantiates.first

assert_equal listen, locker_instantiate[:listen]
assert_equal true, locker_instantiate[:poll]
assert_equal queues, locker_instantiate[:queues]
assert_equal poll_interval, locker_instantiate[:poll_interval]
assert_equal wait_period, locker_instantiate[:wait_period]
assert_equal maximum_buffer_size, locker_instantiate[:maximum_buffer_size]
assert_equal worker_priorities, locker_instantiate[:worker_priorities]
assert_equal listen, locker_instantiate[:listen]
assert_equal true, locker_instantiate[:poll]
assert_equal queues, locker_instantiate[:queues]
assert_equal poll_interval, locker_instantiate[:poll_interval]
assert_equal poll_interval_variance, locker_instantiate[:poll_interval_variance]
assert_equal wait_period, locker_instantiate[:wait_period]
assert_equal maximum_buffer_size, locker_instantiate[:maximum_buffer_size]
assert_equal worker_priorities, locker_instantiate[:worker_priorities]
end

def assert_locker_started(
Expand Down Expand Up @@ -258,6 +260,14 @@ def assert_locker_started(
end
end

["-j", "--poll-interval-variance"].each do |command|
it "with #{command} to configure the poll interval variance" do
assert_successful_invocation "./#{filename} #{command} 5"
assert_locker_instantiated(poll_interval_variance: 5)
assert_locker_started
end
end

it "with --listen false to disable listen mode" do
assert_successful_invocation "./#{filename} --listen false"
assert_locker_instantiated(listen: false)
Expand Down
9 changes: 6 additions & 3 deletions spec/que/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def poll(
connection: override_connection || connection,
queue: queue_name,
poll_interval: 5,
poll_interval_variance: 0,
)

Que::Poller.setup(override_connection || connection)
Expand Down Expand Up @@ -245,6 +246,7 @@ def assert_poll(priorities:, locked:)
connection: connection,
queue: 'default',
poll_interval: 5,
poll_interval_variance: 0,
)
end

Expand Down Expand Up @@ -275,14 +277,15 @@ def assert_poll(priorities:, locked:)
assert_equal false, poller.should_poll?
end

it "should be false if the number of jobs returned from the last poll was less than the lowest priority request, but the poll_interval has elapsed" do
it "should be true if the number of jobs returned from the last poll was less than the lowest priority request, but the poll_interval has elapsed" do
job_ids = 5.times.map { Que::Job.enqueue.que_attrs[:id] }

result = poller.poll(priorities: { 500 => 7 }, held_locks: Set.new)
assert_equal job_ids, result.map(&:id)

poller.instance_variable_set(:@last_polled_at, Time.now - 30)
assert_equal true, poller.should_poll?
Timecop.freeze(Time.now + 30) do
assert_equal true, poller.should_poll?
end
end
end
end
4 changes: 4 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
require 'minitest/hooks'
require 'minitest/profile'

# "time travel" capabilities.
require 'timecop'
Timecop.safe_mode = true

# Other support stuff.
Dir['./spec/support/**/*.rb'].sort.each(&method(:require))

Expand Down

0 comments on commit 5e50c41

Please sign in to comment.