Skip to content

Commit

Permalink
Add Telemetry metric support
Browse files Browse the repository at this point in the history
  • Loading branch information
GustavoCaso committed Oct 26, 2023
1 parent f46d168 commit 3ceabc9
Show file tree
Hide file tree
Showing 12 changed files with 356 additions and 6 deletions.
1 change: 1 addition & 0 deletions Steepfile
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ target :ddtrace do
ignore 'lib/datadog/core/telemetry/event.rb'
ignore 'lib/datadog/core/telemetry/ext.rb'
ignore 'lib/datadog/core/telemetry/heartbeat.rb'
ignore 'lib/datadog/core/telemetry/metric_worker.rb'
ignore 'lib/datadog/core/telemetry/http/adapters/net.rb'
ignore 'lib/datadog/core/telemetry/http/env.rb'
ignore 'lib/datadog/core/telemetry/http/ext.rb'
Expand Down
42 changes: 42 additions & 0 deletions lib/datadog/core/telemetry/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
require_relative 'emitter'
require_relative 'heartbeat'
require_relative '../utils/forking'
require_relative 'metric'
require_relative 'metric_queue'
require_relative 'metric_worker'

module Datadog
module Core
Expand All @@ -27,11 +30,23 @@ def initialize(heartbeat_interval_seconds:, enabled: true)
@worker = Telemetry::Heartbeat.new(enabled: @enabled, heartbeat_interval_seconds: heartbeat_interval_seconds) do
heartbeat!
end

Metric::Rate.interval = heartbeat_interval_seconds

@metric_queue = MetricQueue.new

@metrics_worker = Telemetry::MetricWorker.new(
enabled: @enabled,
heartbeat_interval_seconds: heartbeat_interval_seconds
) do
flush_metrics!
end
end

def disable!
@enabled = false
@worker.enabled = false
@metrics_worker.enabled = false
end

def started!
Expand Down Expand Up @@ -74,6 +89,33 @@ def client_configuration_change!(changes)
@emitter.request('app-client-configuration-change', data: { changes: changes, origin: 'remote_config' })
end

def add_count_metric(namespace, name, value, tags)
@metrics_queue.add_metric(namespace, name, value, tags, Metric::Count)
end

def add_rate_metric(namespace, name, value, tags)
@metrics_queue.add_metric(namespace, name, value, tags, Metric::Rate)
end

def add_gauge_metric(namespace, name, value, tags)
@metrics_queue.add_metric(namespace, name, value, tags, Metric::Gauge)
end

def add_distribution_metric(namespace, name, value, tags)
@metrics_queue.add_metric(namespace, name, value, tags, Metric::Distribution)
end

def flush_metrics!
return if !@enabled || forked?

# Send metrics
@metric_queue.build_metrics_payload do |_metric_type, payload|
@emitter.request(request_type, payload: payload)
end

@metric_queue = MetricQueue.new
end

private

def heartbeat!
Expand Down
5 changes: 4 additions & 1 deletion lib/datadog/core/telemetry/emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ def initialize(http_transport: Datadog::Core::Telemetry::Http::Transport.new)
# Retrieves and emits a TelemetryRequest object based on the request type specified
# @param request_type [String] the type of telemetry request to collect data for
# @param data [Object] arbitrary object to be passed to the respective `request_type` handler
def request(request_type, data: nil)
def request(request_type, data: nil, payload: nil)
raise ArgumentError, 'Can not provide data and payload when generating temetry request' if payload && data

begin
request = Datadog::Core::Telemetry::Event.new.telemetry_request(
request_type: request_type,
seq_id: self.class.sequence.next,
data: data,
payload: payload,
).to_h
@http_transport.request(request_type: request_type.to_s, payload: request.to_json)
rescue StandardError => e
Expand Down
8 changes: 5 additions & 3 deletions lib/datadog/core/telemetry/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ def initialize
# @param request_type [String] the type of telemetry request to collect data for
# @param seq_id [Integer] the ID of the request; incremented each time a telemetry request is sent to the API
# @param data [Object] arbitrary object to be passed to the respective `request_type` handler
def telemetry_request(request_type:, seq_id:, data: nil)
def telemetry_request(request_type:, seq_id:, data: nil, payload: nil)
request_payload = payload || generate_payload(request_type, data)

