Skip to content

Commit

Permalink
Fix translation from otlp.Request to pdata representation (open-telem…
Browse files Browse the repository at this point in the history
…etry#5197)

The problem that this PR tries to fix, is that changes in the traces object return by the otlp.Request are not reflected in the Request.

The solution is to wrap the request proto instead of the data proto, since for the marshalers we don't have an equivalent object that shares the same problem as Request.

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored and Nicholaswang committed Jun 7, 2022
1 parent 389715e commit 34373b0
Show file tree
Hide file tree
Showing 20 changed files with 175 additions and 119 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### 💡 Enhancements 💡

### 🧰 Bug fixes 🧰
- Fix translation from otlp.Request to pdata representation, changes to the returned pdata not all reflected to the otlp.Request (#5197)

## v0.49.0 Beta

Expand Down
31 changes: 28 additions & 3 deletions pdata/internal/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,50 @@
package internal // import "go.opentelemetry.io/collector/pdata/internal"

import (
otlpcollectorlog "go.opentelemetry.io/collector/pdata/internal/data/protogen/collector/logs/v1"
otlplogs "go.opentelemetry.io/collector/pdata/internal/data/protogen/logs/v1"
)

// LogsToOtlp internal helper to convert Logs to otlp request representation.
func LogsToOtlp(mw Logs) *otlpcollectorlog.ExportLogsServiceRequest {
return mw.orig
}

// LogsFromOtlp internal helper to convert otlp request representation to Logs.
func LogsFromOtlp(orig *otlpcollectorlog.ExportLogsServiceRequest) Logs {
return Logs{orig: orig}
}

// LogsToProto internal helper to convert Logs to protobuf representation.
func LogsToProto(l Logs) otlplogs.LogsData {
return otlplogs.LogsData{
ResourceLogs: l.orig.ResourceLogs,
}
}

// LogsFromProto internal helper to convert protobuf representation to Logs.
func LogsFromProto(orig otlplogs.LogsData) Logs {
return Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{
ResourceLogs: orig.ResourceLogs,
}}
}

// Logs is the top-level struct that is propagated through the logs pipeline.
// Use NewLogs to create new instance, zero-initialized instance is not valid for use.
type Logs struct {
orig *otlplogs.LogsData
orig *otlpcollectorlog.ExportLogsServiceRequest
}

// NewLogs creates a new Logs.
func NewLogs() Logs {
return Logs{orig: &otlplogs.LogsData{}}
return Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{}}
}

// MoveTo moves all properties from the current struct to dest
// resetting the current instance to its zero value.
func (ld Logs) MoveTo(dest Logs) {
*dest.orig = *ld.orig
*ld.orig = otlplogs.LogsData{}
*ld.orig = otlpcollectorlog.ExportLogsServiceRequest{}
}

// Clone returns a copy of Logs.
Expand Down
14 changes: 8 additions & 6 deletions pdata/internal/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
goproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"

otlpcollectorlog "go.opentelemetry.io/collector/pdata/internal/data/protogen/collector/logs/v1"
otlplogs "go.opentelemetry.io/collector/pdata/internal/data/protogen/logs/v1"
)

