Skip to content

Commit 971040f

Browse files
authored
Serialize span events via a dedicated field (#4279)
* Increase type checking coverage * Serialize span events via a dedicated field * Revert unrelated test changes * Skip test running against incorrect combinations * Use kwarg
1 parent ba51201 commit 971040f

File tree

21 files changed

+394
-51
lines changed

21 files changed

+394
-51
lines changed

.github/forced-tests-list.json

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
11
{
2-
2+
"AGENT_NOT_SUPPORTING_SPAN_EVENTS":
3+
[
4+
"tests/test_span_events.py"
5+
],
6+
"PARAMETRIC":
7+
[
8+
"tests/parametric/test_span_events.py"
9+
]
310
}

Steepfile

-2
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,8 @@ target :datadog do
137137
ignore 'lib/datadog/tracing/transport/http/traces.rb'
138138
ignore 'lib/datadog/tracing/transport/io/client.rb'
139139
ignore 'lib/datadog/tracing/transport/io/traces.rb'
140-
ignore 'lib/datadog/tracing/transport/serializable_trace.rb'
141140
ignore 'lib/datadog/tracing/transport/statistics.rb'
142141
ignore 'lib/datadog/tracing/transport/trace_formatter.rb'
143-
ignore 'lib/datadog/tracing/transport/traces.rb'
144142
ignore 'lib/datadog/tracing/workers.rb'
145143
ignore 'lib/datadog/tracing/workers/trace_writer.rb'
146144
ignore 'lib/datadog/tracing/writer.rb'

lib/datadog/core/configuration/components.rb

+7-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
require_relative '../../di/component'
1717
require_relative '../crashtracking/component'
1818

19+
require_relative '../environment/agent_info'
20+
1921
module Datadog
2022
module Core
2123
module Configuration
@@ -85,7 +87,8 @@ def build_crashtracker(settings, agent_settings, logger:)
8587
:tracer,
8688
:crashtracker,
8789
:dynamic_instrumentation,
88-
:appsec
90+
:appsec,
91+
:agent_info
8992

9093
def initialize(settings)
9194
@logger = self.class.build_logger(settings)
@@ -96,6 +99,9 @@ def initialize(settings)
9699
# the Core resolver from within your product/component's namespace.
97100
agent_settings = AgentSettingsResolver.call(settings, logger: @logger)
98101

102+
# Exposes agent capability information for detection by any components
103+
@agent_info = Core::Environment::AgentInfo.new(agent_settings)
104+
99105
@telemetry = self.class.build_telemetry(settings, agent_settings, @logger)
100106

101107
@remote = Remote::Component.build(settings, agent_settings, telemetry: telemetry)

lib/datadog/core/encoding.rb

+16
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Encoding
1010
# Encoder interface that provides the logic to encode traces and service
1111
# @abstract
1212
module Encoder
13+
# :nocov:
1314
def content_type
1415
raise NotImplementedError
1516
end
@@ -23,6 +24,13 @@ def join(encoded_elements)
2324
def encode(_)
2425
raise NotImplementedError
2526
end
27+
28+
# Deserializes a value serialized with {#encode}.
29+
# This method is used for debugging purposes.
30+
def decode(_)
31+
raise NotImplementedError
32+
end
33+
# :nocov:
2634
end
2735

2836
# Encoder for the JSON format
@@ -41,6 +49,10 @@ def encode(obj)
4149
JSON.dump(obj)
4250
end
4351

52+
def decode(obj)
53+
JSON.parse(obj)
54+
end
55+
4456
def join(encoded_data)
4557
"[#{encoded_data.join(',')}]"
4658
end
@@ -62,6 +74,10 @@ def encode(obj)
6274
MessagePack.pack(obj)
6375
end
6476

77+
def decode(obj)
78+
MessagePack.unpack(obj)
79+
end
80+
6581
def join(encoded_data)
6682
packer = MessagePack::Packer.new
6783
packer.write_array_header(encoded_data.size)
+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# frozen_string_literal: true
2+
3+
module Datadog
4+
module Core
5+
module Environment
6+
# Retrieves the agent's `/info` endpoint data.
7+
# This data can be used to determine the capabilities of the local Datadog agent.
8+
#
9+
# @example Example response payload
10+
# {
11+
# "version" : "7.57.2",
12+
# "git_commit" : "38ba0c7",
13+
# "endpoints" : [ "/v0.4/traces", "/v0.4/services", "/v0.7/traces", "/v0.7/config" ],
14+
# "client_drop_p0s" : true,
15+
# "span_meta_structs" : true,
16+
# "long_running_spans" : true,
17+
# "evp_proxy_allowed_headers" : [ "Content-Type", "Accept-Encoding", "Content-Encoding", "User-Agent" ],
18+
# "config" : {
19+
# "default_env" : "none",
20+
# "target_tps" : 10,
21+
# "max_eps" : 200,
22+
# "receiver_port" : 8126,
23+
# "receiver_socket" : "/var/run/datadog/apm.socket",
24+
# "connection_limit" : 0,
25+
# "receiver_timeout" : 0,
26+
# "max_request_bytes" : 26214400,
27+
# "statsd_port" : 8125,
28+
# "analyzed_spans_by_service" : { },
29+
# "obfuscation" : {
30+
# "elastic_search" : true,
31+
# "mongo" : true,
32+
# "sql_exec_plan" : false,
33+
# "sql_exec_plan_normalize" : false,
34+
# "http" : {
35+
# "remove_query_string" : false,
36+
# "remove_path_digits" : false
37+
# },
38+
# "remove_stack_traces" : false,
39+
# "redis" : {
40+
# "Enabled" : true,
41+
# "RemoveAllArgs" : false
42+
# },
43+
# "memcached" : {
44+
# "Enabled" : true,
45+
# "KeepCommand" : false
46+
# }
47+
# }
48+
# },
49+
# "peer_tags" : null
50+
# }
51+
#
52+
# @see https://github.com/DataDog/datadog-agent/blob/f07df0a3c1fca0c83b5a15f553bd994091b0c8ac/pkg/trace/api/info.go#L20
53+
class AgentInfo
54+
attr_reader :agent_settings
55+
56+
def initialize(agent_settings)
57+
@agent_settings = agent_settings
58+
@client = Remote::Transport::HTTP.root(agent_settings: agent_settings)
59+
end
60+
61+
# Fetches the information from the agent.
62+
# @return [Datadog::Core::Remote::Transport::HTTP::Negotiation::Response] the response from the agent
63+
# @return [nil] if an error occurred while fetching the information
64+
def fetch
65+
res = @client.send_info
66+
return unless res.ok?
67+
68+
res
69+
end
70+
71+
def ==(other)
72+
other.is_a?(self.class) && other.agent_settings == agent_settings
73+
end
74+
end
75+
end
76+
end
77+
end

lib/datadog/core/remote/transport/http/negotiation.rb

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def initialize(http_response, options = {})
4343
@version = options[:version]
4444
@endpoints = options[:endpoints]
4545
@config = options[:config]
46+
@span_events = options[:span_events]
4647
end
4748
end
4849

lib/datadog/core/remote/transport/negotiation.rb

+13-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,19 @@ class Request < Datadog::Core::Transport::Request
3232

3333
# Negotiation response
3434
module Response
35-
attr_reader :version, :endpoints, :config
35+
# @!attribute [r] version
36+
# The version of the agent.
37+
# @return [String]
38+
# @!attribute [r] endpoints
39+
# The HTTP endpoints the agent supports.
40+
# @return [Array<String>]
41+
# @!attribute [r] config
42+
# The agent configuration. These are configured by the user when starting the agent, as well as any defaults.
43+
# @return [Hash]
44+
# @!attribute [r] span_events
45+
# Whether the agent supports the top-level span events field in flushed spans.
46+
# @return [Boolean,nil]
47+
attr_reader :version, :endpoints, :config, :span_events
3648
end
3749

3850
# Negotiation transport

lib/datadog/tracing/transport/serializable_trace.rb

+8-4
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class SerializableTrace
1414

1515
# @param trace [Datadog::Trace] the trace to serialize
1616
# @param native_events_supported [Boolean] whether the agent supports span events as a top-level field
17-
def initialize(trace, native_events_supported = false)
17+
def initialize(trace, native_events_supported:)
1818
@trace = trace
1919
@native_events_supported = native_events_supported
2020
end
@@ -29,13 +29,17 @@ def initialize(trace, native_events_supported = false)
2929
# @param packer [MessagePack::Packer] serialization buffer, can be +nil+ with JRuby
3030
def to_msgpack(packer = nil)
3131
# As of 1.3.3, JRuby implementation doesn't pass an existing packer
32-
trace.spans.map { |s| SerializableSpan.new(s, @native_events_supported) }.to_msgpack(packer)
32+
trace.spans.map do |s|
33+
SerializableSpan.new(s, native_events_supported: @native_events_supported)
34+
end.to_msgpack(packer)
3335
end
3436

3537
# JSON serializer interface.
3638
# Used by older version of the transport.
3739
def to_json(*args)
38-
trace.spans.map { |s| SerializableSpan.new(s, @native_events_supported).to_hash }.to_json(*args)
40+
trace.spans.map do |s|
41+
SerializableSpan.new(s, native_events_supported: @native_events_supported).to_hash
42+
end.to_json(*args)
3943
end
4044
end
4145

@@ -46,7 +50,7 @@ class SerializableSpan
4650

4751
# @param span [Datadog::Span] the span to serialize
4852
# @param native_events_supported [Boolean] whether the agent supports span events as a top-level field
49-
def initialize(span, native_events_supported)
53+
def initialize(span, native_events_supported:)
5054
@span = span
5155
@trace_id = Tracing::Utils::TraceId.to_low_order(span.trace_id)
5256
@native_events_supported = native_events_supported

lib/datadog/tracing/transport/traces.rb

+25-8
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ class Chunker
5050
#
5151
# @param encoder [Datadog::Core::Encoding::Encoder]
5252
# @param max_size [String] maximum acceptable payload size
53-
def initialize(encoder, max_size: DEFAULT_MAX_PAYLOAD_SIZE)
53+
def initialize(encoder, native_events_supported:, max_size: DEFAULT_MAX_PAYLOAD_SIZE)
5454
@encoder = encoder
55+
@native_events_supported = native_events_supported
5556
@max_size = max_size
5657
end
5758

@@ -77,7 +78,7 @@ def encode_in_chunks(traces)
7778
private
7879

7980
def encode_one(trace)
80-
encoded = Encoder.encode_trace(encoder, trace)
81+
encoded = Encoder.encode_trace(encoder, trace, native_events_supported: @native_events_supported)
8182

8283
if encoded.size > max_size
8384
# This single trace is too large, we can't flush it
@@ -95,17 +96,18 @@ def encode_one(trace)
9596
module Encoder
9697
module_function
9798

98-
def encode_trace(encoder, trace)
99+
def encode_trace(encoder, trace, native_events_supported:)
99100
# Format the trace for transport
100101
TraceFormatter.format!(trace)
101102

102103
# Make the trace serializable
103-
serializable_trace = SerializableTrace.new(trace)
104-
105-
Datadog.logger.debug { "Flushing trace: #{JSON.dump(serializable_trace)}" }
104+
serializable_trace = SerializableTrace.new(trace, native_events_supported: native_events_supported)
106105

107106
# Encode the trace
108-
encoder.encode(serializable_trace)
107+
encoder.encode(serializable_trace).tap do |encoded|
108+
# Print the actual serialized trace, since the encoder can change make non-trivial changes
109+
Datadog.logger.debug { "Flushing trace: #{encoder.decode(encoded)}" }
110+
end
109111
end
110112
end
111113

@@ -126,7 +128,10 @@ def initialize(apis, default_api)
126128

127129
def send_traces(traces)
128130
encoder = current_api.encoder
129-
chunker = Datadog::Tracing::Transport::Traces::Chunker.new(encoder)
131+
chunker = Datadog::Tracing::Transport::Traces::Chunker.new(
132+
encoder,
133+
native_events_supported: native_events_supported?
134+
)
130135

131136
responses = chunker.encode_in_chunks(traces.lazy).map do |encoded_traces, trace_count|
132137
request = Request.new(EncodedParcel.new(encoded_traces, trace_count))
@@ -188,6 +193,18 @@ def change_api!(api_id)
188193
@client = HTTP::Client.new(current_api)
189194
end
190195

196+
# Queries the agent for native span events serialization support.
197+
# This changes how the serialization of span events performed.
198+
def native_events_supported?
199+
return @native_events_supported if defined?(@native_events_supported)
200+
201+
if (res = Datadog.send(:components).agent_info.fetch)
202+
@native_events_supported = res.span_events == true
203+
else
204+
false
205+
end
206+
end
207+
191208
# Raised when configured with an unknown API version
192209
class UnknownApiVersionError < StandardError
193210
attr_reader :version

sig/datadog/core/configuration/components.rbs

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ module Datadog
3636

3737
attr_reader remote: Datadog::Core::Remote::Component
3838

39+
attr_reader agent_info: Datadog::Core::Environment::AgentInfo
40+
3941
def initialize: (untyped settings) -> untyped
4042

4143
def startup!: (untyped settings) -> untyped

sig/datadog/core/encoding.rbs

+17-13
Original file line numberDiff line numberDiff line change
@@ -5,39 +5,43 @@ module Datadog
55
# Encoder interface that provides the logic to encode traces and service
66
# @abstract
77
module Encoder
8-
def content_type: () -> untyped
8+
def content_type: () -> String
99

10-
# Concatenates a list of elements previously encoded by +#encode+.
11-
def join: (untyped encoded_elements) -> untyped
10+
def encode: (untyped obj) -> String
1211

13-
# Serializes a single trace into a String suitable for network transmission.
14-
def encode: (untyped _) -> untyped
12+
def join: (Array[untyped] encoded_data) -> String
13+
14+
def decode: (String obj)-> untyped
1515
end
1616

1717
# Encoder for the JSON format
1818
module JSONEncoder
1919
extend Encoder
2020

21-
CONTENT_TYPE: "application/json"
21+
CONTENT_TYPE: String
22+
23+
def self?.content_type: () -> String
2224

23-
def self?.content_type: () -> untyped
25+
def self?.encode: (untyped obj) -> String
2426

25-
def self?.encode: (untyped obj) -> untyped
27+
def self?.join: (Array[untyped] encoded_data) -> String
2628

27-
def self?.join: (untyped encoded_data) -> ::String
29+
def self?.decode: (String obj)-> untyped
2830
end
2931

3032
# Encoder for the Msgpack format
3133
module MsgpackEncoder
3234
extend Encoder
3335

34-
CONTENT_TYPE: "application/msgpack"
36+
CONTENT_TYPE: String
37+
38+
def self?.content_type: () -> String
3539

36-
def self?.content_type: () -> untyped
40+
def self?.encode: (untyped obj) -> String
3741

38-
def self?.encode: (untyped obj) -> untyped
42+
def self?.join: (Array[untyped] encoded_data) -> String
3943

40-
def self?.join: (untyped encoded_data) -> untyped
44+
def self?.decode: (String obj)-> untyped
4145
end
4246
end
4347
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
module Datadog
2+
module Core
3+
module Environment
4+
class AgentInfo
5+
attr_reader agent_settings: Configuration::AgentSettingsResolver::AgentSettings
6+
7+
def initialize: (Configuration::AgentSettingsResolver::AgentSettings agent_settings) -> void
8+
9+
def fetch: -> Remote::Transport::HTTP::Negotiation::Response?
10+
end
11+
end
12+
end
13+
end

0 commit comments

Comments
 (0)