Skip to content

Commit

Permalink
Propagate trace through Concurrent::Promises.future
Browse files Browse the repository at this point in the history
Argument passing is preserved through the composited executor.

Concurrent::Future has been moved to a prepend style to resolve an
infinite call loop situation.
  • Loading branch information
lloeki committed May 27, 2021
1 parent 22ff990 commit 9281c98
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Datadog
module Contrib
module ConcurrentRuby
# wraps existing executor to carry over trace context
# Wraps existing executor to carry over trace context
class ContextCompositeExecutorService
extend Forwardable
include Concurrent::ExecutorService
Expand All @@ -14,15 +14,24 @@ def initialize(composited_executor)
@composited_executor = composited_executor
end

# post method runs the task within composited executor - in a different thread
def post(*args, &task)
# The post method runs the task within composited executor - in a
# different thread. The original arguments are captured to be
# propagated to the composited executor post method
def post(*args, &block)
parent_context = datadog_configuration.tracer.provider.context

@composited_executor.post(*args) do
executor = @composited_executor.is_a?(Symbol) ? Concurrent.executor(@composited_executor) : @composited_executor

# Pass the original arguments to the composited executor, which
# pushes them (possibly transformed) as block args
executor.post(*args) do |*block_args|
begin
original_context = datadog_configuration.tracer.provider.context
datadog_configuration.tracer.provider.context = parent_context
yield

# Pass the executor-provided block args as they should have been
# originally passed without composition, see ChainPromise#on_resolvable
yield(*block_args)
ensure
# Restore context in case the current thread gets reused
datadog_configuration.tracer.provider.context = original_context
Expand Down
13 changes: 3 additions & 10 deletions lib/ddtrace/contrib/concurrent_ruby/future_patch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,10 @@ module Contrib
module ConcurrentRuby
# This patches the Future - to wrap executor service using ContextCompositeExecutorService
module FuturePatch
def self.included(base)
base.class_eval do
alias_method :ns_initialize_without_datadog, :ns_initialize
remove_method(:ns_initialize)
def ns_initialize(value, opts)
super(value, opts)

def ns_initialize(value, opts)
ns_initialize_without_datadog(value, opts)

@executor = ContextCompositeExecutorService.new(@executor)
end
end
@executor = ContextCompositeExecutorService.new(@executor)
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion lib/ddtrace/contrib/concurrent_ruby/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def self.version
end

def self.loaded?
!defined?(::Concurrent::Future).nil?
# Concurrent::Future is deprecated in favour of Concurrent::Promises::Future
!defined?(::Concurrent::Promises::Future).nil? || !defined?(::Concurrent::Future).nil?
end

def self.compatible?
Expand Down
11 changes: 10 additions & 1 deletion lib/ddtrace/contrib/concurrent_ruby/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@ def target_version
def patch
require 'ddtrace/contrib/concurrent_ruby/future_patch'
patch_future
require 'ddtrace/contrib/concurrent_ruby/promises_future_patch'
patch_promises_future
end

# Propagate tracing context in Concurrent::Future
def patch_future
::Concurrent::Future.include(FuturePatch)
::Concurrent::Future.prepend(FuturePatch) if defined?(::Concurrent::Future)
end

# Propagate tracing context in Concurrent::Promises::Future
def patch_promises_future
if defined?(::Concurrent::Promises::Future)
::Concurrent::Promises.singleton_class.prepend(PromisesFuturePatch)
end
end
end
end
Expand Down
18 changes: 18 additions & 0 deletions lib/ddtrace/contrib/concurrent_ruby/promises_future_patch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
require 'ddtrace/contrib/concurrent_ruby/context_composite_executor_service'

module Datadog
module Contrib
module ConcurrentRuby
# This patches the Future - to wrap executor service using ContextCompositeExecutorService
module PromisesFuturePatch
def future_on(default_executor, *args, &task)
unless default_executor.is_a?(ContextCompositeExecutorService)
default_executor = ContextCompositeExecutorService.new(default_executor)
end

super(default_executor, *args, &task)
end
end
end
end
end
24 changes: 20 additions & 4 deletions spec/ddtrace/contrib/concurrent_ruby/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,32 @@
describe '.loaded?' do
subject(:loaded?) { described_class.loaded? }

context 'when Concurrent::Future is defined' do
context 'when no Future is defined' do
before { hide_const('Concurrent::Future') }
before { hide_const('Concurrent::Promises::Future') }

it { is_expected.to be false }
end

context 'when current Future is defined and deprecated Future is not defined' do
before { hide_const('Concurrent::Future') }
before { stub_const('Concurrent::Promises::Future', Class.new) }

it { is_expected.to be true }
end

context 'when both Future are defined' do
before { stub_const('Concurrent::Future', Class.new) }
before { stub_const('Concurrent::Promises::Future', Class.new) }

it { is_expected.to be true }
end

context 'when Concurrent::Future is not defined' do
before { hide_const('Concurrent::Future') }
context 'when current Future is not defined and deprecated Future is defined' do
before { stub_const('Concurrent::Future', Class.new) }
before { hide_const('Concurrent::Promises::Future') }

it { is_expected.to be false }
it { is_expected.to be true }
end
end

Expand Down
156 changes: 111 additions & 45 deletions spec/ddtrace/contrib/concurrent_ruby/integration_test_spec.rb
Original file line number Diff line number Diff line change
@@ -1,45 +1,27 @@
require 'concurrent/future'
require 'concurrent-ruby' # concurrent-ruby is not modular

