Skip to content

Commit

Permalink
Fix translation from otlp.Request to pdata representation
Browse files Browse the repository at this point in the history
The problem that this PR tries to fix, is that changes in the traces object return by the otlp.Request are not reflected in the Request.

The solution is to wrap the request proto instead of the data proto, since for the marshalers we don't have an equivalent object that shares the same problem as Request.

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Apr 12, 2022
1 parent 8eb68f4 commit 52752a9
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 18 deletions.
23 changes: 19 additions & 4 deletions pdata/internal/otlp_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package internal // import "go.opentelemetry.io/collector/pdata/internal"

import (
otlpcollectortrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/collector/trace/v1"
otlplogs "go.opentelemetry.io/collector/pdata/internal/data/protogen/logs/v1"
otlpmetrics "go.opentelemetry.io/collector/pdata/internal/data/protogen/metrics/v1"
otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1"
Expand All @@ -30,16 +31,30 @@ func MetricsFromOtlp(orig *otlpmetrics.MetricsData) Metrics {
return Metrics{orig: orig}
}

// TracesToOtlp internal helper to convert Traces to protobuf representation.
func TracesToOtlp(mw Traces) *otlptrace.TracesData {
// TracesToOtlp internal helper to convert Traces to otlp request representation.
func TracesToOtlp(mw Traces) *otlpcollectortrace.ExportTraceServiceRequest {
return mw.orig
}

// TracesFromOtlp internal helper to convert protobuf representation to Traces.
func TracesFromOtlp(orig *otlptrace.TracesData) Traces {
// TracesFromOtlp internal helper to convert otlp request representation to Traces.
func TracesFromOtlp(orig *otlpcollectortrace.ExportTraceServiceRequest) Traces {
return Traces{orig: orig}
}

// TracesToProto internal helper to convert Traces to protobuf representation.
func TracesToProto(mw Traces) otlptrace.TracesData {
return otlptrace.TracesData{
ResourceSpans: mw.orig.ResourceSpans,
}
}

// TracesFromProto internal helper to convert protobuf representation to Traces.
func TracesFromProto(orig otlptrace.TracesData) Traces {
return Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{
ResourceSpans: orig.ResourceSpans,
}}
}

// LogsToOtlp internal helper to convert Logs to protobuf representation.
func LogsToOtlp(l Logs) *otlplogs.LogsData {
return l.orig
Expand Down
8 changes: 5 additions & 3 deletions pdata/internal/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,27 @@
package internal // import "go.opentelemetry.io/collector/pdata/internal"

import (
otlpcollectortrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1"
)

// Traces is the top-level struct that is propagated through the traces pipeline.
// Use NewTraces to create new instance, zero-initialized instance is not valid for use.
type Traces struct {
orig *otlptrace.TracesData
// When marhsal/unmarshal unless it is in the request for otlp protocol, convert to otlptrace.TracesData.
orig *otlpcollectortrace.ExportTraceServiceRequest
}

// NewTraces creates a new Traces.
func NewTraces() Traces {
return Traces{orig: &otlptrace.TracesData{}}
return Traces{orig: &otlpcollectortrace.ExportTraceServiceRequest{}}
}

// MoveTo moves all properties from the current struct to dest
// resetting the current instance to its zero value.
func (td Traces) MoveTo(dest Traces) {
*dest.orig = *td.orig
*td.orig = otlptrace.TracesData{}
*td.orig = otlpcollectortrace.ExportTraceServiceRequest{}
}

// Clone returns a copy of Traces.
Expand Down
9 changes: 5 additions & 4 deletions pdata/ptrace/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func newJSONMarshaler() *jsonMarshaler {

func (e *jsonMarshaler) MarshalTraces(td Traces) ([]byte, error) {
buf := bytes.Buffer{}
err := e.delegate.Marshal(&buf, internal.TracesToOtlp(td))
pb := internal.TracesToProto(td)
err := e.delegate.Marshal(&buf, &pb)
return buf.Bytes(), err
}

Expand All @@ -57,10 +58,10 @@ func newJSONUnmarshaler() *jsonUnmarshaler {
}

func (d *jsonUnmarshaler) UnmarshalTraces(buf []byte) (Traces, error) {
td := &otlptrace.TracesData{}
if err := d.delegate.Unmarshal(bytes.NewReader(buf), td); err != nil {
td := otlptrace.TracesData{}
if err := d.delegate.Unmarshal(bytes.NewReader(buf), &td); err != nil {
return Traces{}, err
}
otlp.InstrumentationLibrarySpansToScope(td.ResourceSpans)
return internal.TracesFromOtlp(td), nil
return internal.TracesFromProto(td), nil
}
10 changes: 6 additions & 4 deletions pdata/ptrace/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ func newPbMarshaler() *pbMarshaler {
var _ Sizer = (*pbMarshaler)(nil)

func (e *pbMarshaler) MarshalTraces(td Traces) ([]byte, error) {
return internal.TracesToOtlp(td).Marshal()
pb := internal.TracesToProto(td)
return pb.Marshal()
}

func (e *pbMarshaler) TracesSize(td Traces) int {
return internal.TracesToOtlp(td).Size()
pb := internal.TracesToProto(td)
return pb.Size()
}

type pbUnmarshaler struct{}
Expand All @@ -53,7 +55,7 @@ func newPbUnmarshaler() *pbUnmarshaler {
}

func (d *pbUnmarshaler) UnmarshalTraces(buf []byte) (Traces, error) {
td := &otlptrace.TracesData{}
td := otlptrace.TracesData{}
err := td.Unmarshal(buf)
return internal.TracesFromOtlp(td), err
return internal.TracesFromProto(td), err
}
5 changes: 2 additions & 3 deletions pdata/ptrace/ptraceotlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"go.opentelemetry.io/collector/pdata/internal"
otlpcollectortrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/collector/trace/v1"
otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1"
"go.opentelemetry.io/collector/pdata/internal/otlp"
"go.opentelemetry.io/collector/pdata/ptrace"
)
Expand Down Expand Up @@ -108,11 +107,11 @@ func (tr Request) UnmarshalJSON(data []byte) error {
}

func (tr Request) SetTraces(td ptrace.Traces) {
tr.orig.ResourceSpans = internal.TracesToOtlp(td).ResourceSpans
*tr.orig = *internal.TracesToOtlp(td)
}

func (tr Request) Traces() ptrace.Traces {
return internal.TracesFromOtlp(&otlptrace.TracesData{ResourceSpans: tr.orig.ResourceSpans})
return internal.TracesFromOtlp(tr.orig)
}

// Client is the client API for OTLP-GRPC Traces service.
Expand Down
7 changes: 7 additions & 0 deletions pdata/ptrace/ptraceotlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ var tracesTransitionData = [][]byte{
}`),
}

func TestRequestToPData(t *testing.T) {
tr := NewRequest()
assert.Equal(t, tr.Traces().SpanCount(), 0)
tr.Traces().ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
assert.Equal(t, tr.Traces().SpanCount(), 1)
}

func TestRequestJSON(t *testing.T) {
tr := NewRequest()
assert.NoError(t, tr.UnmarshalJSON(tracesRequestJSON))
Expand Down

0 comments on commit 52752a9

Please sign in to comment.