diff --git a/go.mod b/go.mod index 05682818..2c133445 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/opentracing/opentracing-go v1.0.2 // indirect github.com/openzipkin/zipkin-go v0.1.3 github.com/philhofer/fwd v1.0.0 // indirect - github.com/pkg/errors v0.8.0 // indirect + github.com/pkg/errors v0.8.0 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a // indirect github.com/spf13/cobra v0.0.3 diff --git a/receiver/zipkin/proto_parse_test.go b/receiver/zipkin/proto_parse_test.go index a564ae61..169b86f1 100644 --- a/receiver/zipkin/proto_parse_test.go +++ b/receiver/zipkin/proto_parse_test.go @@ -104,7 +104,7 @@ func TestConvertSpansToTraceSpans_protobuf(t *testing.T) { hdr.Set("Content-Type", "application/x-protobuf") // 3. Get that payload converted to OpenCensus proto spans. - reqs, err := zi.parseAndConvertToTraceSpans(protoBlob, hdr) + reqs, err := zi.v2ToTraceSpans(protoBlob, hdr) if err != nil { t.Fatalf("Failed to parse convert Zipkin spans in Protobuf to Trace spans: %v", err) } diff --git a/receiver/zipkin/trace_receiver.go b/receiver/zipkin/trace_receiver.go index f0858e72..50130766 100644 --- a/receiver/zipkin/trace_receiver.go +++ b/receiver/zipkin/trace_receiver.go @@ -29,6 +29,8 @@ import ( "strings" "sync" + "github.com/census-instrumentation/opencensus-service/translator/trace" + "go.opencensus.io/trace" zipkinmodel "github.com/openzipkin/zipkin-go/model" @@ -110,13 +112,19 @@ func (zr *ZipkinReceiver) StartTraceReception(ctx context.Context, spanSink rece return err } -func (zr *ZipkinReceiver) parseAndConvertToTraceSpans(blob []byte, hdr http.Header) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) { - var zipkinSpans []*zipkinmodel.SpanModel +// v1ToTraceSpans parses Zipkin v1 JSON traces and converts them to OpenCensus Proto spans. +func (zr *ZipkinReceiver) v1ToTraceSpans(blob []byte) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) { + return tracetranslator.ZipkinV1JSONBatchToOCProto(blob) +} +// v2ToTraceSpans parses Zipkin v2 JSON or Protobuf traces and converts them to OpenCensus Proto spans. +func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) { // This flag's reference is from: // https://github.com/openzipkin/zipkin-go/blob/3793c981d4f621c0e3eb1457acffa2c1cc591384/proto/v2/zipkin.proto#L154 debugWasSet := hdr.Get("X-B3-Flags") == "1" + var zipkinSpans []*zipkinmodel.SpanModel + // Zipkin can send protobuf via http switch hdr.Get("Content-Type") { // TODO: (@odeke-em) record the unique types of Content-Type uploads @@ -237,7 +245,18 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { _ = c.Close() } _ = r.Body.Close() - ereqs, err := zr.parseAndConvertToTraceSpans(slurp, r.Header) + + // Now deserialize and process the spans. + asZipkinv1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans") + + var ereqs []*agenttracepb.ExportTraceServiceRequest + + if asZipkinv1 { + ereqs, err = zr.v1ToTraceSpans(slurp) + } else { + ereqs, err = zr.v2ToTraceSpans(slurp, r.Header) + } + if err != nil { span.SetStatus(trace.Status{ Code: trace.StatusCodeInvalidArgument, diff --git a/receiver/zipkin/trace_receiver_test.go b/receiver/zipkin/trace_receiver_test.go index 32922046..60340355 100644 --- a/receiver/zipkin/trace_receiver_test.go +++ b/receiver/zipkin/trace_receiver_test.go @@ -47,7 +47,7 @@ func TestConvertSpansToTraceSpans_json(t *testing.T) { t.Fatalf("Failed to read sample JSON file: %v", err) } zi := new(ZipkinReceiver) - reqs, err := zi.parseAndConvertToTraceSpans(blob, nil) + reqs, err := zi.v2ToTraceSpans(blob, nil) if err != nil { t.Fatalf("Failed to parse convert Zipkin spans in JSON to Trace spans: %v", err) } @@ -158,7 +158,7 @@ func TestConversionRoundtrip(t *testing.T) { }]`) zi := &ZipkinReceiver{spanSink: new(noopSink)} - ereqs, err := zi.parseAndConvertToTraceSpans(receiverInputJSON, nil) + ereqs, err := zi.v2ToTraceSpans(receiverInputJSON, nil) if err != nil { t.Fatalf("Failed to parse and convert receiver JSON: %v", err) } diff --git a/translator/trace/testdata/zipkin_v1_multiple_batches.json b/translator/trace/testdata/zipkin_v1_multiple_batches.json new file mode 100644 index 00000000..de9275a7 --- /dev/null +++ b/translator/trace/testdata/zipkin_v1_multiple_batches.json @@ -0,0 +1,130 @@ +[ + [ + { + "traceId": "0ed2e63cbe71f5a8", + "name": "checkAvailability", + "id": "0ed2e63cbe71f5a8", + "annotations": [ + { + "timestamp": 1544805927448081, + "value": "sr", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + }, + { + "timestamp": 1544805927460102, + "value": "ss", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + } + ] + } + ], + [ + { + "traceId": "0ed2e63cbe71f5a8", + "name": "checkStock", + "id": "f9ebb6e64880612a", + "parentId": "0ed2e63cbe71f5a8", + "timestamp": 1544805927453923, + "duration": 3740, + "annotations": [ + { + "timestamp": 1544805927453923, + "value": "cs", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + }, + { + "timestamp": 1544805927457717, + "value": "cr", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + } + ] + } + ], + [ + { + "traceId": "0ed2e63cbe71f5a8", + "name": "checkAvailability", + "id": "0ed2e63cbe71f5a8", + "timestamp": 1544805927446743, + "duration": 12956, + "annotations": [ + { + "timestamp": 1544805927446743, + "value": "cs", + "endpoint": { + "ipv4": "172.31.0.2", + "port": 0, + "serviceName": "front-proxy" + } + }, + { + "timestamp": 1544805927460510, + "value": "cr", + "endpoint": { + "ipv4": "172.31.0.2", + "port": 0, + "serviceName": "front-proxy" + } + } + ] + } + ], + [ + { + "traceId": "0ed2e63cbe71f5a8", + "name": "checkStock", + "id": "f9ebb6e64880612a", + "parentId": "0ed2e63cbe71f5a8", + "annotations": [ + { + "timestamp": 1544805927454487, + "value": "sr", + "endpoint": { + "ipv4": "172.31.0.7", + "port": 0, + "serviceName": "service2" + } + }, + { + "timestamp": 1544805927457320, + "value": "ss", + "endpoint": { + "ipv4": "172.31.0.7", + "port": 0, + "serviceName": "service2" + } + } + ], + "binaryAnnotations": [ + { + "key": "http.url", + "value": "http://localhost:9000/trace/2" + }, + { + "key": "http.status_code", + "value": "200" + }, + { + "key": "success", + "value": "true" + } + ] + } + ] +] \ No newline at end of file diff --git a/translator/trace/testdata/zipkin_v1_single_batch.json b/translator/trace/testdata/zipkin_v1_single_batch.json new file mode 100644 index 00000000..86a451db --- /dev/null +++ b/translator/trace/testdata/zipkin_v1_single_batch.json @@ -0,0 +1,122 @@ +[ + { + "traceId": "0ed2e63cbe71f5a8", + "name": "checkAvailability", + "id": "0ed2e63cbe71f5a8", + "annotations": [ + { + "timestamp": 1544805927448081, + "value": "sr", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + }, + { + "timestamp": 1544805927460102, + "value": "ss", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + } + ] + }, + { + "traceId": "0ed2e63cbe71f5a8", + "name": "checkStock", + "id": "f9ebb6e64880612a", + "parentId": "0ed2e63cbe71f5a8", + "timestamp": 1544805927453923, + "duration": 3740, + "annotations": [ + { + "timestamp": 1544805927453923, + "value": "cs", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + }, + { + "timestamp": 1544805927457717, + "value": "cr", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + } + ] + }, + { + "traceId": "0ed2e63cbe71f5a8", + "name": "checkAvailability", + "id": "0ed2e63cbe71f5a8", + "timestamp": 1544805927446743, + "duration": 12956, + "annotations": [ + { + "timestamp": 1544805927446743, + "value": "cs", + "endpoint": { + "ipv4": "172.31.0.2", + "port": 0, + "serviceName": "front-proxy" + } + }, + { + "timestamp": 1544805927460510, + "value": "cr", + "endpoint": { + "ipv4": "172.31.0.2", + "port": 0, + "serviceName": "front-proxy" + } + } + ] + }, + { + "traceId": "0ed2e63cbe71f5a8", + "name": "checkStock", + "id": "f9ebb6e64880612a", + "parentId": "0ed2e63cbe71f5a8", + "annotations": [ + { + "timestamp": 1544805927454487, + "value": "sr", + "endpoint": { + "ipv4": "172.31.0.7", + "port": 0, + "serviceName": "service2" + } + }, + { + "timestamp": 1544805927457320, + "value": "ss", + "endpoint": { + "ipv4": "172.31.0.7", + "port": 0, + "serviceName": "service2" + } + } + ], + "binaryAnnotations": [ + { + "key": "http.url", + "value": "http://localhost:9000/trace/2" + }, + { + "key": "http.status_code", + "value": "200" + }, + { + "key": "success", + "value": "true" + } + ] + } +] \ No newline at end of file diff --git a/translator/trace/zipkinv1_to_protospan.go b/translator/trace/zipkinv1_to_protospan.go new file mode 100644 index 00000000..4ec14e50 --- /dev/null +++ b/translator/trace/zipkinv1_to_protospan.go @@ -0,0 +1,388 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracetranslator + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "math" + "strconv" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/pkg/errors" +) + +var ( + // ZipkinV1 friendly convertion errors + msgZipkinV1JSONUnmarshalError = "zipkinv1" + msgZipkinV1TraceIDError = "zipkinV1 span traceId" + msgZipkinV1SpanIDError = "zipkinV1 span id" + msgZipkinV1ParentIDError = "zipkinV1 span parentId" + // Generic hex to ID convertion errors + errHexTraceIDWrongLen = errors.New("hex traceId span has wrong length (expected 16 or 32)") + errHexTraceIDParsing = errors.New("failed to parse hex traceId") + errHexTraceIDZero = errors.New("traceId is zero") + errHexIDWrongLen = errors.New("hex Id has wrong length (expected 16)") + errHexIDParsing = errors.New("failed to parse hex Id") + errHexIDZero = errors.New("Id is zero") +) + +// Trace translation from Zipkin V1 is a bit of special case since there is no model +// defined in golang for Zipkin V1 spans and there is no need to define one here, given +// that the zipkinV1Span defined below is as defined at: +// https://zipkin.io/zipkin-api/zipkin-api.yaml +type zipkinV1Span struct { + TraceID string `json:"traceId"` + Name string `json:"name,omitempty"` + ParentID string `json:"parentId,omitempty"` + ID string `json:"id"` + Timestamp int64 `json:"timestamp"` + Duration int64 `json:"duration"` + Debug bool `json:"debug,omitempty"` + Annotations []*annotation `json:"annotations,omitempty"` + BinaryAnnotations []*binaryAnnotation `json:"binaryAnnotations,omitempty"` +} + +// endpoint structure used by zipkinV1Span. +type endpoint struct { + ServiceName string `json:"serviceName"` + IPv4 string `json:"ipv4"` + IPv6 string `json:"ipv6"` + Port int32 `json:"port"` +} + +// annotation struct used by zipkinV1Span. +type annotation struct { + Timestamp int64 `json:"timestamp"` + Value string `json:"value"` + Endpoint *endpoint `json:"endpoint"` +} + +// binaryAnnotation used by zipkinV1Span. +type binaryAnnotation struct { + Key string `json:"key"` + Value string `json:"value"` + Endpoint *endpoint `json:"endpoint"` +} + +// ZipkinV1JSONBatchToOCProto converts a JSON blob with a list of Zipkin v1 spans to OC Proto. +func ZipkinV1JSONBatchToOCProto(blob []byte) ([]*agenttracepb.ExportTraceServiceRequest, error) { + var zSpans []*zipkinV1Span + if err := json.Unmarshal(blob, &zSpans); err != nil { + return nil, errors.WithMessage(err, msgZipkinV1JSONUnmarshalError) + } + + // Service to batch maps the service name to the trace request with the corresponding node. + svcToBatch := make(map[string]*agenttracepb.ExportTraceServiceRequest) + for _, zSpan := range zSpans { + ocSpan, parsedAnnotations, err := zipkinV1ToOCSpan(zSpan) + if err != nil { + // error from internal package function, it already wraps the error to give better context. + return nil, err + } + + req := getOrCreateNodeRequest(svcToBatch, parsedAnnotations.Endpoint) + req.Spans = append(req.Spans, ocSpan) + } + + batches := make([]*agenttracepb.ExportTraceServiceRequest, 0, len(svcToBatch)) + for _, v := range svcToBatch { + batches = append(batches, v) + } + return batches, nil +} + +func zipkinV1ToOCSpan(zSpan *zipkinV1Span) (*tracepb.Span, *annotationParseResult, error) { + traceID, err := hexTraceIDToOCTraceID(zSpan.TraceID) + if err != nil { + return nil, nil, errors.WithMessage(err, msgZipkinV1TraceIDError) + } + spanID, err := hexIDToOCID(zSpan.ID) + if err != nil { + return nil, nil, errors.WithMessage(err, msgZipkinV1SpanIDError) + } + var parentID []byte + if zSpan.ParentID != "" { + id, err := hexIDToOCID(zSpan.ParentID) + if err != nil { + return nil, nil, errors.WithMessage(err, msgZipkinV1ParentIDError) + } + parentID = id + } + + parsedAnnotations := parseZipkinV1Annotations(zSpan.Annotations) + attributes := zipkinV1BinAnnotationsToOCAttributes(zSpan.BinaryAnnotations) + var startTime, endTime *timestamp.Timestamp + if zSpan.Timestamp == 0 { + startTime = parsedAnnotations.EarlyAnnotationTime + endTime = parsedAnnotations.LateAnnotationTime + } else { + startTime = epochMicrosecondsToTimestamp(zSpan.Timestamp) + endTime = epochMicrosecondsToTimestamp(zSpan.Timestamp + zSpan.Duration) + } + + ocSpan := &tracepb.Span{ + TraceId: traceID, + SpanId: spanID, + ParentSpanId: parentID, + Kind: parsedAnnotations.Kind, + TimeEvents: parsedAnnotations.TimeEvents, + StartTime: startTime, + EndTime: endTime, + Attributes: attributes, + } + + if zSpan.Name != "" { + ocSpan.Name = &tracepb.TruncatableString{Value: zSpan.Name} + } + + return ocSpan, parsedAnnotations, nil +} + +func zipkinV1BinAnnotationsToOCAttributes(binAnnotations []*binaryAnnotation) *tracepb.Span_Attributes { + if len(binAnnotations) == 0 { + return nil + } + + attributeMap := make(map[string]*tracepb.AttributeValue) + for _, binAnnotation := range binAnnotations { + pbAttrib := &tracepb.AttributeValue{} + if iValue, err := strconv.ParseInt(binAnnotation.Value, 10, 64); err == nil { + pbAttrib.Value = &tracepb.AttributeValue_IntValue{IntValue: iValue} + } else if bValue, err := strconv.ParseBool(binAnnotation.Value); err == nil { + pbAttrib.Value = &tracepb.AttributeValue_BoolValue{BoolValue: bValue} + } else { + // For now all else go to string + pbAttrib.Value = &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: binAnnotation.Value}} + } + attributeMap[binAnnotation.Key] = pbAttrib + } + + if len(attributeMap) == 0 { + return nil + } + + return &tracepb.Span_Attributes{ + AttributeMap: attributeMap, + } +} + +// annotationParseResult stores the results of examining the original annotations, +// this way multiple passes on the annotations are not needed. +type annotationParseResult struct { + Endpoint *endpoint + TimeEvents *tracepb.Span_TimeEvents + Kind tracepb.Span_SpanKind + EarlyAnnotationTime *timestamp.Timestamp + LateAnnotationTime *timestamp.Timestamp +} + +func parseZipkinV1Annotations(annotations []*annotation) *annotationParseResult { + if len(annotations) == 0 { + return nil + } + + // Unknown service name works both as a default value and a flag to indicate that a valid endpoint was found. + const unknownServiceName = "unknown-service" + + // Zipkin V1 annotations have a timestamp so they fit well with OC TimeEvent + earlyAnnotationTimestamp := int64(math.MaxInt64) + lateAnnotationTimestamp := int64(math.MinInt64) + res := &annotationParseResult{} + timeEvents := make([]*tracepb.Span_TimeEvent, 0, len(annotations)) + for _, currAnnotation := range annotations { + if currAnnotation == nil && currAnnotation.Value == "" { + continue + } + + endpointName := unknownServiceName + if currAnnotation.Endpoint != nil && currAnnotation.Endpoint.ServiceName != "" { + endpointName = currAnnotation.Endpoint.ServiceName + } + + // Specially important annotations used by zipkin v1 these are the most important ones. + switch currAnnotation.Value { + case "cs": + fallthrough + case "cr": + if res.Kind == tracepb.Span_SPAN_KIND_UNSPECIFIED { + res.Kind = tracepb.Span_CLIENT + } + fallthrough + case "ss": + fallthrough + case "sr": + if res.Kind == tracepb.Span_SPAN_KIND_UNSPECIFIED { + res.Kind = tracepb.Span_SERVER + } + if res.Endpoint == nil && endpointName != unknownServiceName { + res.Endpoint = currAnnotation.Endpoint + } + } + + ts := epochMicrosecondsToTimestamp(currAnnotation.Timestamp) + if currAnnotation.Timestamp < earlyAnnotationTimestamp { + earlyAnnotationTimestamp = currAnnotation.Timestamp + res.EarlyAnnotationTime = ts + } + if currAnnotation.Timestamp > lateAnnotationTimestamp { + lateAnnotationTimestamp = currAnnotation.Timestamp + res.LateAnnotationTime = ts + } + + timeEvent := &tracepb.Span_TimeEvent{ + Time: ts, + // More economically we could use a tracepb.Span_TimeEvent_Message, however, it will mean the loss of some information. + // Using the more expensive annotation until/if something cheaper is needed. + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + currAnnotation.Value: { + Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{ + Value: endpointName, + }, + }, + }, + }, + }, + }, + }, + } + + timeEvents = append(timeEvents, timeEvent) + } + + if len(timeEvents) > 0 { + res.TimeEvents = &tracepb.Span_TimeEvents{TimeEvent: timeEvents} + } + + if res.Endpoint == nil { + res.Endpoint = &endpoint{ + ServiceName: unknownServiceName, + } + } + + return res +} + +func hexTraceIDToOCTraceID(hex string) ([]byte, error) { + // Per info at https://zipkin.io/zipkin-api/zipkin-api.yaml it should be 16 or 32 characters + hexLen := len(hex) + if hexLen != 16 && hexLen != 32 { + return nil, errHexTraceIDWrongLen + } + + var high, low uint64 + var err error + if hexLen == 32 { + if high, err = strconv.ParseUint(hex[:16], 16, 64); err != nil { + return nil, errHexTraceIDParsing + } + } + + if low, err = strconv.ParseUint(hex[hexLen-16:], 16, 64); err != nil { + return nil, errHexTraceIDParsing + } + + if high == 0 && low == 0 { + return nil, errHexTraceIDZero + } + + traceID := make([]byte, 16) + binary.BigEndian.PutUint64(traceID[:8], high) + binary.BigEndian.PutUint64(traceID[8:], low) + return traceID, nil +} + +func hexIDToOCID(hex string) ([]byte, error) { + // Per info at https://zipkin.io/zipkin-api/zipkin-api.yaml it should be 16 characters + if len(hex) != 16 { + return nil, errHexIDWrongLen + } + + idValue, err := strconv.ParseUint(hex, 16, 64) + if err != nil { + return nil, errHexIDParsing + } + + if idValue == 0 { + return nil, errHexIDZero + } + + id := make([]byte, 8) + binary.BigEndian.PutUint64(id, idValue) + return id, nil +} + +func epochMicrosecondsToTimestamp(msecs int64) *timestamp.Timestamp { + if msecs <= 0 { + return nil + } + t := ×tamp.Timestamp{} + t.Seconds = msecs / 1e6 + t.Nanos = int32(msecs%1e6) * 1e3 + return t +} + +func getOrCreateNodeRequest(m map[string]*agenttracepb.ExportTraceServiceRequest, endpoint *endpoint) *agenttracepb.ExportTraceServiceRequest { + // this private function assumes that the caller never passes an nil endpoint + nodeKey := endpoint.string() + req := m[nodeKey] + if req != nil { + return req + } + + req = &agenttracepb.ExportTraceServiceRequest{ + Node: &commonpb.Node{ + ServiceInfo: &commonpb.ServiceInfo{Name: endpoint.ServiceName}, + }, + } + + if attributeMap := endpoint.createAttributeMap(); attributeMap != nil { + req.Node.Attributes = attributeMap + } + + m[nodeKey] = req + + return req +} + +func (ep *endpoint) string() string { + return fmt.Sprintf("%s-%s-%s-%d", ep.ServiceName, ep.IPv4, ep.IPv6, ep.Port) +} + +func (ep *endpoint) createAttributeMap() map[string]string { + if ep.IPv4 == "" && ep.IPv6 == "" && ep.Port == 0 { + return nil + } + + attributeMap := make(map[string]string, 3) + if ep.IPv4 != "" { + attributeMap["ipv4"] = ep.IPv4 + } + if ep.IPv6 != "" { + attributeMap["ipv6"] = ep.IPv6 + } + if ep.Port != 0 { + attributeMap["port"] = strconv.Itoa(int(ep.Port)) + } + return attributeMap +} diff --git a/translator/trace/zipkinv1_to_protospan_test.go b/translator/trace/zipkinv1_to_protospan_test.go new file mode 100644 index 00000000..9615c2af --- /dev/null +++ b/translator/trace/zipkinv1_to_protospan_test.go @@ -0,0 +1,412 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracetranslator + +import ( + "encoding/json" + "io/ioutil" + "reflect" + "sort" + "testing" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/golang/protobuf/ptypes/timestamp" +) + +func Test_hexIDToOCID(t *testing.T) { + tests := []struct { + name string + hexStr string + want []byte + wantErr error + }{ + { + name: "empty hex string", + hexStr: "", + want: nil, + wantErr: errHexIDWrongLen, + }, + { + name: "wrong length", + hexStr: "0000", + want: nil, + wantErr: errHexIDWrongLen, + }, + { + name: "parse error", + hexStr: "000000000000000-", + want: nil, + wantErr: errHexIDParsing, + }, + { + name: "all zero", + hexStr: "0000000000000000", + want: nil, + wantErr: errHexIDZero, + }, + { + name: "happy path", + hexStr: "0706050400010203", + want: []byte{7, 6, 5, 4, 0, 1, 2, 3}, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := hexIDToOCID(tt.hexStr) + if tt.wantErr != nil && tt.wantErr != err { + t.Errorf("hexIDToOCID() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("hexIDToOCID() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_hexTraceIDToOCTraceID(t *testing.T) { + tests := []struct { + name string + hexStr string + want []byte + wantErr error + }{ + { + name: "empty hex string", + hexStr: "", + want: nil, + wantErr: errHexTraceIDWrongLen, + }, + { + name: "wrong length", + hexStr: "000000000000000010", + want: nil, + wantErr: errHexTraceIDWrongLen, + }, + { + name: "parse error", + hexStr: "000000000000000X0000000000000000", + want: nil, + wantErr: errHexTraceIDParsing, + }, + { + name: "all zero", + hexStr: "00000000000000000000000000000000", + want: nil, + wantErr: errHexTraceIDZero, + }, + { + name: "happy path", + hexStr: "00000000000000010000000000000002", + want: []byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2}, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := hexTraceIDToOCTraceID(tt.hexStr) + if tt.wantErr != nil && tt.wantErr != err { + t.Errorf("hexTraceIDToOCTraceID() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("hexTraceIDToOCTraceID() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSingleJSONZipkinV1BatchToOCProto(t *testing.T) { + blob, err := ioutil.ReadFile("./testdata/zipkin_v1_single_batch.json") + if err != nil { + t.Fatalf("failed to load test data: %v", err) + } + got, err := ZipkinV1JSONBatchToOCProto(blob) + if err != nil { + t.Fatalf("failed to translate zipkinv1 to OC proto: %v", err) + } + + want := ocBatchesFromZipkinV1 + sortTraceByNodeName(want) + sortTraceByNodeName(got) + + if !reflect.DeepEqual(got, want) { + t.Fatalf("got different data than want") + } +} + +func TestMultipleJSONZipkinV1BatchesToOCProto(t *testing.T) { + blob, err := ioutil.ReadFile("./testdata/zipkin_v1_multiple_batches.json") + if err != nil { + t.Fatalf("failed to load test data: %v", err) + } + + var batches []interface{} + if err := json.Unmarshal(blob, &batches); err != nil { + t.Fatalf("failed to load the batches: %v", err) + } + + nodeToTraceReqs := make(map[string]*agenttracepb.ExportTraceServiceRequest) + var got []*agenttracepb.ExportTraceServiceRequest + for _, batch := range batches { + jsonBatch, err := json.Marshal(batch) + if err != nil { + t.Fatalf("failed to marshal interface back to blob: %v", err) + } + + g, err := ZipkinV1JSONBatchToOCProto(jsonBatch) + if err != nil { + t.Fatalf("failed to translate zipkinv1 to OC proto: %v", err) + } + + // Coalesce the nodes otherwise they will differ due to multiple + // nodes representing same logical service + for _, tsr := range g { + key := tsr.Node.String() + if pTsr, ok := nodeToTraceReqs[key]; ok { + pTsr.Spans = append(pTsr.Spans, tsr.Spans...) + } else { + nodeToTraceReqs[key] = tsr + } + } + } + + for _, tsr := range nodeToTraceReqs { + got = append(got, tsr) + } + + want := ocBatchesFromZipkinV1 + sortTraceByNodeName(want) + sortTraceByNodeName(got) + + if !reflect.DeepEqual(got, want) { + t.Fatalf("got different data than want") + } +} + +func sortTraceByNodeName(trace []*agenttracepb.ExportTraceServiceRequest) { + sort.Slice(trace, func(i, j int) bool { + return trace[i].Node.ServiceInfo.Name < trace[j].Node.ServiceInfo.Name + }) +} + +// ocBatches has the OpenCensus proto batches used in the test. They are hard coded because +// structs like tracepb.AttributeMap cannot be ready from JSON. +var ocBatchesFromZipkinV1 = []*agenttracepb.ExportTraceServiceRequest{ + { + Node: &commonpb.Node{ + ServiceInfo: &commonpb.ServiceInfo{Name: "front-proxy"}, + Attributes: map[string]string{"ipv4": "172.31.0.2"}, + }, + Spans: []*tracepb.Span{ + { + TraceId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0xd2, 0xe6, 0x3c, 0xbe, 0x71, 0xf5, 0xa8}, + SpanId: []byte{0x0e, 0xd2, 0xe6, 0x3c, 0xbe, 0x71, 0xf5, 0xa8}, + ParentSpanId: nil, + Name: &tracepb.TruncatableString{Value: "checkAvailability"}, + Kind: tracepb.Span_CLIENT, + StartTime: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 446743000}, + EndTime: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 459699000}, + TimeEvents: &tracepb.Span_TimeEvents{ + TimeEvent: []*tracepb.Span_TimeEvent{ + { + Time: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 446743000}, + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "cs": { + Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "front-proxy"}}, + }, + }, + }, + }, + }, + }, + { + Time: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 460510000}, + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "cr": { + Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "front-proxy"}}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + Node: &commonpb.Node{ + ServiceInfo: &commonpb.ServiceInfo{Name: "service1"}, + Attributes: map[string]string{"ipv4": "172.31.0.4"}, + }, + Spans: []*tracepb.Span{ + { + TraceId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0xd2, 0xe6, 0x3c, 0xbe, 0x71, 0xf5, 0xa8}, + SpanId: []byte{0x0e, 0xd2, 0xe6, 0x3c, 0xbe, 0x71, 0xf5, 0xa8}, + ParentSpanId: nil, + Name: &tracepb.TruncatableString{Value: "checkAvailability"}, + Kind: tracepb.Span_SERVER, + StartTime: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 448081000}, + EndTime: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 460102000}, + TimeEvents: &tracepb.Span_TimeEvents{ + TimeEvent: []*tracepb.Span_TimeEvent{ + { + Time: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 448081000}, + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "sr": { + Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "service1"}}, + }, + }, + }, + }, + }, + }, + { + Time: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 460102000}, + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "ss": { + Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "service1"}}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + TraceId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0xd2, 0xe6, 0x3c, 0xbe, 0x71, 0xf5, 0xa8}, + SpanId: []byte{0xf9, 0xeb, 0xb6, 0xe6, 0x48, 0x80, 0x61, 0x2a}, + ParentSpanId: []byte{0x0e, 0xd2, 0xe6, 0x3c, 0xbe, 0x71, 0xf5, 0xa8}, + Name: &tracepb.TruncatableString{Value: "checkStock"}, + Kind: tracepb.Span_CLIENT, + StartTime: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 453923000}, + EndTime: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 457663000}, + TimeEvents: &tracepb.Span_TimeEvents{ + TimeEvent: []*tracepb.Span_TimeEvent{ + { + Time: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 453923000}, + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "cs": { + Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "service1"}}, + }, + }, + }, + }, + }, + }, + { + Time: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 457717000}, + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "cr": { + Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "service1"}}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + Node: &commonpb.Node{ + ServiceInfo: &commonpb.ServiceInfo{Name: "service2"}, + Attributes: map[string]string{"ipv4": "172.31.0.7"}, + }, + Spans: []*tracepb.Span{ + { + TraceId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0xd2, 0xe6, 0x3c, 0xbe, 0x71, 0xf5, 0xa8}, + SpanId: []byte{0xf9, 0xeb, 0xb6, 0xe6, 0x48, 0x80, 0x61, 0x2a}, + ParentSpanId: []byte{0x0e, 0xd2, 0xe6, 0x3c, 0xbe, 0x71, 0xf5, 0xa8}, + Name: &tracepb.TruncatableString{Value: "checkStock"}, + Kind: tracepb.Span_SERVER, + StartTime: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 454487000}, + EndTime: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 457320000}, + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "http.status_code": { + Value: &tracepb.AttributeValue_IntValue{IntValue: 200}, + }, + "http.url": { + Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "http://localhost:9000/trace/2"}}, + }, + "success": { + Value: &tracepb.AttributeValue_BoolValue{BoolValue: true}, + }, + }, + }, + TimeEvents: &tracepb.Span_TimeEvents{ + TimeEvent: []*tracepb.Span_TimeEvent{ + { + Time: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 454487000}, + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "sr": { + Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "service2"}}, + }, + }, + }, + }, + }, + }, + { + Time: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 457320000}, + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Attributes: &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "ss": { + Value: &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: "service2"}}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, +}