Skip to content

Commit

Permalink
feat: OTLP Replay Generator (#1542)
Browse files Browse the repository at this point in the history
* WIP First pass on otlp replay generator

* Add metrics and traces produce methods, otlp telemetry generation

* add getCurrentTime function for testing

* Add testing, factor out traces function

* Logs tests and comments, implement produce for otlp traces and metrics

* Refactor to use generic methods for visiting/updating telemetry timestamps, update timestamps for logs & metrics like traces

* Add validation test for OTLP config, update comments

* Validate that otlp_json parses some valid telemetry for the specified type, add comments, remove pointer from captured funcs, use ptrace.Span for generic trace functions, tlog.Log for generic logs and timeStamped only for metric datapoints
  • Loading branch information
shazlehu authored Mar 7, 2024
1 parent eb2562c commit 09224cd
Show file tree
Hide file tree
Showing 16 changed files with 3,872 additions and 7 deletions.
77 changes: 76 additions & 1 deletion receiver/telemetrygeneratorreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import (
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// Config is the configuration for the telemetry generator receiver
Expand Down Expand Up @@ -70,18 +73,25 @@ func (g *GeneratorConfig) Validate() error {
return validateHostMetricsGeneratorConfig(g)
case generatorTypeWindowsEvents:
return validateWindowsEventsGeneratorConfig(g)

case generatorTypeOTLP:
return validateOTLPGenerator(g)
default:
return fmt.Errorf("invalid generator type: %s", g.Type)
}
}

func validateLogGeneratorConfig(g *GeneratorConfig) error {

err := pcommon.NewMap().FromRaw(g.Attributes)
if err != nil {
return fmt.Errorf("error in attributes config: %s", err)
}

err = pcommon.NewMap().FromRaw(g.ResourceAttributes)
if err != nil {
return fmt.Errorf("error in resource_attributes config: %s", err)
}

// severity and body validation
if body, ok := g.AdditionalConfig["body"]; ok {
// check if body is a valid string, if not, return an error
Expand All @@ -105,6 +115,71 @@ func validateLogGeneratorConfig(g *GeneratorConfig) error {
return nil
}

func validateOTLPGenerator(cfg *GeneratorConfig) error {

telemetryType, ok := cfg.AdditionalConfig["telemetry_type"]
if !ok {
return errors.New("telemetry_type must be set")
}

// validate the telemetry type
telemetryTypeStr, ok := telemetryType.(string)
if !ok {
return fmt.Errorf("invalid telemetry type: %v", telemetryType)
}
dataType := component.DataType(telemetryTypeStr)
switch dataType {
case component.DataTypeLogs, component.DataTypeMetrics, component.DataTypeTraces:
default:
return fmt.Errorf("invalid telemetry type: %s", telemetryType)
}

// validate the otlp json
otlpJSON, ok := cfg.AdditionalConfig["otlp_json"]
if !ok {
return errors.New("otlp_json must be set")
}

otlpJSONStr, ok := otlpJSON.(string)
if !ok {
return fmt.Errorf("otlp_json must be a string, got: %v", otlpJSON)
}

jsonBytes := []byte(otlpJSONStr)

switch dataType {
case component.DataTypeLogs:
marshaler := plog.JSONUnmarshaler{}
logs, err := marshaler.UnmarshalLogs(jsonBytes)
if err != nil {
return fmt.Errorf("error unmarshalling logs from otlp_json: %w", err)
}
if logs.LogRecordCount() == 0 {
return errors.New("no log records found in otlp_json")
}
case component.DataTypeMetrics:
marshaler := pmetric.JSONUnmarshaler{}
metrics, err := marshaler.UnmarshalMetrics(jsonBytes)
if err != nil {
return fmt.Errorf("error unmarshalling metrics from otlp_json: %w", err)
}
if metrics.DataPointCount() == 0 {
return errors.New("no metric data points found in otlp_json")
}
case component.DataTypeTraces:
marshaler := ptrace.JSONUnmarshaler{}
traces, err := marshaler.UnmarshalTraces(jsonBytes)
if err != nil {
return fmt.Errorf("error unmarshalling traces from otlp_json: %w", err)
}
if traces.SpanCount() == 0 {
return errors.New("no trace spans found in otlp_json")
}

}
return nil
}

func validateHostMetricsGeneratorConfig(_ *GeneratorConfig) error {
return nil
}
Expand Down
207 changes: 207 additions & 0 deletions receiver/telemetrygeneratorreceiver/config_test.go

Large diffs are not rendered by default.

20 changes: 18 additions & 2 deletions receiver/telemetrygeneratorreceiver/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (

// generatorTypeWindowsEvents is the generator type for windows events
generatorTypeWindowsEvents generatorType = "windows_events"

// generatorTypeOTLP is the generator type for OTLP
generatorTypeOTLP generatorType = "otlp"
)

type metricGenerator interface {
Expand All @@ -59,6 +62,9 @@ func newLogsGenerators(cfg *Config, logger *zap.Logger) []logGenerator {
generators = append(generators, newLogsGenerator(gen, logger))
case generatorTypeWindowsEvents:
generators = append(generators, newWindowsEventsGenerator(gen, logger))
case generatorTypeOTLP:
generators = append(generators, newOtlpGenerator(gen, logger))

}
}
return generators
Expand All @@ -71,11 +77,21 @@ func newMetricsGenerators(cfg *Config, logger *zap.Logger) []metricGenerator {
switch gen.Type {
case generatorTypeHostMetrics:
generators = append(generators, newHostMetricsGenerator(gen, logger))
case generatorTypeOTLP:
generators = append(generators, newOtlpGenerator(gen, logger))
}

}
return generators
}

func newTraceGenerators(_ *Config, _ *zap.Logger) []traceGenerator {
return nil
func newTraceGenerators(cfg *Config, logger *zap.Logger) []traceGenerator {
var generators []traceGenerator
for _, gen := range cfg.Generators {
switch gen.Type {
case generatorTypeOTLP:
generators = append(generators, newOtlpGenerator(gen, logger))
}
}
return generators
}
13 changes: 11 additions & 2 deletions receiver/telemetrygeneratorreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

Expand All @@ -41,7 +42,15 @@ func newMetricsReceiver(ctx context.Context, logger *zap.Logger, cfg *Config, ne
return mr
}

// TODO implement produce for metrics
// produce generates metrics from each generator and sends them to the next consumer
func (r *metricsGeneratorReceiver) produce() error {
return nil
metrics := pmetric.NewMetrics()
for _, g := range r.generators {
m := g.generateMetrics()
for i := 0; i < m.ResourceMetrics().Len(); i++ {
src := m.ResourceMetrics().At(i)
src.CopyTo(metrics.ResourceMetrics().AppendEmpty())
}
}
return r.nextConsumer.ConsumeMetrics(r.ctx, metrics)
}
Loading

0 comments on commit 09224cd

Please sign in to comment.