-
Notifications
You must be signed in to change notification settings - Fork 381
/
Copy pathclient.rb
122 lines (100 loc) · 4.67 KB
/
client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# frozen_string_literal: true
require_relative '../../../../tracing'
require_relative '../../../metadata/ext'
require_relative '../distributed/propagation'
require_relative '../../analytics'
require_relative '../ext'
require_relative '../../ext'
require_relative '../formatting'
module Datadog
module Tracing
module Contrib
module GRPC
module DatadogInterceptor
# The DatadogInterceptor::Client implements the tracing strategy
# for gRPC client-side endpoints. This middleware component will
# inject trace context information into gRPC metadata prior to
# sending the request to the server.
class Client < Base
# Wraps a client-side gRPC invocation in a tracing span.
#
# The span is annotated before the wrapped call runs; afterwards the gRPC
# status code tag is set to OK on success, to the error's own code for a
# ::GRPC::BadStatus, or to UNKNOWN for any other StandardError (which is
# then re-raised unchanged).
#
# @param keywords [Hash] interceptor arguments; `:method` is read here and the
#   whole hash is forwarded to `annotate!`
# @yield performs the actual RPC
# @return [Object] whatever the yielded block returns
def trace(keywords)
  method_formatter = GRPC::Formatting::FullMethodStringFormatter.new(keywords[:method])

  Tracing.trace(
    Ext::SPAN_CLIENT,
    type: Tracing::Metadata::Ext::HTTP::TYPE_OUTBOUND,
    service: service_name, # Keep the client-side service name configuration
    resource: method_formatter.resource_name,
    on_error: on_error
  ) do |span, trace|
    annotate!(trace, span, keywords, method_formatter)

    response =
      begin
        yield
      rescue StandardError => e
        status = e.is_a?(::GRPC::BadStatus) ? e.code : ::GRPC::Core::StatusCodes::UNKNOWN
        span.set_tag(Contrib::Ext::RPC::GRPC::TAG_STATUS_CODE, status)
        raise
      end

    # Only reached when the call did not raise.
    span.set_tag(Contrib::Ext::RPC::GRPC::TAG_STATUS_CODE, ::GRPC::Core::StatusCodes::OK)
    response
  end
end
private
# Tags the client span with gRPC request details and, unless the circuit
# breaker says distributed tracing should be skipped, injects the trace
# context into the outgoing request metadata.
#
# Any StandardError raised along the way is logged at debug level and not
# re-raised; tags set before the failure remain on the span.
#
# @param trace the active trace, passed to the circuit breaker and the injector
# @param span the client span receiving the tags
# @param keywords [Hash] interceptor arguments; `:metadata` and `:call` are read
# @param formatter object providing `grpc_full_method` and `rpc_service`
def annotate!(trace, span, keywords, formatter)
  request_metadata = keywords[:metadata] || {}
  active_call = keywords[:call]

  span.set_tags(request_metadata)

  if datadog_configuration[:peer_service]
    span.set_tag(Tracing::Metadata::Ext::TAG_PEER_SERVICE, datadog_configuration[:peer_service])
  end

  # Record the global service name whenever the span reports a different one.
  if span.service != Datadog.configuration.service
    span.set_tag(Tracing::Contrib::Ext::Metadata::TAG_BASE_SERVICE, Datadog.configuration.service)
  end

  metadata_ext = Tracing::Metadata::Ext
  rpc_ext = Contrib::Ext::RPC

  span.set_tag(metadata_ext::TAG_KIND, metadata_ext::SpanKind::TAG_CLIENT)
  span.set_tag(metadata_ext::TAG_COMPONENT, Ext::TAG_COMPONENT)
  span.set_tag(metadata_ext::TAG_OPERATION, Ext::TAG_OPERATION_CLIENT)
  span.set_tag(rpc_ext::TAG_SYSTEM, Ext::TAG_SYSTEM)
  span.set_tag(rpc_ext::GRPC::TAG_FULL_METHOD, formatter.grpc_full_method)
  span.set_tag(rpc_ext::TAG_SERVICE, formatter.rpc_service)

  # Only the host half of the peer address is tagged; the port is discarded.
  peer_host, = find_host_port(active_call)
  span.set_tag(metadata_ext::TAG_PEER_HOSTNAME, peer_host) if peer_host

  client_deadline = find_deadline(active_call)
  span.set_tag(Ext::TAG_CLIENT_DEADLINE, client_deadline) if client_deadline

  # Analytics sample rate is applied only when analytics is enabled.
  if analytics_enabled?
    Contrib::Analytics.set_sample_rate(span, analytics_sample_rate)
  end

  skip_injection = Tracing::Distributed::CircuitBreaker.should_skip_distributed_tracing?(
    datadog_config: Datadog.configuration_for(self) || datadog_configuration,
    trace: trace
  )
  GRPC.inject(trace, request_metadata) unless skip_injection

  Contrib::SpanAttributeSchema.set_peer_service!(span, Ext::PEER_SERVICE_SOURCES)
rescue StandardError => e
  Datadog.logger.debug("GRPC client trace failed: #{e}")
end
# Formats the call's deadline as a UTC ISO 8601 timestamp with millisecond
# precision.
#
# @param call [Object, nil] call object; ignored unless it responds to `deadline`
# @return [String, nil] e.g. "2020-01-01T03:00:00.000Z", or nil when the call
#   has no `deadline` or the deadline is not a Time
def find_deadline(call)
  return unless call.respond_to?(:deadline)

  deadline = call.deadline
  return unless deadline.is_a?(Time)

  # Time#utc converts the receiver in place (and raises FrozenError on a
  # frozen non-UTC Time). Time#getutc returns a converted copy, so the
  # call's own deadline object is left exactly as we found it.
  deadline.getutc.iso8601(3)
end
# Looks up the remote peer address of a gRPC call and splits it into host
# and port.
#
# @param call [Object, nil] call exposing `peer` directly, or holding the
#   real call in its `@wrapped` instance variable
# @return [Object, nil] whatever Core::Utils.extract_host_port returns (the
#   caller destructures it as host, port); nil when no call was given or
#   when the lookup raised
def find_host_port(call)
  return unless call

  # Some call objects are "view" classes with restricted method visibility
  # that do not expose `peer`; reach into the object they wrap instead.
  peer_source = call.respond_to?(:peer) ? call : call.instance_variable_get(:@wrapped)
  address = peer_source.peer

  Core::Utils.extract_host_port(address)
rescue StandardError => e
  Datadog.logger.debug { "Could not parse host:port from #{call}: #{e}" }
  nil
end
end
end
end
end
end
end