diff --git a/lib/datadog/tracing/contrib/concurrent_ruby/context_composite_executor_service.rb b/lib/datadog/tracing/contrib/concurrent_ruby/context_composite_executor_service.rb index a0b1ba04c3b..f20f04b6d16 100644 --- a/lib/datadog/tracing/contrib/concurrent_ruby/context_composite_executor_service.rb +++ b/lib/datadog/tracing/contrib/concurrent_ruby/context_composite_executor_service.rb @@ -6,7 +6,7 @@ module Datadog module Tracing module Contrib module ConcurrentRuby - # wraps existing executor to carry over trace context + # Wraps existing executor to carry over trace context class ContextCompositeExecutorService include Concurrent::ExecutorService @@ -16,20 +16,20 @@ def initialize(composited_executor) @composited_executor = composited_executor end - # post method runs the task within composited executor - in a different thread + # post method runs the task within composited executor - in a different thread. The original arguments are + # captured to be propagated to the composited executor post method def post(*args, &task) - tracer = Tracing.send(:tracer) - parent_context = tracer.provider.context - - @composited_executor.post(*args) do - begin - original_context = tracer.provider.context - tracer.provider.context = parent_context - yield - ensure - # Restore context in case the current thread gets reused - tracer.provider.context = original_context - end + digest = Tracing.active_trace.to_digest + executor = @composited_executor.is_a?(Symbol) ? Concurrent.executor(@composited_executor) : @composited_executor + + # Pass the original arguments to the composited executor, which + # pushes them (possibly transformed) as block args + executor.post(*args) do |*block_args| + Tracing.continue_trace!(digest) + + # Pass the executor-provided block args as they should have been + # originally passed without composition, see ChainPromise#on_resolvable + yield(*block_args) end end diff --git a/lib/datadog/tracing/contrib/concurrent_ruby/future_patch.rb b/lib/datadog/tracing/contrib/concurrent_ruby/future_patch.rb index 39f00de89df..2079a52eab6 100644 --- a/lib/datadog/tracing/contrib/concurrent_ruby/future_patch.rb +++ b/lib/datadog/tracing/contrib/concurrent_ruby/future_patch.rb @@ -8,17 +8,10 @@ module Contrib module ConcurrentRuby # This patches the Future - to wrap executor service using ContextCompositeExecutorService module FuturePatch - def self.included(base) - base.class_eval do - alias_method :ns_initialize_without_datadog, :ns_initialize - remove_method(:ns_initialize) + def ns_initialize(value, opts) + super(value, opts) - def ns_initialize(value, opts) - ns_initialize_without_datadog(value, opts) - - @executor = ContextCompositeExecutorService.new(@executor) - end - end + @executor = ContextCompositeExecutorService.new(@executor) end end end diff --git a/lib/datadog/tracing/contrib/concurrent_ruby/integration.rb b/lib/datadog/tracing/contrib/concurrent_ruby/integration.rb index f4df4233dab..c98353f5178 100644 --- a/lib/datadog/tracing/contrib/concurrent_ruby/integration.rb +++ b/lib/datadog/tracing/contrib/concurrent_ruby/integration.rb @@ -20,7 +20,8 @@ def self.version end def self.loaded? - !defined?(::Concurrent::Future).nil? + # Concurrent::Future is deprecated in favour of Concurrent::Promises::Future + !defined?(::Concurrent::Promises::Future).nil? || !defined?(::Concurrent::Future).nil? end def self.compatible? diff --git a/lib/datadog/tracing/contrib/concurrent_ruby/patcher.rb b/lib/datadog/tracing/contrib/concurrent_ruby/patcher.rb index 65ec34e80f0..bbc8e4ff018 100644 --- a/lib/datadog/tracing/contrib/concurrent_ruby/patcher.rb +++ b/lib/datadog/tracing/contrib/concurrent_ruby/patcher.rb @@ -19,11 +19,18 @@ def target_version def patch require_relative 'future_patch' patch_future + require_relative 'promises_future_patch' + patch_promises_future end # Propagate tracing context in Concurrent::Future def patch_future - ::Concurrent::Future.include(FuturePatch) + ::Concurrent::Future.prepend(FuturePatch) if defined?(::Concurrent::Future) + end + + # Propagate tracing context in Concurrent::Promises::Future + def patch_promises_future + ::Concurrent::Promises.singleton_class.prepend(PromisesFuturePatch) if defined?(::Concurrent::Promises::Future) end end end diff --git a/lib/datadog/tracing/contrib/concurrent_ruby/promises_future_patch.rb b/lib/datadog/tracing/contrib/concurrent_ruby/promises_future_patch.rb new file mode 100644 index 00000000000..5c0a299f821 --- /dev/null +++ b/lib/datadog/tracing/contrib/concurrent_ruby/promises_future_patch.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +require_relative 'context_composite_executor_service' + +module Datadog + module Tracing + module Contrib + module ConcurrentRuby + # This patches the Future - to wrap executor service using ContextCompositeExecutorService + module PromisesFuturePatch + def future_on(default_executor, *args, &task) + unless default_executor.is_a?(ContextCompositeExecutorService) + default_executor = ContextCompositeExecutorService.new(default_executor) + end + + super(default_executor, *args, &task) + end + end + end + end + end +end diff --git a/sig/datadog/tracing/contrib/concurrent_ruby/promises_future_patch.rbs b/sig/datadog/tracing/contrib/concurrent_ruby/promises_future_patch.rbs new file mode 100644 index 00000000000..e62dbfa5515 --- /dev/null +++ b/sig/datadog/tracing/contrib/concurrent_ruby/promises_future_patch.rbs @@ -0,0 +1,9 @@ +module Datadog + module Tracing + module Contrib + module ConcurrentRuby + def future_on: (untyped default_executor, untyped args, untyped task) -> untyped + end + end + end +end diff --git a/spec/datadog/tracing/contrib/concurrent_ruby/integration_spec.rb b/spec/datadog/tracing/contrib/concurrent_ruby/integration_spec.rb index 0ad9415f882..46b81e41cdc 100644 --- a/spec/datadog/tracing/contrib/concurrent_ruby/integration_spec.rb +++ b/spec/datadog/tracing/contrib/concurrent_ruby/integration_spec.rb @@ -22,16 +22,40 @@ describe '.loaded?' do subject(:loaded?) { described_class.loaded? } - context 'when Concurrent::Future is defined' do - before { stub_const('Concurrent::Future', Class.new) } + context 'when no Future is defined' do + before do + hide_const('Concurrent::Future') + hide_const('Concurrent::Promises::Future') + end + + it { is_expected.to be false } + end + + context 'when current Future is defined and deprecated Future is not defined' do + before do + hide_const('Concurrent::Future') + stub_const('Concurrent::Promises::Future', Class.new) + end + + it { is_expected.to be true } + end + + context 'when both Future are defined' do + before do + stub_const('Concurrent::Future', Class.new) + stub_const('Concurrent::Promises::Future', Class.new) + end it { is_expected.to be true } end - context 'when Concurrent::Future is not defined' do - before { hide_const('Concurrent::Future') } + context 'when current Future is not defined and deprecated Future is defined' do + before do + stub_const('Concurrent::Future', Class.new) + hide_const('Concurrent::Promises::Future') + end - it { is_expected.to be false } + it { is_expected.to be true } end end diff --git a/spec/datadog/tracing/contrib/concurrent_ruby/integration_test_spec.rb b/spec/datadog/tracing/contrib/concurrent_ruby/integration_test_spec.rb index 85d7d7ce4a2..9abd9e85483 100644 --- a/spec/datadog/tracing/contrib/concurrent_ruby/integration_test_spec.rb +++ b/spec/datadog/tracing/contrib/concurrent_ruby/integration_test_spec.rb @@ -1,45 +1,24 @@ -require 'concurrent/future' +require 'concurrent-ruby' # concurrent-ruby is not modular require 'datadog/tracing/contrib/support/spec_helper' require 'ddtrace' require 'spec/support/thread_helpers' RSpec.describe 'ConcurrentRuby integration tests' do - # DEV We save an unmodified copy of Concurrent::Future. - let!(:unmodified_future) { ::Concurrent::Future.dup } let(:configuration_options) { {} } let(:outer_span) { spans.find { |s| s.name == 'outer_span' } } let(:inner_span) { spans.find { |s| s.name == 'inner_span' } } - before(:context) do - # Execute an async future to force the eager creation of internal - # global threads that are never closed. - # - # This allows us to separate internal concurrent-ruby threads - # from ddtrace threads for leak detection. - ThreadHelpers.with_leaky_thread_creation(:concurrent_ruby) do - Concurrent::Future.execute {}.value - end + before do + # stub inheritance chain for instrumentation rollback + stub_const('Concurrent::Promises', ::Concurrent::Promises.dup) + stub_const('Concurrent::Future', ::Concurrent::Future.dup) end - # DEV We then restore Concurrent::Future, a dangerous game. after do - ::Concurrent.send(:remove_const, :Future) - ::Concurrent.const_set('Future', unmodified_future) remove_patch!(:concurrent_ruby) end - subject(:deferred_execution) do - outer_span = tracer.trace('outer_span') - future = Concurrent::Future.new do - tracer.trace('inner_span') {} - end - future.execute - - future.wait - outer_span.finish - end - shared_examples_for 'deferred execution' do before do deferred_execution @@ -58,40 +37,165 @@ end end - describe 'patching' do - subject(:patch) do - Datadog.configure do |c| - c.tracing.instrument :concurrent_ruby + context 'Concurrent::Promises::Future' do + before(:context) do + # Execute an async future to force the eager creation of internal + # global threads that are never closed. + # + # This allows us to separate internal concurrent-ruby threads + # from ddtrace threads for leak detection. We need to create the maximum + # number of threads that will be created concurrently in a test, which in + # this case is 2. + ThreadHelpers.with_leaky_thread_creation(:concurrent_ruby) do + Concurrent::Promises.future do + Concurrent::Promises.future {}.value + end.value end end - it 'adds FuturePatch to Future ancestors' do - expect { patch }.to change { ::Concurrent::Future.ancestors.map(&:to_s) } - .to include('Datadog::Tracing::Contrib::ConcurrentRuby::FuturePatch') + subject(:deferred_execution) do + outer_span = tracer.trace('outer_span') + future = Concurrent::Promises.future do + tracer.trace('inner_span') {} + end + + future.wait + outer_span.finish end - end - context 'when context propagation is disabled' do - it_behaves_like 'deferred execution' + describe 'patching' do + subject(:patch) do + Datadog.configure do |c| + c.tracing.instrument :concurrent_ruby + end + end - it 'inner span should not have parent' do - deferred_execution - expect(inner_span).to be_root_span + it 'adds PromisesFuturePatch to Promises ancestors' do + expect { patch }.to change { ::Concurrent::Promises.singleton_class.ancestors.map(&:to_s) } + .to include('Datadog::Tracing::Contrib::ConcurrentRuby::PromisesFuturePatch') + end + end + + context 'when context propagation is disabled' do + it_behaves_like 'deferred execution' + + it 'inner span should not have parent' do + deferred_execution + expect(inner_span).to be_root_span + end + end + + context 'when context propagation is enabled' do + before do + Datadog.configure do |c| + c.tracing.instrument :concurrent_ruby + end + end + + it_behaves_like 'deferred execution' + + it 'inner span parent should be included in outer span' do + deferred_execution + expect(inner_span.parent_id).to eq(outer_span.span_id) + end + + context 'when there are multiple futures with inner spans that have the same parent' do + let(:second_inner_span) { spans.find { |s| s.name == 'second_inner_span' } } + + subject(:multiple_deferred_executions) do + # use a barrier to ensure both threads are created before continuing + barrier = Concurrent::CyclicBarrier.new(2) + + outer_span = tracer.trace('outer_span') + future_1 = Concurrent::Promises.future do + barrier.wait + tracer.trace('inner_span') do + barrier.wait + end + end + + future_2 = Concurrent::Promises.future do + barrier.wait + tracer.trace('second_inner_span') do + barrier.wait + end + end + + future_1.wait + future_2.wait + outer_span.finish + end + + describe 'it correctly associates to the parent span' do + it 'both inner span parents should be included in same outer span' do + multiple_deferred_executions + + expect(inner_span.parent_id).to eq(outer_span.span_id) + expect(second_inner_span.parent_id).to eq(outer_span.span_id) + end + end + end end end - context 'when context propagation is enabled' do - before do - Datadog.configure do |c| - c.tracing.instrument :concurrent_ruby + context 'Concurrent::Future (deprecated)' do + before(:context) do + # Execute an async future to force the eager creation of internal + # global threads that are never closed. + # + # This allows us to separate internal concurrent-ruby threads + # from ddtrace threads for leak detection. + ThreadHelpers.with_leaky_thread_creation(:concurrent_ruby) do + Concurrent::Future.execute {}.value end end - it_behaves_like 'deferred execution' + subject(:deferred_execution) do + outer_span = tracer.trace('outer_span') + future = Concurrent::Future.new do + tracer.trace('inner_span') {} + end + future.execute - it 'inner span parent should be included in outer span' do - deferred_execution - expect(inner_span.parent_id).to eq(outer_span.span_id) + future.wait + outer_span.finish + end + + describe 'patching' do + subject(:patch) do + Datadog.configure do |c| + c.tracing.instrument :concurrent_ruby + end + end + + it 'adds FuturePatch to Future ancestors' do + expect { patch }.to change { ::Concurrent::Future.ancestors.map(&:to_s) } + .to include('Datadog::Tracing::Contrib::ConcurrentRuby::FuturePatch') + end + end + + context 'when context propagation is disabled' do + it_behaves_like 'deferred execution' + + it 'inner span should not have parent' do + deferred_execution + expect(inner_span).to be_root_span + end + end + + context 'when context propagation is enabled' do + before do + Datadog.configure do |c| + c.tracing.instrument :concurrent_ruby + end + end + + it_behaves_like 'deferred execution' + + it 'inner span parent should be included in outer span' do + deferred_execution + expect(inner_span.parent_id).to eq(outer_span.span_id) + end end end end