Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Commit

Permalink
Add support to Zipkin V1 (#271)
Browse files Browse the repository at this point in the history
* Add support to Zipkin V1

* Reducing test data without affecting coverage.

* PR Feedback
  • Loading branch information
Paulo Janotti authored Dec 17, 2018
1 parent de93c6b commit bb3054c
Show file tree
Hide file tree
Showing 8 changed files with 1,078 additions and 7 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/opentracing/opentracing-go v1.0.2 // indirect
github.com/openzipkin/zipkin-go v0.1.3
github.com/philhofer/fwd v1.0.0 // indirect
github.com/pkg/errors v0.8.0 // indirect
github.com/pkg/errors v0.8.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a // indirect
github.com/spf13/cobra v0.0.3
Expand Down
2 changes: 1 addition & 1 deletion receiver/zipkin/proto_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestConvertSpansToTraceSpans_protobuf(t *testing.T) {
hdr.Set("Content-Type", "application/x-protobuf")

// 3. Get that payload converted to OpenCensus proto spans.
reqs, err := zi.parseAndConvertToTraceSpans(protoBlob, hdr)
reqs, err := zi.v2ToTraceSpans(protoBlob, hdr)
if err != nil {
t.Fatalf("Failed to parse convert Zipkin spans in Protobuf to Trace spans: %v", err)
}
Expand Down
25 changes: 22 additions & 3 deletions receiver/zipkin/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"strings"
"sync"

"github.com/census-instrumentation/opencensus-service/translator/trace"

"go.opencensus.io/trace"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
Expand Down Expand Up @@ -110,13 +112,19 @@ func (zr *ZipkinReceiver) StartTraceReception(ctx context.Context, spanSink rece
return err
}

func (zr *ZipkinReceiver) parseAndConvertToTraceSpans(blob []byte, hdr http.Header) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
var zipkinSpans []*zipkinmodel.SpanModel
// v1ToTraceSpans parses Zipkin v1 JSON traces and converts them to OpenCensus Proto spans.
func (zr *ZipkinReceiver) v1ToTraceSpans(blob []byte) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
return tracetranslator.ZipkinV1JSONBatchToOCProto(blob)
}

// v2ToTraceSpans parses Zipkin v2 JSON or Protobuf traces and converts them to OpenCensus Proto spans.
func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
// This flag's reference is from:
// https://github.com/openzipkin/zipkin-go/blob/3793c981d4f621c0e3eb1457acffa2c1cc591384/proto/v2/zipkin.proto#L154
debugWasSet := hdr.Get("X-B3-Flags") == "1"

var zipkinSpans []*zipkinmodel.SpanModel

// Zipkin can send protobuf via http
switch hdr.Get("Content-Type") {
// TODO: (@odeke-em) record the unique types of Content-Type uploads
Expand Down Expand Up @@ -237,7 +245,18 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
_ = c.Close()
}
_ = r.Body.Close()
ereqs, err := zr.parseAndConvertToTraceSpans(slurp, r.Header)

// Now deserialize and process the spans.
asZipkinv1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans")

var ereqs []*agenttracepb.ExportTraceServiceRequest

if asZipkinv1 {
ereqs, err = zr.v1ToTraceSpans(slurp)
} else {
ereqs, err = zr.v2ToTraceSpans(slurp, r.Header)
}

if err != nil {
span.SetStatus(trace.Status{
Code: trace.StatusCodeInvalidArgument,
Expand Down
4 changes: 2 additions & 2 deletions receiver/zipkin/trace_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestConvertSpansToTraceSpans_json(t *testing.T) {
t.Fatalf("Failed to read sample JSON file: %v", err)
}
zi := new(ZipkinReceiver)
reqs, err := zi.parseAndConvertToTraceSpans(blob, nil)
reqs, err := zi.v2ToTraceSpans(blob, nil)
if err != nil {
t.Fatalf("Failed to parse convert Zipkin spans in JSON to Trace spans: %v", err)
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestConversionRoundtrip(t *testing.T) {
}]`)

zi := &ZipkinReceiver{spanSink: new(noopSink)}
ereqs, err := zi.parseAndConvertToTraceSpans(receiverInputJSON, nil)
ereqs, err := zi.v2ToTraceSpans(receiverInputJSON, nil)
if err != nil {
t.Fatalf("Failed to parse and convert receiver JSON: %v", err)
}
Expand Down
130 changes: 130 additions & 0 deletions translator/trace/testdata/zipkin_v1_multiple_batches.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
[
[
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkAvailability",
"id": "0ed2e63cbe71f5a8",
"annotations": [
{
"timestamp": 1544805927448081,
"value": "sr",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
},
{
"timestamp": 1544805927460102,
"value": "ss",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
}
]
}
],
[
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkStock",
"id": "f9ebb6e64880612a",
"parentId": "0ed2e63cbe71f5a8",
"timestamp": 1544805927453923,
"duration": 3740,
"annotations": [
{
"timestamp": 1544805927453923,
"value": "cs",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
},
{
"timestamp": 1544805927457717,
"value": "cr",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
}
]
}
],
[
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkAvailability",
"id": "0ed2e63cbe71f5a8",
"timestamp": 1544805927446743,
"duration": 12956,
"annotations": [
{
"timestamp": 1544805927446743,
"value": "cs",
"endpoint": {
"ipv4": "172.31.0.2",
"port": 0,
"serviceName": "front-proxy"
}
},
{
"timestamp": 1544805927460510,
"value": "cr",
"endpoint": {
"ipv4": "172.31.0.2",
"port": 0,
"serviceName": "front-proxy"
}
}
]
}
],
[
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkStock",
"id": "f9ebb6e64880612a",
"parentId": "0ed2e63cbe71f5a8",
"annotations": [
{
"timestamp": 1544805927454487,
"value": "sr",
"endpoint": {
"ipv4": "172.31.0.7",
"port": 0,
"serviceName": "service2"
}
},
{
"timestamp": 1544805927457320,
"value": "ss",
"endpoint": {
"ipv4": "172.31.0.7",
"port": 0,
"serviceName": "service2"
}
}
],
"binaryAnnotations": [
{
"key": "http.url",
"value": "http://localhost:9000/trace/2"
},
{
"key": "http.status_code",
"value": "200"
},
{
"key": "success",
"value": "true"
}
]
}
]
]
122 changes: 122 additions & 0 deletions translator/trace/testdata/zipkin_v1_single_batch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
[
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkAvailability",
"id": "0ed2e63cbe71f5a8",
"annotations": [
{
"timestamp": 1544805927448081,
"value": "sr",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
},
{
"timestamp": 1544805927460102,
"value": "ss",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
}
]
},
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkStock",
"id": "f9ebb6e64880612a",
"parentId": "0ed2e63cbe71f5a8",
"timestamp": 1544805927453923,
"duration": 3740,
"annotations": [
{
"timestamp": 1544805927453923,
"value": "cs",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
},
{
"timestamp": 1544805927457717,
"value": "cr",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
}
]
},
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkAvailability",
"id": "0ed2e63cbe71f5a8",
"timestamp": 1544805927446743,
"duration": 12956,
"annotations": [
{
"timestamp": 1544805927446743,
"value": "cs",
"endpoint": {
"ipv4": "172.31.0.2",
"port": 0,
"serviceName": "front-proxy"
}
},
{
"timestamp": 1544805927460510,
"value": "cr",
"endpoint": {
"ipv4": "172.31.0.2",
"port": 0,
"serviceName": "front-proxy"
}
}
]
},
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkStock",
"id": "f9ebb6e64880612a",
"parentId": "0ed2e63cbe71f5a8",
"annotations": [
{
"timestamp": 1544805927454487,
"value": "sr",
"endpoint": {
"ipv4": "172.31.0.7",
"port": 0,
"serviceName": "service2"
}
},
{
"timestamp": 1544805927457320,
"value": "ss",
"endpoint": {
"ipv4": "172.31.0.7",
"port": 0,
"serviceName": "service2"
}
}
],
"binaryAnnotations": [
{
"key": "http.url",
"value": "http://localhost:9000/trace/2"
},
{
"key": "http.status_code",
"value": "200"
},
{
"key": "success",
"value": "true"
}
]
}
]
Loading

0 comments on commit bb3054c

Please sign in to comment.