Skip to content

Commit

Permalink
[connector/otlpjson]: Do not emit empty batches (open-telemetry#35827)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
The connector now does not emit empty batches for invalid otlp payload
and throws an error instead. Approach discussed here
open-telemetry#35738 (comment)

<!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes open-telemetry#35738 and open-telemetry#35739

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Manual Testing 

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->

---------

Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
  • Loading branch information
2 people authored and pull[bot] committed Nov 8, 2024
1 parent 1c24359 commit 4f767a5
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 24 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otlpjson-connector-invalid-otlp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: connector/otlpjson

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Throw error on invalid otlp payload.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35738, 35739]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
46 changes: 46 additions & 0 deletions connector/otlpjsonconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,49 @@ func TestLogsToTraces(t *testing.T) {
})
}
}

// This benchmark looks at how performance is affected when all three connectors are consuming logs (at the same time)
func BenchmarkConsumeLogs(b *testing.B) {
inputlogs := "input-log.yaml"
inputTraces := "input-trace.yaml"
inputMetrics := "input-metric.yaml"

factory := NewFactory()
// initialize log -> log connector
logsink := &consumertest.LogsSink{}
logscon, _ := factory.CreateLogsToLogs(context.Background(),
connectortest.NewNopSettings(), createDefaultConfig(), logsink)

require.NoError(b, logscon.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(b, logscon.Shutdown(context.Background()))
}()

// initialize log -> traces connector
tracesink := &consumertest.TracesSink{}
traceconn, _ := factory.CreateLogsToTraces(context.Background(),
connectortest.NewNopSettings(), createDefaultConfig(), tracesink)
require.NoError(b, traceconn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(b, traceconn.Shutdown(context.Background()))
}()

// initialize log -> metric connector
metricsink := &consumertest.MetricsSink{}
metricconn, _ := factory.CreateLogsToMetrics(context.Background(),
connectortest.NewNopSettings(), createDefaultConfig(), metricsink)
require.NoError(b, metricconn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(b, metricconn.Shutdown(context.Background()))
}()

testLogs, _ := golden.ReadLogs(filepath.Join("testdata", "logsToLogs", inputlogs))
testTraces, _ := golden.ReadLogs(filepath.Join("testdata", "logsToTraces", inputTraces))
testMetrics, _ := golden.ReadLogs(filepath.Join("testdata", "logsToMetrics", inputMetrics))

for i := 0; i < b.N; i++ {
assert.NoError(b, logscon.ConsumeLogs(context.Background(), testLogs))
assert.NoError(b, traceconn.ConsumeLogs(context.Background(), testTraces))
assert.NoError(b, metricconn.ConsumeLogs(context.Background(), testMetrics))
}
}
5 changes: 5 additions & 0 deletions connector/otlpjsonconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package otlpjsonconnector // import "github.com/open-telemetry/opentelemetry-col

import (
"context"
"regexp"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
Expand All @@ -13,6 +14,10 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/otlpjsonconnector/internal/metadata"
)

var logRegex = regexp.MustCompile(`^\{\s*"resourceLogs"\s*:\s*\[`)
var metricRegex = regexp.MustCompile(`^\{\s*"resourceMetrics"\s*:\s*\[`)
var traceRegex = regexp.MustCompile(`^\{\s*"resourceSpans"\s*:\s*\[`)

// NewFactory returns a ConnectorFactory.
func NewFactory() connector.Factory {
return connector.NewFactory(
Expand Down
29 changes: 21 additions & 8 deletions connector/otlpjsonconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,29 @@ func (c *connectorLogs) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
for k := 0; k < logRecord.LogRecords().Len(); k++ {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
var l plog.Logs
l, err := logsUnmarshaler.UnmarshalLogs([]byte(token.AsString()))
if err != nil {
c.logger.Error("could not extract logs from otlp json", zap.Error(err))

// Check if the "resourceLogs" key exists in the JSON data
value := token.AsString()
switch {
case logRegex.MatchString(value):
var l plog.Logs
l, err := logsUnmarshaler.UnmarshalLogs([]byte(value))
if err != nil {
c.logger.Error("could not extract logs from otlp json", zap.Error(err))
continue
}
err = c.logsConsumer.ConsumeLogs(ctx, l)
if err != nil {
c.logger.Error("could not consume logs from otlp json", zap.Error(err))
}
case metricRegex.MatchString(value), traceRegex.MatchString(value):
// If it's a metric or trace payload, simply continue
continue
default:
// If no regex matches, log the invalid payload
c.logger.Error("Invalid otlp payload")
}
err = c.logsConsumer.ConsumeLogs(ctx, l)
if err != nil {
c.logger.Error("could not consume logs from otlp json", zap.Error(err))
}

}
}
}
Expand Down
28 changes: 20 additions & 8 deletions connector/otlpjsonconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,28 @@ func (c *connectorMetrics) ConsumeLogs(ctx context.Context, pl plog.Logs) error
for k := 0; k < logRecord.LogRecords().Len(); k++ {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
var m pmetric.Metrics
m, err := metricsUnmarshaler.UnmarshalMetrics([]byte(token.AsString()))
if err != nil {
c.logger.Error("could extract metrics from otlp json", zap.Error(err))

value := token.AsString()
switch {
case metricRegex.MatchString(value):
var m pmetric.Metrics
m, err := metricsUnmarshaler.UnmarshalMetrics([]byte(value))
if err != nil {
c.logger.Error("could not extract metrics from otlp json", zap.Error(err))
continue
}
err = c.metricsConsumer.ConsumeMetrics(ctx, m)
if err != nil {
c.logger.Error("could not consume metrics from otlp json", zap.Error(err))
}
case logRegex.MatchString(value), traceRegex.MatchString(value):
// If it's a log or trace payload, simply continue
continue
default:
// If no regex matches, log the invalid payload
c.logger.Error("Invalid otlp payload")
}
err = c.metricsConsumer.ConsumeMetrics(ctx, m)
if err != nil {
c.logger.Error("could not consume metrics from otlp json", zap.Error(err))
}

}
}
}
Expand Down
27 changes: 19 additions & 8 deletions connector/otlpjsonconnector/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,26 @@ func (c *connectorTraces) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
for k := 0; k < logRecord.LogRecords().Len(); k++ {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
var t ptrace.Traces
t, err := tracesUnmarshaler.UnmarshalTraces([]byte(token.AsString()))
if err != nil {
c.logger.Error("could extract traces from otlp json", zap.Error(err))

value := token.AsString()
switch {
case traceRegex.MatchString(value):
var t ptrace.Traces
t, err := tracesUnmarshaler.UnmarshalTraces([]byte(value))
if err != nil {
c.logger.Error("could not extract traces from otlp json", zap.Error(err))
continue
}
err = c.tracesConsumer.ConsumeTraces(ctx, t)
if err != nil {
c.logger.Error("could not consume traces from otlp json", zap.Error(err))
}
case metricRegex.MatchString(value), logRegex.MatchString(value):
// If it's a metric or log payload, continue to the next iteration
continue
}
err = c.tracesConsumer.ConsumeTraces(ctx, t)
if err != nil {
c.logger.Error("could not consume traces from otlp json", zap.Error(err))
default:
// If no regex matches, log the invalid payload
c.logger.Error("Invalid otlp payload")
}
}
}
Expand Down

0 comments on commit 4f767a5

Please sign in to comment.