Skip to content

Commit

Permalink
DEBUG-3535 use core transport for DI to support unix domain sockets (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
p-datadog authored Feb 25, 2025
1 parent a4eb031 commit b2274aa
Show file tree
Hide file tree
Showing 26 changed files with 1,044 additions and 276 deletions.
4 changes: 4 additions & 0 deletions Steepfile
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ target :datadog do
ignore 'lib/datadog/core/workers/runtime_metrics.rb'
ignore 'lib/datadog/di/configuration/settings.rb'
ignore 'lib/datadog/di/contrib/railtie.rb'
ignore 'lib/datadog/di/transport/http/api.rb'
ignore 'lib/datadog/di/transport/http/spec.rb'
ignore 'lib/datadog/di/transport/http/diagnostics.rb'
ignore 'lib/datadog/di/transport/http/input.rb'
ignore 'lib/datadog/kit/appsec/events.rb' # disabled because of https://github.com/soutaro/steep/issues/701
ignore 'lib/datadog/kit/identity.rb' # disabled because of https://github.com/soutaro/steep/issues/701
ignore 'lib/datadog/opentelemetry.rb'
Expand Down
3 changes: 2 additions & 1 deletion lib/datadog/di.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
require_relative 'di/redactor'
require_relative 'di/remote'
require_relative 'di/serializer'
require_relative 'di/transport'
#require_relative 'di/transport'
require_relative 'di/transport/http'
require_relative 'di/utils'

module Datadog
Expand Down
4 changes: 1 addition & 3 deletions lib/datadog/di/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ def initialize(settings, agent_settings, logger, code_tracker: nil, telemetry: n
@redactor = Redactor.new(settings)
@serializer = Serializer.new(settings, redactor, telemetry: telemetry)
@instrumenter = Instrumenter.new(settings, serializer, logger, code_tracker: code_tracker, telemetry: telemetry)
@transport = Transport.new(agent_settings)
@probe_notifier_worker = ProbeNotifierWorker.new(settings, transport, logger, telemetry: telemetry)
@probe_notifier_worker = ProbeNotifierWorker.new(settings, logger, agent_settings: agent_settings, telemetry: telemetry)
@probe_notification_builder = ProbeNotificationBuilder.new(settings, serializer)
@probe_manager = ProbeManager.new(settings, instrumenter, probe_notification_builder, probe_notifier_worker, logger, telemetry: telemetry)
probe_notifier_worker.start
Expand All @@ -94,7 +93,6 @@ def initialize(settings, agent_settings, logger, code_tracker: nil, telemetry: n
attr_reader :telemetry
attr_reader :code_tracker
attr_reader :instrumenter
attr_reader :transport
attr_reader :probe_notifier_worker
attr_reader :probe_notification_builder
attr_reader :probe_manager
Expand Down
24 changes: 20 additions & 4 deletions lib/datadog/di/probe_notifier_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ module DI
#
# @api private
class ProbeNotifierWorker
def initialize(settings, transport, logger, telemetry: nil)
def initialize(settings, logger, agent_settings:, telemetry: nil)
@settings = settings
@telemetry = telemetry
@status_queue = []
@snapshot_queue = []
@transport = transport
@agent_settings = agent_settings
@logger = logger
@lock = Mutex.new
@wake = Core::Semaphore.new
Expand All @@ -43,6 +43,7 @@ def initialize(settings, transport, logger, telemetry: nil)
attr_reader :settings
attr_reader :logger
attr_reader :telemetry
attr_reader :agent_settings

def start
return if @thread && @pid == Process.pid
Expand Down Expand Up @@ -154,7 +155,6 @@ def flush

private

attr_reader :transport
attr_reader :wake
attr_reader :thread

Expand All @@ -170,6 +170,22 @@ def io_in_progress?

attr_reader :last_sent

def status_transport
@status_transport ||= DI::Transport::HTTP.diagnostics(agent_settings: agent_settings)
end

def do_send_status(batch)
status_transport.send_diagnostics(batch)
end

def snapshot_transport
@snapshot_transport ||= DI::Transport::HTTP.input(agent_settings: agent_settings)
end

def do_send_snapshot(batch)
snapshot_transport.send_input(batch)
end

[
[:status, 'probe status'],
[:snapshot, 'snapshot'],
Expand Down Expand Up @@ -245,7 +261,7 @@ def set_sleep_remaining
if batch.any? # steep:ignore
begin
logger.trace { "di: sending #{batch.length} #{event_type} event(s) to agent" } # steep:ignore
transport.public_send("send_#{event_type}", batch)
send("do_send_#{event_type}", batch)
time = Core::Utils::Time.get_time
@lock.synchronize do
@last_sent = time
Expand Down
79 changes: 0 additions & 79 deletions lib/datadog/di/transport.rb

This file was deleted.

61 changes: 61 additions & 0 deletions lib/datadog/di/transport/diagnostics.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# frozen_string_literal: true

require_relative '../../core/transport/parcel'
require_relative 'http/client'

module Datadog
module DI
module Transport
module Diagnostics
class EncodedParcel
include Datadog::Core::Transport::Parcel
end

class Request < Datadog::Core::Transport::Request
end

class Transport
attr_reader :client, :apis, :default_api, :current_api_id

def initialize(apis, default_api)
@apis = apis

@client = HTTP::Client.new(current_api)
end

def current_api
@apis[HTTP::API::DIAGNOSTICS]
end

def send_diagnostics(payload)
json = JSON.dump(payload)
parcel = EncodedParcel.new(json)
request = Request.new(parcel)

response = @client.send_diagnostics_payload(request)
unless response.ok?
# TODO Datadog::Core::Transport::InternalErrorResponse
# does not have +code+ method, what is the actual API of
# these response objects?
raise Error::AgentCommunicationError, "send_diagnostics failed: #{begin
response.code
rescue
"???"
end}: #{response.payload}"
end
rescue Error::AgentCommunicationError
raise
# Datadog::Core::Transport does not perform any exception mapping,
# therefore we could have any exception here from failure to parse
# agent URI for example.
# If we ever implement retries for network errors, we should distinguish
# actual network errors from non-network errors that are raised by
# transport code.
rescue => exc
raise Error::AgentCommunicationError, "send_diagnostics failed: #{exc.class}: #{exc}"
end
end
end
end
end
end
119 changes: 119 additions & 0 deletions lib/datadog/di/transport/http.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# frozen_string_literal: true

require 'uri'

require_relative '../../core/environment/container'
require_relative '../../core/environment/ext'
require_relative '../../core/transport/ext'
require_relative '../../core/transport/http/adapters/net'
require_relative '../../core/transport/http/adapters/test'
require_relative '../../core/transport/http/adapters/unix_socket'
require_relative 'diagnostics'
require_relative 'input'
require_relative 'http/api'
require_relative '../../core/transport/http/builder'
require_relative '../../../datadog/version'

module Datadog
module DI
module Transport
# Namespace for HTTP transport components
module HTTP
module_function

# Builds a new Transport::HTTP::Client
def new(klass, &block)
Core::Transport::HTTP::Builder.new(
api_instance_class: API::Instance, &block
).to_transport(klass)
end

# Builds a new Transport::HTTP::Client with default settings
# Pass a block to override any settings.
def diagnostics(
agent_settings:,
**options
)
new(DI::Transport::Diagnostics::Transport) do |transport|
transport.adapter(agent_settings)
transport.headers default_headers

apis = API.defaults

transport.api API::DIAGNOSTICS, apis[API::DIAGNOSTICS]

# Apply any settings given by options
unless options.empty?
transport.default_api = options[:api_version] if options.key?(:api_version)
transport.headers options[:headers] if options.key?(:headers)
end

# Call block to apply any customization, if provided
yield(transport) if block_given?
end
end

# Builds a new Transport::HTTP::Client with default settings
# Pass a block to override any settings.
def input(
agent_settings:,
**options
)
new(DI::Transport::Input::Transport) do |transport|
transport.adapter(agent_settings)
transport.headers default_headers

apis = API.defaults

transport.api API::INPUT, apis[API::INPUT]

# Apply any settings given by options
unless options.empty?
transport.default_api = options[:api_version] if options.key?(:api_version)
transport.headers options[:headers] if options.key?(:headers)
end

# Call block to apply any customization, if provided
yield(transport) if block_given?
end
end

def default_headers
{
Datadog::Core::Transport::Ext::HTTP::HEADER_CLIENT_COMPUTED_TOP_LEVEL => '1',
Datadog::Core::Transport::Ext::HTTP::HEADER_META_LANG => Datadog::Core::Environment::Ext::LANG,
Datadog::Core::Transport::Ext::HTTP::HEADER_META_LANG_VERSION => Datadog::Core::Environment::Ext::LANG_VERSION,
Datadog::Core::Transport::Ext::HTTP::HEADER_META_LANG_INTERPRETER =>
Datadog::Core::Environment::Ext::LANG_INTERPRETER,
Datadog::Core::Transport::Ext::HTTP::HEADER_META_LANG_INTERPRETER_VENDOR => Core::Environment::Ext::LANG_ENGINE,
Datadog::Core::Transport::Ext::HTTP::HEADER_META_TRACER_VERSION =>
Datadog::Core::Environment::Ext::GEM_DATADOG_VERSION
}.tap do |headers|
# Add container ID, if present.
container_id = Datadog::Core::Environment::Container.container_id
headers[Datadog::Core::Transport::Ext::HTTP::HEADER_CONTAINER_ID] = container_id unless container_id.nil?
# Pretend that stats computation are already done by the client
if Datadog.configuration.appsec.standalone.enabled
headers[Datadog::Core::Transport::Ext::HTTP::HEADER_CLIENT_COMPUTED_STATS] = 'yes'
end
end
end

def default_adapter
Datadog::Core::Configuration::Ext::Agent::HTTP::ADAPTER
end

# Add adapters to registry
Datadog::Core::Transport::HTTP::Builder::REGISTRY.set(
Datadog::Core::Transport::HTTP::Adapters::Net,
Datadog::Core::Configuration::Ext::Agent::HTTP::ADAPTER
)
Datadog::Core::Transport::HTTP::Builder::REGISTRY.set(Datadog::Core::Transport::HTTP::Adapters::Test, Datadog::Core::Transport::Ext::Test::ADAPTER)
Datadog::Core::Transport::HTTP::Builder::REGISTRY.set(
Datadog::Core::Transport::HTTP::Adapters::UnixSocket,
Datadog::Core::Transport::Ext::UnixSocket::ADAPTER
)
end
end
end
end
Loading

0 comments on commit b2274aa

Please sign in to comment.