Skip to content

Commit

Permalink
Reflections (#611)
Browse files Browse the repository at this point in the history
* Replace logging with reflections

Everyone logs differently, and everything is different for everyone. There for, give the power back to the people and return the job item that has all the valuable information.

This will also allow people to extract metrics and such

* Add reflections to README

* Adds more reflections

* Adds test coverage

* Mandatory rubocop commit
  • Loading branch information
mhenrixon authored Jun 27, 2021
1 parent f4f4ce3 commit 8c8d54c
Show file tree
Hide file tree
Showing 36 changed files with 631 additions and 259 deletions.
3 changes: 3 additions & 0 deletions .reek.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ detectors:
exclude:
- Sidekiq::JobSet::UniqueExtension#delete_by_value
- Sidekiq::ScheduledSet::UniqueExtension#delete
- SidekiqUniqueJobs::Lock::BaseLock#call_strategy
- SidekiqUniqueJobs::Orphans::Manager#start
- SidekiqUniqueJobs::Orphans::RubyReaper#active?
- SidekiqUniqueJobs::Redis::Hash#entries
Expand All @@ -31,6 +32,7 @@ detectors:
exclude:
- Sidekiq#self.use_options
- SidekiqUniqueJobs#toggle
- SidekiqUniqueJobs::Deprecation#self.muted
- SidekiqUniqueJobs::Lock#del
- SidekiqUniqueJobs::Lock#lock
- SidekiqUniqueJobs::Lock::WhileExecuting#execute
Expand Down Expand Up @@ -125,6 +127,7 @@ detectors:
- SidekiqUniqueJobs::Digests#entries
- SidekiqUniqueJobs::Digests#page
- SidekiqUniqueJobs::Lock#lock
- SidekiqUniqueJobs::Lock::BaseLock#call_strategy
- SidekiqUniqueJobs::Lock::WhileExecuting#execute
- SidekiqUniqueJobs::LockArgs#filtered_args
- SidekiqUniqueJobs::LockInfo#set
Expand Down
117 changes: 92 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@
- [Support Me](#support-me)
- [Requirements](#requirements)
- [General Information](#general-information)
- [Reflections \(metrics, logging, etc.\)](#reflections-metrics-logging-etc)
- [after_unlock_callback_failed](#after_unlock_callback_failed)
- [error](#error)
- [execution_failed](#execution_failed)
- [lock_failed](#lock_failed)
- [locked](#locked)
- [reschedule_failed](#reschedule_failed)
- [rescheduled](#rescheduled)
- [timeout](#timeout)
- [unlock_failed](#unlock_failed)
- [unlocked](#unlocked)
- [unknown_sidekiq_worker](#unknown_sidekiq_worker)
- [Global Configuration](#global-configuration)
- [debug_lua](#debug_lua)
- [lock_timeout](#lock_timeout)
Expand Down Expand Up @@ -49,7 +61,6 @@
- [Usage](#usage-1)
- [Finer Control over Uniqueness](#finer-control-over-uniqueness)
- [After Unlock Callback](#after-unlock-callback)
- [Logging](#logging)
- [Cleanup Dead Locks](#cleanup-dead-locks)
- [Other Sidekiq gems](#other-sidekiq-gems)
- [apartment-sidekiq](#apartment-sidekiq)
Expand Down Expand Up @@ -130,28 +141,23 @@ end

### Your first worker

The most likely to be used worker is `:until_executed`. This type of lock creates a lock from when `UntilExecutedWorker.perform_async` is called until the the sidekiq processor has processed the job.

```ruby
# frozen_string_literal: true

class UntilExecutedWorker
include Sidekiq::Worker

sidekiq_options queue: :special,
retry: false,
lock: :until_executed,
lock_info: true,
lock_timeout: 0,
lock_prefix: "special",
lock_ttl: 0,
lock_limit: 5
sidekiq_options queue: :until_executed,
lock: :until_executed

def perform
logger.info("cowboy")
sleep(1) # hardcore processing
logger.info("beebop")
end
end

```

You can read more about the worker configuration in [Worker Configuration](#worker-configuration) below.
Expand Down Expand Up @@ -179,6 +185,82 @@ See [Interaction w/ Sidekiq](https://github.com/mhenrixon/sidekiq-unique-jobs/wi

See [Locking & Unlocking](https://github.com/mhenrixon/sidekiq-unique-jobs/wiki/Locking-&-Unlocking) for an overview of the differences on when the various lock types are locked and unlocked.

## Reflections (metrics, logging, etc.)

To be able to gather some insights on what is going on inside this gem. I provide a reflection API that can be used.

To setup reflections for logging or metrics, use the following API:

```ruby

def extract_log_from_job(message, item)
worker = item['class']
args = item['args']
lock_args = item['lock_args']
queue = item['queue']
{
message: message,
worker: worker,
args: args,
lock_args: lock_args,
queue: queue
}
end

SidekiqUniqueJobs.reflect do |on|
on.lock_failed do |item|
message = extract_log_from_job('Lock Failed', item)
Sidekiq.logger.warn(message)
end
end
```

### after_unlock_callback_failed

This is called when you have configured a custom callback for when a lock has been released.

### error

Not in use yet but will be used deep into the stack to provide a means to catch and report errors inside the gem.

### execution_failed

When the sidekiq processor picks the job of the queue for certain jobs but your job raised an error to the middleware. This will be the reflection. It is probably nothing to worry about. When your worker raises an error, we need to handle some edge cases for until and while executing.

### lock_failed

If we can't achieve a lock, this will be the reflection. It most likely is nothing to worry about. We just couldn't retrieve a lock in a timely fashion.

The biggest reason for this reflection would be to gather metrics on which workers fail the most at the locking step for example.

### locked

For when a lock has been successful. Again, mostly useful for metrics I suppose.

### reschedule_failed

For when the reschedule strategy failed to reschedule the job.

### rescheduled

For when a job was successfully rescheduled

### timeout

This is also mostly useful for reporting/metrics purposes. What this reflection does is signal that the job was configured to wait (`lock_timeout` was configured), but we couldn't retrieve a lock even though we waited for some time.

### unlock_failed

This is not got, this is worth

### unlocked

Also mostly useful for reporting purposes. The job was successfully unlocked.

### unknown_sidekiq_worker

The reason this happens is that the server couldn't find a valid sidekiq worker class. Most likely, that worker isn't intended to be processed by this sidekiq server instance.

## Global Configuration

The gem supports a few different configuration options that might be of interest if you run into some weird issues.
Expand Down Expand Up @@ -683,21 +765,6 @@ class UniqueJobWithFilterMethod
end.
```

### Logging

To see logging in sidekiq when duplicate payload has been filtered out you can enable on a per worker basis using the sidekiq options. The default value is false

```ruby
class UniqueJobWithFilterMethod
include Sidekiq::Worker
sidekiq_options lock: :while_executing,
log_duplicate: true

...

end
```

### Cleanup Dead Locks

For sidekiq versions before 5.1 a `sidekiq_retries_exhausted` block is required per worker class. This is deprecated in Sidekiq 6.0
Expand Down
3 changes: 3 additions & 0 deletions lib/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
require "pathname"
require "sidekiq"

require "sidekiq_unique_jobs/deprecation"
require "sidekiq_unique_jobs/reflections"
require "sidekiq_unique_jobs/reflectable"
require "sidekiq_unique_jobs/timer_task"
require "sidekiq_unique_jobs/version"
require "sidekiq_unique_jobs/version_check"
Expand Down
1 change: 0 additions & 1 deletion lib/sidekiq_unique_jobs/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ module SidekiqUniqueJobs
LOCK_TIMEOUT ||= "lock_timeout"
LOCK_TTL ||= "lock_ttl"
LOCK_TYPE ||= "lock_type"
LOG_DUPLICATE ||= "log_duplicate"
ON_CLIENT_CONFLICT ||= "on_client_conflict"
ON_CONFLICT ||= "on_conflict"
ON_SERVER_CONFLICT ||= "on_server_conflict"
Expand Down
35 changes: 35 additions & 0 deletions lib/sidekiq_unique_jobs/deprecation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
#
# Class Deprecation provides logging of deprecations
#
# @author Mikael Henriksson <mikael@mhenrixon.com>
#
class Deprecation
def self.muted
orig_val = Thread.current[:uniquejobs_mute_deprecations]
Thread.current[:uniquejobs_mute_deprecations] = true
yield
ensure
Thread.current[:uniquejobs_mute_deprecations] = orig_val
end

def self.muted?
Thread.current[:uniquejobs_mute_deprecations] == true
end

def self.warn(msg)
return if SidekiqUniqueJobs::Deprecation.muted?

warn "DEPRECATION WARNING: #{msg}"
end

def self.warn_with_backtrace(msg)
return if SidekiqUniqueJobs::Deprecation.muted?

trace = "\n\nCALLED FROM:\n#{caller.join("\n")}"
warn(msg + trace)
end
end
end
9 changes: 9 additions & 0 deletions lib/sidekiq_unique_jobs/exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ def initialize(item)
end
end

#
# Raised when no block was given
#
class NoBlockGiven < SidekiqUniqueJobs::UniqueJobsError; end
#
# Raised when a notification has been mistyped
#
class NoSuchNotificationError < UniqueJobsError; end

#
# Error raised when trying to add a duplicate lock
#
Expand Down
63 changes: 37 additions & 26 deletions lib/sidekiq_unique_jobs/lock/base_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ class Lock
# @abstract
# @author Mikael Henriksson <mikael@mhenrixon.com>
class BaseLock
# includes "SidekiqUniqueJobs::Logging"
# @!parse include SidekiqUniqueJobs::Logging
include SidekiqUniqueJobs::Logging

# includes "SidekiqUniqueJobs::Reflectable"
# @!parse include SidekiqUniqueJobs::Reflectable
include SidekiqUniqueJobs::Reflectable

#
# Validates that the sidekiq_options for the worker is valid
#
Expand Down Expand Up @@ -41,10 +47,8 @@ def initialize(item, callback, redis_pool = nil)
#
# @yield to the caller when given a block
#
def lock(&block)
return call_strategy unless (locked_token = locksmith.lock(&block))

locked_token
def lock
raise NotImplementedError, "##{__method__} needs to be implemented in #{self.class}"
end

# Execute the job in the Sidekiq server processor
Expand Down Expand Up @@ -90,23 +94,6 @@ def locksmith

private

def prepare_item
return if item.key?(LOCK_DIGEST)

# The below should only be done to ease testing
# in production this will be done by the middleware
SidekiqUniqueJobs::Job.prepare(item)
end

def call_strategy
@attempt += 1
client_strategy.call { lock if replace? }
end

def replace?
client_strategy.replace? && attempt < 2
end

# @!attribute [r] item
# @return [Hash<String, Object>] the Sidekiq job hash
attr_reader :item
Expand All @@ -123,18 +110,42 @@ def replace?
# @return [Integer] the current locking attempt
attr_reader :attempt

def unlock_with_callback
return log_warn("Might need to be unlocked manually", item) unless unlock
def prepare_item
return if item.key?(LOCK_DIGEST)

callback_safely
item[JID]
# The below should only be done to ease testing
# in production this will be done by the middleware
SidekiqUniqueJobs::Job.prepare(item)
end

def lock_failed
reflect(:lock_failed, item)
call_strategy(of: :client)
end

def call_strategy(of:) # rubocop:disable Naming/MethodParameterName
@attempt += 1

case of
when :client
client_strategy.call { lock if replace? }
when :server
server_strategy.call { lock if replace? }
else
raise SidekiqUniqueJobs::InvalidArgument,
"either `for: :server` or `for: :client` needs to be specified"
end
end

def replace?
client_strategy.replace? && attempt < 2
end

def callback_safely
callback&.call
item[JID]
rescue StandardError
log_warn("Unlocked successfully but the #after_unlock callback failed!", item)
reflect(:after_unlock_callback_failed, item)
raise
end

Expand Down
Loading

0 comments on commit 8c8d54c

Please sign in to comment.