Telemetry::V1::TelemetryRequest.new(
api_version: @api_version,
application: application,
host: host,
payload: payload(request_type, data),
payload: request_payload,
request_type: request_type,
runtime_id: runtime_id,
seq_id: seq_id,
Expand All @@ -40,7 +42,7 @@ def telemetry_request(request_type:, seq_id:, data: nil)

private

def payload(request_type, data)
def generate_payload(request_type, data)
case request_type
when :'app-started'
app_started
Expand Down
4 changes: 2 additions & 2 deletions lib/datadog/core/telemetry/http/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ def initialize
@path = Http::Ext::AGENT_ENDPOINT
end

def request(request_type:, payload:)
def request(request_type:, payload:, api_version: Http::Ext::API_VERSION)
env = Http::Env.new
env.path = @path
env.body = payload
env.headers = headers(request_type: request_type)
env.headers = headers(request_type: request_type, api_version: api_version)
adapter.post(env)
end

Expand Down
126 changes: 126 additions & 0 deletions lib/datadog/core/telemetry/metric.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# frozen_string_literal: true

require_relative 'event'
require_relative 'http/transport'
require_relative '../utils/sequence'
require_relative '../utils/forking'

module Datadog
module Core
module Telemetry
module Metric
# Base class for Metric
class Base
class << self
def request_type
raise NotImplementedError
end
end

attr_reader :values

def initialize(name, tags)
@name = name
@tags = tags
@values = nil
end

def update_value(value)
raise NotImplementedError
end

def metric_type
raise NotImplementedError
end

def to_h
{
tags: @tags,
type: metric_type,
common: true,
}
end
end

# GenerateMetricType
class GenerateMetricType < Base
class << self
def request_type
'generate-metrics'
end
end
end

# DistributionsMetricType
class DistributionsMetricType < Base
class << self
def request_type
'distributions'
end
end
end

# Count metric sup all the submitted values in a time interval
class Count < GenerateMetricType
def update_value(value)
if @values
@values[0][1] += value
else
@values = [[Time.now, value]]
end
end

def metric_type
'count'
end
end

# Rate metric type takes the count and divides it by the length of the time interval. This is useful if you’re
# interested in the number of hits per second.
class Rate < GenerateMetricType
class << self
attr_accessor :interval
end

def initialize(name, tags)
super(name, tags)
@count = 0.0
end

def update_value(value)
@count += value
rate = self.class.interval ? (@count / self.class.interval) : 0.0
@values = [[Time.now, rate]]
end

def metric_type
'rate'
end
end

# Gauge metric takes the last value reported during the interval.
class Gauge < GenerateMetricType
def update_value(value)
@values = [[Time.now, value]]
end

def metric_type
'gauge'
end
end

# Distribution metric are a metric type that aggregate values during the interval.
class Distribution < DistributionsMetricType
def update_value(value)
@values ||= []
@values << value
end

def metric_type
'distributions'
end
end
end
end
end
end
55 changes: 55 additions & 0 deletions lib/datadog/core/telemetry/metric_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# frozen_string_literal: true

module Datadog
module Core
module Telemetry
# Stores all the metrics information by request_type during a time interval
class MetricQueue
def initialize
@metrics = {
'generate-metrics' => {},
'distributions' => {},
}
end

def add_metric(namespace, name, value, tags, metric_klass)
namespace_space = @metrics[metric_klass.request_type][namespace] ||= {}
existing_metric = namespace_space[name]

if existing_metric
existing_metric.update_value(value)
@metrics[metric_klass.request_type][namespace][name] = existing_metric
else
new_metric = metric_klass.new(name, tags)
new_metric.update_value(value)
@metrics[metric_klass.request_type][namespace][name] = new_metric
end
end

def build_metrics_payload
@metrics.each do |metric_type, namespace|
next unless namespace

payload = {
namespace: namespace
}
namespace.each do |_namespace_key, metrics|
payload[:series] = []
series = payload[:series]

metrics.each do |metric_name, metric|
series << {
metric: metric_name,
points: metric.values,
**metric.to_h
}
end

yield metric_type, payload
end
end
end
end
end
end
end
35 changes: 35 additions & 0 deletions lib/datadog/core/telemetry/metric_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

require_relative '../worker'
require_relative '../workers/polling'

module Datadog
module Core
module Telemetry
# Periodically (every DEFAULT_INTERVAL_SECONDS) sends metrics to the telemetry API.
class MetricWorker < Core::Worker
include Core::Workers::Polling

def initialize(heartbeat_interval_seconds:, enabled: true, &block)
# Workers::Polling settings
self.enabled = enabled
# Workers::IntervalLoop settings
self.loop_base_interval = heartbeat_interval_seconds
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP
super(&block)
start
end

def loop_wait_before_first_iteration?
true
end

private

def start
perform
end
end
end
end
end
3 changes: 3 additions & 0 deletions sig/datadog/core/telemetry/heartbeat.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ module Datadog
module Telemetry
class Heartbeat < Core::Worker
include Core::Workers::Polling
include Core::Workers::IntervalLoop
include Core::Workers::Async::Thread
prepend Core::Workers::Polling::PrependedMethods

DEFAULT_INTERVAL_SECONDS: 60

Expand Down
Loading

0 comments on commit 3ceabc9

Please sign in to comment.