require 'ddtrace/contrib/support/spec_helper'
require 'ddtrace'
require 'spec/support/thread_helpers'

RSpec.describe 'ConcurrentRuby integration tests' do
# DEV We save an unmodified copy of Concurrent::Future.
let!(:unmodified_future) { ::Concurrent::Future.dup }
let(:configuration_options) { {} }
let(:outer_span) { spans.find { |s| s.name == 'outer_span' } }
let(:inner_span) { spans.find { |s| s.name == 'inner_span' } }

before(:context) do
# Execute an async future to force the eager creation of internal
# global threads that are never closed.
#
# This allows us to separate internal concurrent-ruby threads
# from ddtrace threads for leak detection.
ThreadHelpers.with_leaky_thread_creation(:concurrent_ruby) do
Concurrent::Future.execute {}.value
end
end
# DEV We save an unmodified copy of Concurrent::Future.
let!(:unmodified_promises) { ::Concurrent::Promises.dup }
let!(:unmodified_future) { ::Concurrent::Future.dup }

# DEV We then restore Concurrent::Future, a dangerous game.
after do
::Concurrent.send(:remove_const, :Promises)
::Concurrent.const_set('Promises', unmodified_promises)
::Concurrent.send(:remove_const, :Future)
::Concurrent.const_set('Future', unmodified_future)
remove_patch!(:concurrent_ruby)
end

subject(:deferred_execution) do
outer_span = tracer.trace('outer_span')
future = Concurrent::Future.new do
tracer.trace('inner_span') {}
end
future.execute

future.wait
outer_span.finish
end

shared_examples_for 'deferred execution' do
before do
deferred_execution
Expand All @@ -58,40 +40,124 @@
end
end

describe 'patching' do
subject(:patch) do
Datadog.configure do |c|
c.use :concurrent_ruby
context 'Concurrent::Promises::Future' do
before(:context) do
# Execute an async future to force the eager creation of internal
# global threads that are never closed.
#
# This allows us to separate internal concurrent-ruby threads
# from ddtrace threads for leak detection.
ThreadHelpers.with_leaky_thread_creation(:concurrent_ruby) do
Concurrent::Promises.future {}.value
end
end

subject(:deferred_execution) do
outer_span = tracer.trace('outer_span')
future = Concurrent::Promises.future do
tracer.trace('inner_span') {}
end

future.wait
outer_span.finish
end

it 'adds FuturePatch to Future ancestors' do
expect { patch }.to change { ::Concurrent::Future.ancestors.map(&:to_s) }
.to include('Datadog::Contrib::ConcurrentRuby::FuturePatch')
describe 'patching' do
subject(:patch) do
Datadog.configure do |c|
c.use :concurrent_ruby
end
end

it 'adds PromisesFuturePatch to Promises ancestors' do
expect { patch }.to change { ::Concurrent::Promises.singleton_class.ancestors.map(&:to_s) }
.to include('Datadog::Contrib::ConcurrentRuby::PromisesFuturePatch')
end
end
end

context 'when context propagation is disabled' do
it_behaves_like 'deferred execution'
context 'when context propagation is disabled' do
it_behaves_like 'deferred execution'

it 'inner span should not have parent' do
deferred_execution
expect(inner_span.parent).to be_nil
it 'inner span should not have parent' do
deferred_execution
expect(inner_span.parent).to be_nil
end
end

context 'when context propagation is enabled' do
before do
Datadog.configure do |c|
c.use :concurrent_ruby
end
end

it_behaves_like 'deferred execution'

it 'inner span parent should be included in outer span' do
deferred_execution
expect(inner_span.parent).to eq(outer_span)
end
end
end

context 'when context propagation is enabled' do
before do
Datadog.configure do |c|
c.use :concurrent_ruby
context 'Concurrent::Future (deprecated)' do
before(:context) do
# Execute an async future to force the eager creation of internal
# global threads that are never closed.
#
# This allows us to separate internal concurrent-ruby threads
# from ddtrace threads for leak detection.
ThreadHelpers.with_leaky_thread_creation(:concurrent_ruby) do
Concurrent::Future.execute {}.value
end
end

it_behaves_like 'deferred execution'
subject(:deferred_execution) do
outer_span = tracer.trace('outer_span')
future = Concurrent::Future.new do
tracer.trace('inner_span') {}
end
future.execute

it 'inner span parent should be included in outer span' do
deferred_execution
expect(inner_span.parent).to eq(outer_span)
future.wait
outer_span.finish
end

describe 'patching' do
subject(:patch) do
Datadog.configure do |c|
c.use :concurrent_ruby
end
end

it 'adds FuturePatch to Future ancestors' do
expect { patch }.to change { ::Concurrent::Future.ancestors.map(&:to_s) }
.to include('Datadog::Contrib::ConcurrentRuby::FuturePatch')
end
end

context 'when context propagation is disabled' do
it_behaves_like 'deferred execution'

it 'inner span should not have parent' do
deferred_execution
expect(inner_span.parent).to be_nil
end
end

context 'when context propagation is enabled' do
before do
Datadog.configure do |c|
c.use :concurrent_ruby
end
end

it_behaves_like 'deferred execution'

it 'inner span parent should be included in outer span' do
deferred_execution
expect(inner_span.parent).to eq(outer_span)
end
end
end
end

0 comments on commit 9281c98

Please sign in to comment.