Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector/otlpjson]: Do not emit empty batches #35827

Merged
merged 11 commits into from
Nov 8, 2024
27 changes: 27 additions & 0 deletions .chloggen/otlpjson-connector-invalid-otlp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: connector/otlpjson

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Throw error on invalid otlp payload.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35738, 35739]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
46 changes: 46 additions & 0 deletions connector/otlpjsonconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,49 @@ func TestLogsToTraces(t *testing.T) {
})
}
}

// This benchmark looks at how performance is affected when all three connectors are consuming logs (at the same time)
func BenchmarkConsumeLogs(b *testing.B) {
inputlogs := "input-log.yaml"
inputTraces := "input-trace.yaml"
inputMetrics := "input-metric.yaml"

factory := NewFactory()
// initialize log -> log connector
logsink := &consumertest.LogsSink{}
logscon, _ := factory.CreateLogsToLogs(context.Background(),
connectortest.NewNopSettings(), createDefaultConfig(), logsink)

require.NoError(b, logscon.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(b, logscon.Shutdown(context.Background()))
}()

// initialize log -> traces connector
tracesink := &consumertest.TracesSink{}
traceconn, _ := factory.CreateLogsToTraces(context.Background(),
connectortest.NewNopSettings(), createDefaultConfig(), tracesink)
require.NoError(b, traceconn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(b, traceconn.Shutdown(context.Background()))
}()

// initialize log -> metric connector
metricsink := &consumertest.MetricsSink{}
metricconn, _ := factory.CreateLogsToMetrics(context.Background(),
connectortest.NewNopSettings(), createDefaultConfig(), metricsink)
require.NoError(b, metricconn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(b, metricconn.Shutdown(context.Background()))
}()

testLogs, _ := golden.ReadLogs(filepath.Join("testdata", "logsToLogs", inputlogs))
testTraces, _ := golden.ReadLogs(filepath.Join("testdata", "logsToTraces", inputTraces))
testMetrics, _ := golden.ReadLogs(filepath.Join("testdata", "logsToMetrics", inputMetrics))

for i := 0; i < b.N; i++ {
assert.NoError(b, logscon.ConsumeLogs(context.Background(), testLogs))
assert.NoError(b, traceconn.ConsumeLogs(context.Background(), testTraces))
assert.NoError(b, metricconn.ConsumeLogs(context.Background(), testMetrics))
}
}
5 changes: 5 additions & 0 deletions connector/otlpjsonconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package otlpjsonconnector // import "github.com/open-telemetry/opentelemetry-col

import (
"context"
"regexp"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
Expand All @@ -13,6 +14,10 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/otlpjsonconnector/internal/metadata"
)

var logRegex = regexp.MustCompile(`^\{\s*"resourceLogs"\s*:\s*\[`)
var metricRegex = regexp.MustCompile(`^\{\s*"resourceMetrics"\s*:\s*\[`)
var traceRegex = regexp.MustCompile(`^\{\s*"resourceSpans"\s*:\s*\[`)

// NewFactory returns a ConnectorFactory.
func NewFactory() connector.Factory {
return connector.NewFactory(
Expand Down
29 changes: 21 additions & 8 deletions connector/otlpjsonconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,29 @@ func (c *connectorLogs) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
for k := 0; k < logRecord.LogRecords().Len(); k++ {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
var l plog.Logs
l, err := logsUnmarshaler.UnmarshalLogs([]byte(token.AsString()))
if err != nil {
c.logger.Error("could not extract logs from otlp json", zap.Error(err))

// Check if the "resourceLogs" key exists in the JSON data
value := token.AsString()
switch {
case logRegex.MatchString(value):
var l plog.Logs
l, err := logsUnmarshaler.UnmarshalLogs([]byte(value))
if err != nil {
c.logger.Error("could not extract logs from otlp json", zap.Error(err))
continue
}
err = c.logsConsumer.ConsumeLogs(ctx, l)
if err != nil {
c.logger.Error("could not consume logs from otlp json", zap.Error(err))
}
case metricRegex.MatchString(value), traceRegex.MatchString(value):
// If it's a metric or trace payload, simply continue
continue
default:
// If no regex matches, log the invalid payload
c.logger.Error("Invalid otlp payload")
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
}
err = c.logsConsumer.ConsumeLogs(ctx, l)
if err != nil {
c.logger.Error("could not consume logs from otlp json", zap.Error(err))
}

}
}
}
Expand Down
28 changes: 20 additions & 8 deletions connector/otlpjsonconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,28 @@ func (c *connectorMetrics) ConsumeLogs(ctx context.Context, pl plog.Logs) error
for k := 0; k < logRecord.LogRecords().Len(); k++ {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
var m pmetric.Metrics
m, err := metricsUnmarshaler.UnmarshalMetrics([]byte(token.AsString()))
if err != nil {
c.logger.Error("could extract metrics from otlp json", zap.Error(err))

value := token.AsString()
switch {
case metricRegex.MatchString(value):
var m pmetric.Metrics
m, err := metricsUnmarshaler.UnmarshalMetrics([]byte(value))
if err != nil {
c.logger.Error("could not extract metrics from otlp json", zap.Error(err))
continue
}
err = c.metricsConsumer.ConsumeMetrics(ctx, m)
if err != nil {
c.logger.Error("could not consume metrics from otlp json", zap.Error(err))
}
case logRegex.MatchString(value), traceRegex.MatchString(value):
// If it's a log or trace payload, simply continue
continue
default:
// If no regex matches, log the invalid payload
c.logger.Error("Invalid otlp payload")
}
err = c.metricsConsumer.ConsumeMetrics(ctx, m)
if err != nil {
c.logger.Error("could not consume metrics from otlp json", zap.Error(err))
}

}
}
}
Expand Down
27 changes: 19 additions & 8 deletions connector/otlpjsonconnector/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,26 @@ func (c *connectorTraces) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
for k := 0; k < logRecord.LogRecords().Len(); k++ {
lRecord := logRecord.LogRecords().At(k)
token := lRecord.Body()
var t ptrace.Traces
t, err := tracesUnmarshaler.UnmarshalTraces([]byte(token.AsString()))
if err != nil {
c.logger.Error("could extract traces from otlp json", zap.Error(err))

value := token.AsString()
switch {
case traceRegex.MatchString(value):
var t ptrace.Traces
t, err := tracesUnmarshaler.UnmarshalTraces([]byte(value))
if err != nil {
c.logger.Error("could not extract traces from otlp json", zap.Error(err))
continue
}
err = c.tracesConsumer.ConsumeTraces(ctx, t)
if err != nil {
c.logger.Error("could not consume traces from otlp json", zap.Error(err))
}
case metricRegex.MatchString(value), logRegex.MatchString(value):
// If it's a metric or log payload, continue to the next iteration
continue
}
err = c.tracesConsumer.ConsumeTraces(ctx, t)
if err != nil {
c.logger.Error("could not consume traces from otlp json", zap.Error(err))
default:
// If no regex matches, log the invalid payload
c.logger.Error("Invalid otlp payload")
}
}
}
Expand Down