-
Notifications
You must be signed in to change notification settings - Fork 381
/
Copy pathwriter.rb
200 lines (157 loc) · 5.91 KB
/
writer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# typed: true
require 'json'
require 'ddtrace/ext/net'
require 'datadog/core/environment/socket'
require 'ddtrace/configuration/agent_settings_resolver'
require 'ddtrace/transport/http'
require 'ddtrace/transport/io'
require 'ddtrace/encoding'
require 'ddtrace/workers'
require 'ddtrace/diagnostics/environment_logger'
require 'ddtrace/utils/only_once'
module Datadog
  # Processor that sends traces and metadata to the agent.
  #
  # Traces handed to #write are enqueued on a Workers::AsyncTransport worker.
  # The worker is created lazily, once per process, and is given #send_spans
  # as its +on_trace+ callback together with the configured transport.
  class Writer
    DEPRECATION_WARN_ONLY_ONCE = Datadog::Utils::OnlyOnce.new

    attr_reader \
      :priority_sampler,
      :transport,
      :worker

    # Builds a writer. No worker is created here; see #start and #write.
    #
    # Recognized +options+ keys:
    # * +:buffer_size+ - handed to the worker as +buffer_size+
    #   (default: Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE)
    # * +:flush_interval+ - handed to the worker as +interval+
    #   (default: Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL)
    # * +:transport_options+ - Hash of keyword arguments for
    #   Transport::HTTP.default; only used when +:transport+ is absent
    # * +:agent_settings+ - when the key is present, forwarded as the
    #   +:agent_settings+ transport option
    # * +:priority_sampler+ - when truthy, stored on the writer and the
    #   transport +:api_version+ defaults to Transport::HTTP::API::V4
    # * +:transport+ - a ready-made transport used instead of the default one
    def initialize(options = {})
      # writer and transport parameters
      @buff_size = options.fetch(:buffer_size, Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE)
      @flush_interval = options.fetch(:flush_interval, Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL)

      # Work on a copy: the assignments below must not modify the hash owned
      # by the caller (which may be frozen or shared between writers).
      transport_options = options.fetch(:transport_options, {}).dup
      transport_options[:agent_settings] = options[:agent_settings] if options.key?(:agent_settings)

      # priority sampling
      if options[:priority_sampler]
        @priority_sampler = options[:priority_sampler]
        transport_options[:api_version] ||= Transport::HTTP::API::V4
      end

      # transport and buffers
      @transport = options.fetch(:transport) do
        Transport::HTTP.default(**transport_options)
      end

      # handles the thread creation after an eventual fork
      @mutex_after_fork = Mutex.new
      @pid = nil

      @traces_flushed = 0

      # one worker for traces
      @worker = nil

      # Once stopped, this writer instance cannot be restarted.
      # This allows for graceful shutdown, while preventing
      # the host application from inadvertently starting new
      # threads during shutdown.
      @stopped = false
    end

    # Starts a worker for the current process if one is needed.
    #
    # Returns +false+ when the writer has been stopped, +nil+ when a worker
    # already exists for the current pid, and +true+ when a new worker was
    # started.
    def start
      @mutex_after_fork.synchronize do
        return false if @stopped

        pid = Process.pid
        return if @worker && pid == @pid

        @pid = pid
        start_worker
        true
      end
    end

    # spawns a worker for spans; they share the same transport which is thread-safe
    def start_worker
      @trace_handler = ->(items, transport) { send_spans(items, transport) }
      @worker = Datadog::Workers::AsyncTransport.new(
        transport: @transport,
        buffer_size: @buff_size,
        on_trace: @trace_handler,
        interval: @flush_interval
      )

      @worker.start
    end

    # Gracefully shuts down this writer.
    #
    # Once stopped, method calls won't fail, but
    # no internal work will be performed.
    #
    # It is not possible to restart a stopped writer instance.
    #
    # Returns +true+ when a worker was stopped, +nil+ when there was none.
    def stop
      @mutex_after_fork.synchronize { stop_worker }
    end

    # Marks the writer as stopped and stops the current worker, if any.
    # Callers are expected to hold @mutex_after_fork (see #stop).
    def stop_worker
      @stopped = true

      return if @worker.nil?

      @worker.stop
      @worker = nil

      true
    end

    private :start_worker, :stop_worker

    # Flushes traces to the trace-agent; handles spans only.
    #
    # Sends +traces+ through +transport+, adds the trace count of every
    # response that is neither an internal nor a server error to the flushed
    # total, and feeds the last response to the priority sampler.
    #
    # Returns +true+ when +traces+ is empty or when no response reports a
    # server error; +false+ otherwise.
    def send_spans(traces, transport)
      return true if traces.empty?

      # Inject hostname if configured to do so
      inject_hostname!(traces) if Datadog.configuration.report_hostname

      # Send traces and get responses
      responses = transport.send_traces(traces)

      # Tally up successful flushes
      responses.reject { |x| x.internal_error? || x.server_error? }.each do |response|
        @traces_flushed += response.trace_count
      end

      # Update priority sampler
      update_priority_sampler(responses.last)

      record_environment_information!(responses)

      # Success means that none of the responses is a server error.
      responses.none?(&:server_error?)
    end

    # Enqueues the trace for submission to the API.
    #
    # +services+ is deprecated and ignored; passing a non-nil value only
    # triggers a one-time deprecation warning.
    def write(trace, services = nil)
      unless services.nil?
        DEPRECATION_WARN_ONLY_ONCE.run do
          Datadog.logger.warn(
            "\nwrite: Writing services has been deprecated and no longer need to be provided.\n" \
            "write(traces, services) can be updated to write(traces)\n"
          )
        end
      end

      # In multiprocess environments, the main process initializes the +Writer+ instance and if
      # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new
      # processes will share the same +Writer+ until the first write (COW). Because of that,
      # each process owns a different copy of the +@buffer+ after each write and so the
      # +AsyncTransport+ will not send data to the trace agent.
      #
      # This check ensures that if a process doesn't own the current +Writer+, async workers
      # will be initialized again (but only once for each process).
      start if @worker.nil? || @pid != Process.pid

      # TODO: Remove this, and have the tracer pump traces directly to runtime metrics
      #       instead of working through the trace writer.
      # Associate root span with runtime metrics
      if Datadog.configuration.runtime_metrics.enabled && !trace.empty?
        Datadog.runtime_metrics.associate_with_span(trace.first)
      end

      # Read @worker once: a concurrent #stop may set it to nil at any time.
      worker_local = @worker

      if worker_local
        worker_local.enqueue_trace(trace)
      elsif !@stopped
        Datadog.logger.debug('Writer either failed to start or was stopped before #write could complete')
      end
    end

    # stats returns a dictionary of stats about the writer.
    def stats
      {
        traces_flushed: @traces_flushed,
        transport: @transport.stats
      }
    end

    private

    # Tags the first span of every trace with the local hostname.
    # Traces without a first span are skipped; nothing is tagged when the
    # hostname is nil or empty.
    def inject_hostname!(traces)
      # The lookup takes no per-trace input, so resolve it once per flush
      # instead of once per trace.
      hostname = Datadog::Core::Environment::Socket.hostname
      return if hostname.nil? || hostname.empty?

      traces.each do |trace|
        root_span = trace.first
        root_span.set_tag(Ext::NET::TAG_HOSTNAME, hostname) unless root_span.nil?
      end
    end

    # Passes the service rates of +response+ to the priority sampler.
    # Does nothing when there is no response, the response is an internal
    # error, no priority sampler is configured, or no rates were returned.
    def update_priority_sampler(response)
      return unless response && !response.internal_error? && priority_sampler && response.service_rates

      priority_sampler.update(response.service_rates)
    end

    # Hands the transport responses to the diagnostics environment logger.
    def record_environment_information!(responses)
      Diagnostics::EnvironmentLogger.log!(responses)
    end
  end
end