diff --git a/lib/datadog/core/telemetry/event.rb b/lib/datadog/core/telemetry/event.rb index 60a9886eb1e..da03069aa54 100644 --- a/lib/datadog/core/telemetry/event.rb +++ b/lib/datadog/core/telemetry/event.rb @@ -3,7 +3,16 @@ module Datadog module Core module Telemetry + # Collection of telemetry events class Event + extend Core::Utils::Forking + + # returns sequence that increments every time the configuration changes + def self.configuration_sequence + after_fork! { @sequence = Datadog::Core::Utils::Sequence.new(1) } + @sequence ||= Datadog::Core::Utils::Sequence.new(1) + end + # Base class for all Telemetry V2 events. class Base # The type of the event. @@ -12,8 +21,7 @@ class Base def type; end # The JSON payload for the event. - # @param seq_id [Integer] The sequence ID for the event. - def payload(seq_id) + def payload {} end end @@ -24,8 +32,7 @@ def type 'app-started' end - def payload(seq_id) - @seq_id = seq_id + def payload { products: products, configuration: configuration, @@ -80,16 +87,19 @@ def products ].freeze # rubocop:disable Metrics/AbcSize + # rubocop:disable Metrics/MethodLength def configuration config = Datadog.configuration + seq_id = Event.configuration_sequence.next list = [ - conf_value('DD_AGENT_HOST', config.agent.host), - conf_value('DD_AGENT_TRANSPORT', agent_transport(config)), - conf_value('DD_TRACE_SAMPLE_RATE', to_value(config.tracing.sampling.default_rate)), + conf_value('DD_AGENT_HOST', config.agent.host, seq_id), + conf_value('DD_AGENT_TRANSPORT', agent_transport(config), seq_id), + conf_value('DD_TRACE_SAMPLE_RATE', to_value(config.tracing.sampling.default_rate), seq_id), conf_value( 'DD_TRACE_REMOVE_INTEGRATION_SERVICE_NAMES_ENABLED', - config.tracing.contrib.global_default_service_name.enabled + config.tracing.contrib.global_default_service_name.enabled, + seq_id ), ] @@ -98,32 +108,45 @@ def configuration peer_service_mapping = config.tracing.contrib.peer_service_mapping peer_service_mapping_str = peer_service_mapping.map { |key, value| "#{key}:#{value}" }.join(',') end - list << conf_value('DD_TRACE_PEER_SERVICE_MAPPING', peer_service_mapping_str) + list << conf_value('DD_TRACE_PEER_SERVICE_MAPPING', peer_service_mapping_str, seq_id) # Whitelist of configuration options to send in additional payload object TARGET_OPTIONS.each do |option| split_option = option.split('.') - list << conf_value(option, to_value(config.dig(*split_option))) + list << conf_value(option, to_value(config.dig(*split_option)), seq_id) end # Add some more custom additional payload values here list.push( - conf_value('tracing.auto_instrument.enabled', !defined?(Datadog::AutoInstrument::LOADED).nil?), - conf_value('tracing.writer_options.buffer_size', to_value(config.tracing.writer_options[:buffer_size])), - conf_value('tracing.writer_options.flush_interval', to_value(config.tracing.writer_options[:flush_interval])), - conf_value('tracing.opentelemetry.enabled', !defined?(Datadog::OpenTelemetry::LOADED).nil?), + conf_value('tracing.auto_instrument.enabled', !defined?(Datadog::AutoInstrument::LOADED).nil?, seq_id), + conf_value( + 'tracing.writer_options.buffer_size', + to_value(config.tracing.writer_options[:buffer_size]), + seq_id + ), + conf_value( + 'tracing.writer_options.flush_interval', + to_value(config.tracing.writer_options[:flush_interval]), + seq_id + ), + conf_value( + 'tracing.opentelemetry.enabled', + !defined?(Datadog::OpenTelemetry::LOADED).nil?, + seq_id + ), ) - list << conf_value('logger.instance', config.logger.instance.class.to_s) if config.logger.instance + list << conf_value('logger.instance', config.logger.instance.class.to_s, seq_id) if config.logger.instance if config.respond_to?('appsec') - list << conf_value('appsec.enabled', config.dig('appsec', 'enabled')) - list << conf_value('appsec.sca_enabled', config.dig('appsec', 'sca_enabled')) + list << conf_value('appsec.enabled', config.dig('appsec', 'enabled'), seq_id) + list << conf_value('appsec.sca_enabled', config.dig('appsec', 'sca_enabled'), seq_id) end - list << conf_value('ci.enabled', config.dig('ci', 'enabled')) if config.respond_to?('ci') + list << conf_value('ci.enabled', config.dig('ci', 'enabled'), seq_id) if config.respond_to?('ci') list.reject! { |entry| entry[:value].nil? } list end # rubocop:enable Metrics/AbcSize + # rubocop:enable Metrics/MethodLength def agent_transport(config) adapter = Core::Configuration::AgentSettingsResolver.call(config).adapter @@ -134,12 +157,12 @@ def agent_transport(config) end end - def conf_value(name, value, origin = 'code') + def conf_value(name, value, seq_id, origin = 'code') { name: name, value: value, origin: origin, - seq_id: @seq_id, + seq_id: seq_id, } end @@ -169,7 +192,7 @@ def type 'app-dependencies-loaded' end - def payload(seq_id) + def payload { dependencies: dependencies } end @@ -192,7 +215,7 @@ def type 'app-integrations-change' end - def payload(seq_id) + def payload { integrations: integrations } end @@ -245,18 +268,20 @@ def initialize(changes, origin) @origin = origin end - def payload(seq_id) - { configuration: configuration(seq_id) } + def payload + { configuration: configuration } end - def configuration(seq_id) + def configuration config = Datadog.configuration + seq_id = Event.configuration_sequence.next res = @changes.map do |name, value| { name: name, value: value, origin: @origin, + seq_id: seq_id, } end @@ -299,7 +324,7 @@ def initialize(namespace, metric_series) @metric_series = metric_series end - def payload(_) + def payload { namespace: @namespace, series: @metric_series.map(&:to_h) @@ -313,6 +338,28 @@ def type 'distributions' end end + + # Telemetry class for the 'message-batch' event + class MessageBatch + attr_reader :events + + def type + 'message-batch' + end + + def initialize(events) + @events = events + end + + def payload + @events.map do |event| + { + request_type: event.type, + payload: event.payload, + } + end + end + end end end end diff --git a/lib/datadog/core/telemetry/request.rb b/lib/datadog/core/telemetry/request.rb index 86e6bb3b6d9..f15831bc1cf 100644 --- a/lib/datadog/core/telemetry/request.rb +++ b/lib/datadog/core/telemetry/request.rb @@ -17,7 +17,7 @@ def build_payload(event, seq_id) application: application, debug: false, host: host, - payload: event.payload(seq_id), + payload: event.payload, request_type: event.type, runtime_id: Core::Environment::Identity.id, seq_id: seq_id, diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 8cb5a685006..7706ac7ce30 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -82,13 +82,11 @@ def perform(*events) end def flush_events(events) - return if events.nil? + return if events.nil? || events.empty? return if !enabled? || !sent_started_event? Datadog.logger.debug { "Sending #{events&.count} telemetry events" } - events.each do |event| - send_event(event) - end + send_event(Event::MessageBatch.new(events)) end def heartbeat! diff --git a/sig/datadog/core/telemetry/emitter.rbs b/sig/datadog/core/telemetry/emitter.rbs index e1d4320d763..40b1432bedc 100644 --- a/sig/datadog/core/telemetry/emitter.rbs +++ b/sig/datadog/core/telemetry/emitter.rbs @@ -2,7 +2,7 @@ module Datadog module Core module Telemetry class Emitter - @sequence: Datadog::Core::Utils::Sequence + self.@sequence: Datadog::Core::Utils::Sequence attr_reader http_transport: untyped diff --git a/sig/datadog/core/telemetry/event.rbs b/sig/datadog/core/telemetry/event.rbs index 791f014f9a4..53cc1e20819 100644 --- a/sig/datadog/core/telemetry/event.rbs +++ b/sig/datadog/core/telemetry/event.rbs @@ -2,16 +2,20 @@ module Datadog module Core module Telemetry class Event + extend Core::Utils::Forking + + self.@sequence: Datadog::Core::Utils::Sequence + + def self.configuration_sequence: () -> Datadog::Core::Utils::Sequence + class Base - def payload: (int seq_id) -> Hash[Symbol, untyped] + def payload: () -> (Hash[Symbol, untyped] | Array[Hash[Symbol, untyped]]) def type: -> String? end class AppStarted < Base TARGET_OPTIONS: Array[String] - @seq_id: int - private def products: -> Hash[Symbol, untyped] @@ -20,7 +24,7 @@ module Datadog def agent_transport: (untyped config) -> String - def conf_value: (String name, Object value, ?String origin) -> Hash[Symbol, untyped] + def conf_value: (String name, Object value, Integer seq_id, ?String origin) -> Hash[Symbol, untyped] def to_value: (Object value) -> Object @@ -47,7 +51,7 @@ module Datadog def initialize: (Enumerable[[String, Numeric | bool | String]] changes, String origin) -> void - def configuration: (int seq_id) -> Array[Hash[Symbol, untyped]] + def configuration: () -> Array[Hash[Symbol, untyped]] end class AppHeartbeat < Base @@ -65,6 +69,13 @@ module Datadog class Distributions < GenerateMetrics end + + class MessageBatch < Base + attr_reader events: Array[Datadog::Core::Telemetry::Event::Base] + @events: Array[Datadog::Core::Telemetry::Event::Base] + + def initialize: (Array[Datadog::Core::Telemetry::Event::Base] events) -> void + end end end end diff --git a/spec/datadog/core/telemetry/event_spec.rb b/spec/datadog/core/telemetry/event_spec.rb index f53bd9468ad..b8eee1f48d8 100644 --- a/spec/datadog/core/telemetry/event_spec.rb +++ b/spec/datadog/core/telemetry/event_spec.rb @@ -5,7 +5,7 @@ RSpec.describe Datadog::Core::Telemetry::Event do let(:id) { double('seq_id') } - subject(:payload) { event.payload(id) } + subject(:payload) { event.payload } context 'AppStarted' do let(:event) { described_class::AppStarted.new } @@ -14,6 +14,8 @@ end before do + allow_any_instance_of(Datadog::Core::Utils::Sequence).to receive(:next).and_return(id) + Datadog.configure do |c| c.agent.host = '1.2.3.4' c.tracing.sampling.default_rate = 0.5 @@ -164,12 +166,17 @@ def contain_configuration(*array) let(:name) { 'key' } let(:value) { 'value' } + before do + allow_any_instance_of(Datadog::Core::Utils::Sequence).to receive(:next).and_return(id) + end + it 'has a list of client configurations' do is_expected.to eq( configuration: [{ name: name, value: value, origin: origin, + seq_id: id }] ) end @@ -185,7 +192,7 @@ def contain_configuration(*array) is_expected.to eq( configuration: [ - { name: name, value: value, origin: origin }, + { name: name, value: value, origin: origin, seq_id: id }, { name: 'appsec.sca_enabled', value: false, origin: 'code', seq_id: id } ] ) @@ -252,4 +259,25 @@ def contain_configuration(*array) ) end end + + context 'MessageBatch' do + let(:event) { described_class::MessageBatch.new(events) } + + let(:events) { [described_class::AppClosing.new, described_class::AppHeartbeat.new] } + + it do + is_expected.to eq( + [ + { + request_type: 'app-closing', + payload: {} + }, + { + request_type: 'app-heartbeat', + payload: {} + } + ] + ) + end + end end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index bc88ad63b38..54b23ca6e79 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -74,15 +74,12 @@ it 'disables the worker' do worker.start - try_wait_until { @received_started } + try_wait_until { !worker.enabled? } - expect(worker).to have_attributes( - enabled?: false, - loop_base_interval: heartbeat_interval_seconds, - ) expect(Datadog.logger).to have_received(:debug).with( 'Agent does not support telemetry; disabling future telemetry events.' ) + expect(@received_started).to be(true) expect(@received_heartbeat).to be(false) end end @@ -222,6 +219,7 @@ context 'several workers running' do it 'sends single started event' do started_events = 0 + mutex = Mutex.new allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppStarted)) do started_events += 1 @@ -230,7 +228,9 @@ heartbeat_events = 0 allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do - heartbeat_events += 1 + mutex.synchronize do + heartbeat_events += 1 + end response end @@ -269,25 +269,46 @@ end describe '#stop' do - let(:heartbeat_interval_seconds) { 3 } + let(:heartbeat_interval_seconds) { 60 } it 'flushes events and stops the worker' do worker.start - expect(worker).to receive(:flush_events).at_least(:once) + try_wait_until { @received_heartbeat } + + events_received = 0 + mutex = Mutex.new + allow(emitter).to receive(:request).with( + an_instance_of(Datadog::Core::Telemetry::Event::MessageBatch) + ) do |event| + event.events.each do |subevent| + mutex.synchronize do + events_received += 1 if subevent.is_a?(Datadog::Core::Telemetry::Event::AppIntegrationsChange) + end + end + + response + end worker.enqueue(Datadog::Core::Telemetry::Event::AppIntegrationsChange.new) worker.stop(true) + + try_wait_until { events_received == 1 } end end describe '#enqueue' do it 'adds events to the buffer and flushes them later' do events_received = 0 + mutex = Mutex.new allow(emitter).to receive(:request).with( - an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange) - ) do - events_received += 1 + an_instance_of(Datadog::Core::Telemetry::Event::MessageBatch) + ) do |event| + event.events.each do |subevent| + mutex.synchronize do + events_received += 1 if subevent.is_a?(Datadog::Core::Telemetry::Event::AppIntegrationsChange) + end + end response end