Skip to content

Commit

Permalink
Propagate trace through Concurrent::Promises.future (#1522)
Browse files Browse the repository at this point in the history
* Propagate trace through Concurrent::Promises.future

Argument passing is preserved through the composited executor.

Concurrent::Future has been moved to a prepend style to resolve an
infinite call loop situation.

* Use stub_const+dup to have a temporary inheritance chain

This allows a rollback of the effects of prepend upon the original
module being restored after each test.

* updating propogate traces through promises tests to work with 1.x

* Update context_composite_executor_service post method to use continue_trace method for thread safety

---------

Co-authored-by: Edmund Kump <edmund.kump@datadoghq.com>
  • Loading branch information
lloeki and ekump authored Nov 1, 2023
1 parent f47324f commit f3e4f7e
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Datadog
module Tracing
module Contrib
module ConcurrentRuby
# wraps existing executor to carry over trace context
# Wraps existing executor to carry over trace context
class ContextCompositeExecutorService
include Concurrent::ExecutorService

Expand All @@ -16,20 +16,20 @@ def initialize(composited_executor)
@composited_executor = composited_executor
end

# post method runs the task within composited executor - in a different thread
# post method runs the task within composited executor - in a different thread. The original arguments are
# captured to be propagated to the composited executor post method
def post(*args, &task)
tracer = Tracing.send(:tracer)
parent_context = tracer.provider.context

@composited_executor.post(*args) do
begin
original_context = tracer.provider.context
tracer.provider.context = parent_context
yield
ensure
# Restore context in case the current thread gets reused
tracer.provider.context = original_context
end
digest = Tracing.active_trace.to_digest
executor = @composited_executor.is_a?(Symbol) ? Concurrent.executor(@composited_executor) : @composited_executor

# Pass the original arguments to the composited executor, which
# pushes them (possibly transformed) as block args
executor.post(*args) do |*block_args|
Tracing.continue_trace!(digest)

# Pass the executor-provided block args as they should have been
# originally passed without composition, see ChainPromise#on_resolvable
yield(*block_args)
end
end

Expand Down
13 changes: 3 additions & 10 deletions lib/datadog/tracing/contrib/concurrent_ruby/future_patch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,10 @@ module Contrib
module ConcurrentRuby
# This patches the Future - to wrap executor service using ContextCompositeExecutorService
module FuturePatch
def self.included(base)
base.class_eval do
alias_method :ns_initialize_without_datadog, :ns_initialize
remove_method(:ns_initialize)
def ns_initialize(value, opts)
super(value, opts)

def ns_initialize(value, opts)
ns_initialize_without_datadog(value, opts)

@executor = ContextCompositeExecutorService.new(@executor)
end
end
@executor = ContextCompositeExecutorService.new(@executor)
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion lib/datadog/tracing/contrib/concurrent_ruby/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def self.version
end

def self.loaded?
!defined?(::Concurrent::Future).nil?
# Concurrent::Future is deprecated in favour of Concurrent::Promises::Future
!defined?(::Concurrent::Promises::Future).nil? || !defined?(::Concurrent::Future).nil?
end

def self.compatible?
Expand Down
9 changes: 8 additions & 1 deletion lib/datadog/tracing/contrib/concurrent_ruby/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@ def target_version
def patch
require_relative 'future_patch'
patch_future
require_relative 'promises_future_patch'
patch_promises_future
end

# Propagate tracing context in Concurrent::Future
def patch_future
::Concurrent::Future.include(FuturePatch)
::Concurrent::Future.prepend(FuturePatch) if defined?(::Concurrent::Future)
end

# Propagate tracing context in Concurrent::Promises::Future
def patch_promises_future
::Concurrent::Promises.singleton_class.prepend(PromisesFuturePatch) if defined?(::Concurrent::Promises::Future)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require_relative 'context_composite_executor_service'

module Datadog
module Tracing
module Contrib
module ConcurrentRuby
# This patches the Future - to wrap executor service using ContextCompositeExecutorService
module PromisesFuturePatch
def future_on(default_executor, *args, &task)
unless default_executor.is_a?(ContextCompositeExecutorService)
default_executor = ContextCompositeExecutorService.new(default_executor)
end

super(default_executor, *args, &task)
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module Datadog
module Tracing
module Contrib
module ConcurrentRuby
def future_on: (untyped default_executor, untyped args, untyped task) -> untyped
end
end
end
end
34 changes: 29 additions & 5 deletions spec/datadog/tracing/contrib/concurrent_ruby/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,40 @@
describe '.loaded?' do
subject(:loaded?) { described_class.loaded? }

context 'when Concurrent::Future is defined' do
before { stub_const('Concurrent::Future', Class.new) }
context 'when no Future is defined' do
before do
hide_const('Concurrent::Future')
hide_const('Concurrent::Promises::Future')
end

it { is_expected.to be false }
end

context 'when current Future is defined and deprecated Future is not defined' do
before do
hide_const('Concurrent::Future')
stub_const('Concurrent::Promises::Future', Class.new)
end

it { is_expected.to be true }
end

context 'when both Future are defined' do
before do
stub_const('Concurrent::Future', Class.new)
stub_const('Concurrent::Promises::Future', Class.new)
end

it { is_expected.to be true }
end

context 'when Concurrent::Future is not defined' do
before { hide_const('Concurrent::Future') }
context 'when current Future is not defined and deprecated Future is defined' do
before do
stub_const('Concurrent::Future', Class.new)
hide_const('Concurrent::Promises::Future')
end

it { is_expected.to be false }
it { is_expected.to be true }
end
end

Expand Down
Loading

0 comments on commit f3e4f7e

Please sign in to comment.