diff --git a/model/otlpgrpc/logs.go b/model/otlpgrpc/logs.go index 173230bc9e2..17f1730713f 100644 --- a/model/otlpgrpc/logs.go +++ b/model/otlpgrpc/logs.go @@ -22,6 +22,7 @@ import ( "google.golang.org/grpc" otlpcollectorlog "go.opentelemetry.io/collector/model/internal/data/protogen/collector/logs/v1" + v1 "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1" otlplogs "go.opentelemetry.io/collector/model/internal/data/protogen/logs/v1" ipdata "go.opentelemetry.io/collector/model/internal/pdata" "go.opentelemetry.io/collector/model/pdata" @@ -133,7 +134,36 @@ func (lr LogsRequest) MarshalJSON() ([]byte, error) { // UnmarshalJSON unmarshalls LogsRequest from JSON bytes. func (lr LogsRequest) UnmarshalJSON(data []byte) error { - return jsonUnmarshaler.Unmarshal(bytes.NewReader(data), lr.orig) + if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), lr.orig); err != nil { + return err + } + lr.instrumentationLibraryLogsToScope() + return nil +} + +// instrumentationLibraryLogsToScope implements the translation of resource logs data +// following the v0.15.0 upgrade: +// receivers SHOULD check if instrumentation_library_logs is set +// and scope_logs is not set then the value in instrumentation_library_logs +// SHOULD be used instead by converting InstrumentationLibraryLogs into ScopeLogs. +// If scope_logs is set then instrumentation_library_logs SHOULD be ignored. +func (lr LogsRequest) instrumentationLibraryLogsToScope() { + for _, rl := range lr.orig.ResourceLogs { + if len(rl.ScopeLogs) == 0 { + for _, ill := range rl.InstrumentationLibraryLogs { + scopeMetrics := otlplogs.ScopeLogs{ + Scope: v1.InstrumentationScope{ + Name: ill.InstrumentationLibrary.Name, + Version: ill.InstrumentationLibrary.Version, + }, + LogRecords: ill.LogRecords, + SchemaUrl: ill.SchemaUrl, + } + rl.ScopeLogs = append(rl.ScopeLogs, &scopeMetrics) + } + } + rl.InstrumentationLibraryLogs = nil + } } func (lr LogsRequest) SetLogs(ld pdata.Logs) { diff --git a/model/otlpgrpc/logs_test.go b/model/otlpgrpc/logs_test.go index e4b598a5ef6..fa733b97edc 100644 --- a/model/otlpgrpc/logs_test.go +++ b/model/otlpgrpc/logs_test.go @@ -63,6 +63,67 @@ var logsRequestJSON = []byte(` ] }`) +var logsTransitionData = [][]byte{ + []byte(` + { + "resourceLogs": [ + { + "resource": {}, + "instrumentationLibraryLogs": [ + { + "instrumentationLibrary": {}, + "logRecords": [ + { + "body": { + "stringValue": "test_log_record" + }, + "traceId": "", + "spanId": "" + } + ] + } + ] + } + ] + }`), + []byte(` + { + "resourceLogs": [ + { + "resource": {}, + "instrumentationLibraryLogs": [ + { + "instrumentationLibrary": {}, + "logRecords": [ + { + "body": { + "stringValue": "test_log_record" + }, + "traceId": "", + "spanId": "" + } + ] + } + ], + "scopeLogs": [ + { + "scope": {}, + "logRecords": [ + { + "body": { + "stringValue": "test_log_record" + }, + "traceId": "", + "spanId": "" + } + ] + } + ] + } + ] + }`), +} + func TestLogsRequestJSON(t *testing.T) { lr := NewLogsRequest() assert.NoError(t, lr.UnmarshalJSON(logsRequestJSON)) @@ -73,6 +134,18 @@ func TestLogsRequestJSON(t *testing.T) { assert.Equal(t, strings.Join(strings.Fields(string(logsRequestJSON)), ""), string(got)) } +func TestLogsRequestJSONTransition(t *testing.T) { + for _, data := range logsTransitionData { + lr := NewLogsRequest() + assert.NoError(t, lr.UnmarshalJSON(data)) + assert.Equal(t, "test_log_record", lr.Logs().ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsString()) + + got, err := lr.MarshalJSON() + assert.NoError(t, err) + assert.Equal(t, strings.Join(strings.Fields(string(logsRequestJSON)), ""), string(got)) + } +} + func TestLogsRequestJSON_Deprecated(t *testing.T) { lr, err := UnmarshalJSONLogsRequest(logsRequestJSON) assert.NoError(t, err) diff --git a/model/otlpgrpc/metrics.go b/model/otlpgrpc/metrics.go index 125cc96f94d..aa623ea06e1 100644 --- a/model/otlpgrpc/metrics.go +++ b/model/otlpgrpc/metrics.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc" otlpcollectormetrics "go.opentelemetry.io/collector/model/internal/data/protogen/collector/metrics/v1" + v1 "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1" otlpmetrics "go.opentelemetry.io/collector/model/internal/data/protogen/metrics/v1" ipdata "go.opentelemetry.io/collector/model/internal/pdata" "go.opentelemetry.io/collector/model/pdata" @@ -129,7 +130,36 @@ func (mr MetricsRequest) MarshalJSON() ([]byte, error) { // UnmarshalJSON unmarshalls MetricsRequest from JSON bytes. func (mr MetricsRequest) UnmarshalJSON(data []byte) error { - return jsonUnmarshaler.Unmarshal(bytes.NewReader(data), mr.orig) + if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), mr.orig); err != nil { + return err + } + mr.instrumentationLibraryMetricsToScope() + return nil +} + +// instrumentationLibraryMetricsToScope implements the translation of resource metrics data +// following the v0.15.0 upgrade: +// receivers SHOULD check if instrumentation_library_metrics is set +// and scope_metrics is not set then the value in instrumentation_library_metrics +// SHOULD be used instead by converting InstrumentationLibraryMetrics into ScopeMetrics. +// If scope_metrics is set then instrumentation_library_metrics SHOULD be ignored. +func (mr MetricsRequest) instrumentationLibraryMetricsToScope() { + for _, rm := range mr.orig.ResourceMetrics { + if len(rm.ScopeMetrics) == 0 { + for _, ilm := range rm.InstrumentationLibraryMetrics { + scopeMetrics := otlpmetrics.ScopeMetrics{ + Scope: v1.InstrumentationScope{ + Name: ilm.InstrumentationLibrary.Name, + Version: ilm.InstrumentationLibrary.Version, + }, + Metrics: ilm.Metrics, + SchemaUrl: ilm.SchemaUrl, + } + rm.ScopeMetrics = append(rm.ScopeMetrics, &scopeMetrics) + } + } + rm.InstrumentationLibraryMetrics = nil + } } func (mr MetricsRequest) SetMetrics(ld pdata.Metrics) { diff --git a/model/otlpgrpc/metrics_test.go b/model/otlpgrpc/metrics_test.go index d9b46cadb7e..69b1e66ea37 100644 --- a/model/otlpgrpc/metrics_test.go +++ b/model/otlpgrpc/metrics_test.go @@ -59,6 +59,55 @@ var metricsRequestJSON = []byte(` ] }`) +var metricsTransitionData = [][]byte{ + []byte(` + { + "resourceMetrics": [ + { + "resource": {}, + "instrumentationLibraryMetrics": [ + { + "instrumentationLibrary": {}, + "metrics": [ + { + "name": "test_metric" + } + ] + } + ] + } + ] + }`), + []byte(` + { + "resourceMetrics": [ + { + "resource": {}, + "instrumentationLibraryMetrics": [ + { + "instrumentationLibrary": {}, + "metrics": [ + { + "name": "test_metric" + } + ] + } + ], + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "test_metric" + } + ] + } + ] + } + ] + }`), +} + func TestMetricsRequestJSON(t *testing.T) { mr := NewMetricsRequest() assert.NoError(t, mr.UnmarshalJSON(metricsRequestJSON)) @@ -69,6 +118,18 @@ func TestMetricsRequestJSON(t *testing.T) { assert.Equal(t, strings.Join(strings.Fields(string(metricsRequestJSON)), ""), string(got)) } +func TestMetricsRequestJSONTransition(t *testing.T) { + for _, data := range metricsTransitionData { + mr := NewMetricsRequest() + assert.NoError(t, mr.UnmarshalJSON(data)) + assert.Equal(t, "test_metric", mr.Metrics().ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Name()) + + got, err := mr.MarshalJSON() + assert.NoError(t, err) + assert.Equal(t, strings.Join(strings.Fields(string(metricsRequestJSON)), ""), string(got)) + } +} + func TestMetricsRequestJSON_Deprecated(t *testing.T) { mr, err := UnmarshalJSONMetricsRequest(metricsRequestJSON) assert.NoError(t, err) diff --git a/model/otlpgrpc/traces.go b/model/otlpgrpc/traces.go index bff33d0f08e..5dd01b7f769 100644 --- a/model/otlpgrpc/traces.go +++ b/model/otlpgrpc/traces.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc" otlpcollectortrace "go.opentelemetry.io/collector/model/internal/data/protogen/collector/trace/v1" + v1 "go.opentelemetry.io/collector/model/internal/data/protogen/common/v1" otlptrace "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1" ipdata "go.opentelemetry.io/collector/model/internal/pdata" "go.opentelemetry.io/collector/model/pdata" @@ -129,7 +130,37 @@ func (tr TracesRequest) MarshalJSON() ([]byte, error) { // UnmarshalJSON unmarshalls TracesRequest from JSON bytes. func (tr TracesRequest) UnmarshalJSON(data []byte) error { - return jsonUnmarshaler.Unmarshal(bytes.NewReader(data), tr.orig) + err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), tr.orig) + if err != nil { + return err + } + tr.instrumentationLibraryToScope() + return nil +} + +// instrumentationLibraryToScope implements the translation of resource span data +// following the v0.15.0 upgrade: +// receivers SHOULD check if instrumentation_library_spans is set +// and scope_spans is not set then the value in instrumentation_library_spans +// SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans. +// If scope_spans is set then instrumentation_library_spans SHOULD be ignored. +func (tr TracesRequest) instrumentationLibraryToScope() { + for _, rs := range tr.orig.ResourceSpans { + if len(rs.ScopeSpans) == 0 { + for _, ils := range rs.InstrumentationLibrarySpans { + scopeSpans := otlptrace.ScopeSpans{ + Scope: v1.InstrumentationScope{ + Name: ils.InstrumentationLibrary.Name, + Version: ils.InstrumentationLibrary.Version, + }, + Spans: ils.Spans, + SchemaUrl: ils.SchemaUrl, + } + rs.ScopeSpans = append(rs.ScopeSpans, &scopeSpans) + } + } + rs.InstrumentationLibrarySpans = nil + } } func (tr TracesRequest) SetTraces(td pdata.Traces) { diff --git a/model/otlpgrpc/traces_test.go b/model/otlpgrpc/traces_test.go index 107fe5e8cac..24b8e41cc77 100644 --- a/model/otlpgrpc/traces_test.go +++ b/model/otlpgrpc/traces_test.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/test/bufconn" + v1 "go.opentelemetry.io/collector/model/internal/data/protogen/trace/v1" "go.opentelemetry.io/collector/model/pdata" ) @@ -63,6 +64,67 @@ var tracesRequestJSON = []byte(` ] }`) +var tracesTransitionData = [][]byte{ + []byte(` + { + "resourceSpans": [ + { + "resource": {}, + "instrumentationLibrarySpans": [ + { + "instrumentationLibrary": {}, + "spans": [ + { + "traceId": "", + "spanId":"", + "parentSpanId":"", + "name": "test_span", + "status": {} + } + ] + } + ] + } + ] + }`), + []byte(` + { + "resourceSpans": [ + { + "resource": {}, + "instrumentationLibrarySpans": [ + { + "instrumentationLibrary": {}, + "spans": [ + { + "traceId": "", + "spanId":"", + "parentSpanId":"", + "name": "test_span", + "status": {} + } + ] + } + ], + "scopeSpans": [ + { + "scope": {}, + "spans": [ + { + "traceId": "", + "spanId":"", + "parentSpanId":"", + "name": "test_span", + "status": {} + } + ] + } + ] + } + ] + }`), +} + func TestTracesRequestJSON(t *testing.T) { tr := NewTracesRequest() assert.NoError(t, tr.UnmarshalJSON(tracesRequestJSON)) @@ -73,6 +135,18 @@ func TestTracesRequestJSON(t *testing.T) { assert.Equal(t, strings.Join(strings.Fields(string(tracesRequestJSON)), ""), string(got)) } +func TestTracesRequestJSONTransition(t *testing.T) { + for _, data := range tracesTransitionData { + tr := NewTracesRequest() + assert.NoError(t, tr.UnmarshalJSON(data)) + assert.Equal(t, "test_span", tr.Traces().ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name()) + + got, err := tr.MarshalJSON() + assert.NoError(t, err) + assert.Equal(t, strings.Join(strings.Fields(string(tracesRequestJSON)), ""), string(got)) + } +} + func TestTracesRequestJSON_Deprecated(t *testing.T) { tr, err := UnmarshalJSONTracesRequest(tracesRequestJSON) assert.NoError(t, err) @@ -116,6 +190,39 @@ func TestTracesGrpc(t *testing.T) { assert.Equal(t, NewTracesResponse(), resp) } +func TestTracesGrpcTransition(t *testing.T) { + lis := bufconn.Listen(1024 * 1024) + s := grpc.NewServer() + RegisterTracesServer(s, &fakeTracesServer{t: t}) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, s.Serve(lis)) + }() + t.Cleanup(func() { + s.Stop() + wg.Wait() + }) + + cc, err := grpc.Dial("bufnet", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock()) + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, cc.Close()) + }) + + logClient := NewTracesClient(cc) + + resp, err := logClient.Export(context.Background(), generateTracesRequestWithInstrumentationLibrary()) + assert.NoError(t, err) + assert.Equal(t, NewTracesResponse(), resp) +} + func TestTracesGrpcError(t *testing.T) { lis := bufconn.Listen(1024 * 1024) s := grpc.NewServer() @@ -170,3 +277,13 @@ func generateTracesRequest() TracesRequest { tr.SetTraces(td) return tr } + +func generateTracesRequestWithInstrumentationLibrary() TracesRequest { + tr := generateTracesRequest() + tr.orig.ResourceSpans[0].InstrumentationLibrarySpans = []*v1.InstrumentationLibrarySpans{ + { + Spans: tr.orig.ResourceSpans[0].ScopeSpans[0].Spans, + }, + } + return tr +}