Expand Down Expand Up @@ -51,17 +52,17 @@ func TestLogRecordCount(t *testing.T) {

func TestLogRecordCountWithEmpty(t *testing.T) {
assert.Zero(t, NewLogs().LogRecordCount())
assert.Zero(t, Logs{orig: &otlplogs.LogsData{
assert.Zero(t, Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{
ResourceLogs: []*otlplogs.ResourceLogs{{}},
}}.LogRecordCount())
assert.Zero(t, Logs{orig: &otlplogs.LogsData{
assert.Zero(t, Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{
ResourceLogs: []*otlplogs.ResourceLogs{
{
ScopeLogs: []*otlplogs.ScopeLogs{{}},
},
},
}}.LogRecordCount())
assert.Equal(t, 1, Logs{orig: &otlplogs.LogsData{
assert.Equal(t, 1, Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{
ResourceLogs: []*otlplogs.ResourceLogs{
{
ScopeLogs: []*otlplogs.ScopeLogs{
Expand All @@ -74,10 +75,11 @@ func TestLogRecordCountWithEmpty(t *testing.T) {
}}.LogRecordCount())
}

func TestToFromLogProto(t *testing.T) {
logs := LogsFromOtlp(&otlplogs.LogsData{})
func TestToFromLogOtlp(t *testing.T) {
otlp := &otlpcollectorlog.ExportLogsServiceRequest{}
logs := LogsFromOtlp(otlp)
assert.EqualValues(t, NewLogs(), logs)
assert.EqualValues(t, &otlplogs.LogsData{}, logs.orig)
assert.EqualValues(t, otlp, LogsToOtlp(logs))
}

func TestResourceLogsWireCompatibility(t *testing.T) {
Expand Down
31 changes: 28 additions & 3 deletions pdata/internal/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,43 @@
package internal // import "go.opentelemetry.io/collector/pdata/internal"

import (
otlpcollectormetrics "go.opentelemetry.io/collector/pdata/internal/data/protogen/collector/metrics/v1"
otlpmetrics "go.opentelemetry.io/collector/pdata/internal/data/protogen/metrics/v1"
)

// MetricsToOtlp internal helper to convert Metrics to otlp request representation.
func MetricsToOtlp(mw Metrics) *otlpcollectormetrics.ExportMetricsServiceRequest {
return mw.orig
}

// MetricsFromOtlp internal helper to convert otlp request representation to Metrics.
func MetricsFromOtlp(orig *otlpcollectormetrics.ExportMetricsServiceRequest) Metrics {
return Metrics{orig: orig}
}

// MetricsToProto internal helper to convert Metrics to protobuf representation.
func MetricsToProto(l Metrics) otlpmetrics.MetricsData {
return otlpmetrics.MetricsData{
ResourceMetrics: l.orig.ResourceMetrics,
}
}

// MetricsFromProto internal helper to convert protobuf representation to Metrics.
func MetricsFromProto(orig otlpmetrics.MetricsData) Metrics {
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: orig.ResourceMetrics,
}}
}

// Metrics is the top-level struct that is propagated through the metrics pipeline.
// Use NewMetrics to create new instance, zero-initialized instance is not valid for use.
type Metrics struct {
orig *otlpmetrics.MetricsData
orig *otlpcollectormetrics.ExportMetricsServiceRequest
}

// NewMetrics creates a new Metrics.
func NewMetrics() Metrics {
return Metrics{orig: &otlpmetrics.MetricsData{}}
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{}}
}

// Clone returns a copy of MetricData.
Expand All @@ -40,7 +65,7 @@ func (md Metrics) Clone() Metrics {
// resetting the current instance to its zero value.
func (md Metrics) MoveTo(dest Metrics) {
*dest.orig = *md.orig
*md.orig = otlpmetrics.MetricsData{}
*md.orig = otlpcollectormetrics.ExportMetricsServiceRequest{}
}

// ResourceMetrics returns the ResourceMetricsSlice associated with this Metrics.
Expand Down
29 changes: 15 additions & 14 deletions pdata/internal/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
goproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"

otlpcollectormetrics "go.opentelemetry.io/collector/pdata/internal/data/protogen/collector/metrics/v1"
otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1"
otlpmetrics "go.opentelemetry.io/collector/pdata/internal/data/protogen/metrics/v1"
otlpresource "go.opentelemetry.io/collector/pdata/internal/data/protogen/resource/v1"
Expand Down Expand Up @@ -226,7 +227,7 @@ func TestMetricsMoveTo(t *testing.T) {
}

func TestOtlpToInternalReadOnly(t *testing.T) {
md := Metrics{orig: &otlpmetrics.MetricsData{
md := Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand Down Expand Up @@ -314,7 +315,7 @@ func TestOtlpToInternalReadOnly(t *testing.T) {
}

func TestOtlpToFromInternalReadOnly(t *testing.T) {
md := MetricsFromOtlp(&otlpmetrics.MetricsData{
md := MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand Down Expand Up @@ -346,7 +347,7 @@ func TestOtlpToFromInternalReadOnly(t *testing.T) {
func TestOtlpToFromInternalGaugeMutating(t *testing.T) {
newAttributes := NewMapFromRaw(map[string]interface{}{"k": "v"})

md := MetricsFromOtlp(&otlpmetrics.MetricsData{
md := MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand Down Expand Up @@ -429,7 +430,7 @@ func TestOtlpToFromInternalGaugeMutating(t *testing.T) {
func TestOtlpToFromInternalSumMutating(t *testing.T) {
newAttributes := NewMapFromRaw(map[string]interface{}{"k": "v"})

md := MetricsFromOtlp(&otlpmetrics.MetricsData{
md := MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand Down Expand Up @@ -514,7 +515,7 @@ func TestOtlpToFromInternalSumMutating(t *testing.T) {
func TestOtlpToFromInternalHistogramMutating(t *testing.T) {
newAttributes := NewMapFromRaw(map[string]interface{}{"k": "v"})

md := MetricsFromOtlp(&otlpmetrics.MetricsData{
md := MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand Down Expand Up @@ -598,7 +599,7 @@ func TestOtlpToFromInternalHistogramMutating(t *testing.T) {
func TestOtlpToFromInternalExponentialHistogramMutating(t *testing.T) {
newAttributes := NewMapFromRaw(map[string]interface{}{"k": "v"})

md := MetricsFromOtlp(&otlpmetrics.MetricsData{
md := MetricsFromOtlp(&otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand Down Expand Up @@ -707,7 +708,7 @@ func BenchmarkMetricsClone(b *testing.B) {
}

func BenchmarkOtlpToFromInternal_PassThrough(b *testing.B) {
req := &otlpmetrics.MetricsData{
req := &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand All @@ -732,7 +733,7 @@ func BenchmarkOtlpToFromInternal_PassThrough(b *testing.B) {
}

func BenchmarkOtlpToFromInternal_Gauge_MutateOneLabel(b *testing.B) {
req := &otlpmetrics.MetricsData{
req := &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand All @@ -758,7 +759,7 @@ func BenchmarkOtlpToFromInternal_Gauge_MutateOneLabel(b *testing.B) {
}

func BenchmarkOtlpToFromInternal_Sum_MutateOneLabel(b *testing.B) {
req := &otlpmetrics.MetricsData{
req := &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand All @@ -784,7 +785,7 @@ func BenchmarkOtlpToFromInternal_Sum_MutateOneLabel(b *testing.B) {
}

func BenchmarkOtlpToFromInternal_HistogramPoints_MutateOneLabel(b *testing.B) {
req := &otlpmetrics.MetricsData{
req := &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
Resource: generateTestProtoResource(),
Expand Down Expand Up @@ -947,13 +948,13 @@ func generateTestProtoDoubleHistogramMetric() *otlpmetrics.Metric {
}

func generateMetricsEmptyResource() Metrics {
return Metrics{orig: &otlpmetrics.MetricsData{
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{{}},
}}
}

func generateMetricsEmptyInstrumentation() Metrics {
return Metrics{orig: &otlpmetrics.MetricsData{
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
ScopeMetrics: []*otlpmetrics.ScopeMetrics{{}},
Expand All @@ -963,7 +964,7 @@ func generateMetricsEmptyInstrumentation() Metrics {
}

func generateMetricsEmptyMetrics() Metrics {
return Metrics{orig: &otlpmetrics.MetricsData{
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
ScopeMetrics: []*otlpmetrics.ScopeMetrics{
Expand All @@ -977,7 +978,7 @@ func generateMetricsEmptyMetrics() Metrics {
}

func generateMetricsEmptyDataPoints() Metrics {
return Metrics{orig: &otlpmetrics.MetricsData{
return Metrics{orig: &otlpcollectormetrics.ExportMetricsServiceRequest{
ResourceMetrics: []*otlpmetrics.ResourceMetrics{
{
ScopeMetrics: []*otlpmetrics.ScopeMetrics{
Expand Down
51 changes: 0 additions & 51 deletions pdata/internal/otlp_wrapper.go

This file was deleted.

32 changes: 29 additions & 3 deletions pdata/internal/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,51 @@
package internal // import "go.opentelemetry.io/collector/pdata/internal"

import (
otlpcollectortrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1"
)

// TracesToOtlp internal helper to convert Traces to otlp request representation.
func TracesToOtlp(mw Traces) *otlpcollectortrace.ExportTraceServiceRequest {
return mw.orig
}

// TracesFromOtlp internal helper to convert otlp request representation to Traces.
func TracesFromOtlp(orig *otlpcollectortrace.ExportTraceServiceRequest) Traces {
return Traces{orig: orig}
}

// TracesToProto internal helper to convert Traces to protobuf representation.
func TracesToProto(mw Traces) otlptrace.TracesData {
return otlptrace.TracesData{
ResourceSpans: mw.orig.ResourceSpans,
}
}

// TracesFromProto internal helper to convert protobuf representation to Traces.
func TracesFromProto(orig otlptrace.TracesData) Traces {
return Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: orig.ResourceSpans,
}}
}

// Traces is the top-level struct that is propagated through the traces pipeline.
// Use NewTraces to create new instance, zero-initialized instance is not valid for use.
type Traces struct {
orig *otlptrace.TracesData
// When marhsal/unmarshal unless it is in the request for otlp protocol, convert to otlptrace.TracesData.
orig *otlpcollectortrace.ExportTraceServiceRequest
}

// NewTraces creates a new Traces.
func NewTraces() Traces {
return Traces{orig: &otlptrace.TracesData{}}
return Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
}

// MoveTo moves all properties from the current struct to dest
// resetting the current instance to its zero value.
func (td Traces) MoveTo(dest Traces) {
*dest.orig = *td.orig
*td.orig = otlptrace.TracesData{}
*td.orig = otlpcollectortrace.ExportTraceServiceRequest{}
}

// Clone returns a copy of Traces.
Expand Down
Loading

0 comments on commit 34373b0

Please sign in to comment.