Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Add support to Zipkin V1 #271

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/opentracing/opentracing-go v1.0.2 // indirect
github.com/openzipkin/zipkin-go v0.1.3
github.com/philhofer/fwd v1.0.0 // indirect
github.com/pkg/errors v0.8.0 // indirect
github.com/pkg/errors v0.8.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a // indirect
github.com/spf13/cobra v0.0.3
Expand Down
2 changes: 1 addition & 1 deletion receiver/zipkin/proto_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestConvertSpansToTraceSpans_protobuf(t *testing.T) {
hdr.Set("Content-Type", "application/x-protobuf")

// 3. Get that payload converted to OpenCensus proto spans.
reqs, err := zi.parseAndConvertToTraceSpans(protoBlob, hdr)
reqs, err := zi.v2ToTraceSpans(protoBlob, hdr)
if err != nil {
t.Fatalf("Failed to parse convert Zipkin spans in Protobuf to Trace spans: %v", err)
}
Expand Down
25 changes: 22 additions & 3 deletions receiver/zipkin/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"strings"
"sync"

"github.com/census-instrumentation/opencensus-service/translator/trace"

"go.opencensus.io/trace"

zipkinmodel "github.com/openzipkin/zipkin-go/model"
Expand Down Expand Up @@ -110,13 +112,19 @@ func (zr *ZipkinReceiver) StartTraceReception(ctx context.Context, spanSink rece
return err
}

func (zr *ZipkinReceiver) parseAndConvertToTraceSpans(blob []byte, hdr http.Header) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
var zipkinSpans []*zipkinmodel.SpanModel
// v1ToTraceSpans parses Zipkin v1 JSON traces and converts them to OpenCensus Proto spans.
func (zr *ZipkinReceiver) v1ToTraceSpans(blob []byte) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
return tracetranslator.ZipkinV1JSONBatchToOCProto(blob)
}

// v2ToTraceSpans parses Zipkin v2 JSON or Protobuf traces and converts them to OpenCensus Proto spans.
func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
// This flag's reference is from:
// https://github.com/openzipkin/zipkin-go/blob/3793c981d4f621c0e3eb1457acffa2c1cc591384/proto/v2/zipkin.proto#L154
debugWasSet := hdr.Get("X-B3-Flags") == "1"

var zipkinSpans []*zipkinmodel.SpanModel

// Zipkin can send protobuf via http
switch hdr.Get("Content-Type") {
// TODO: (@odeke-em) record the unique types of Content-Type uploads
Expand Down Expand Up @@ -237,7 +245,18 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
_ = c.Close()
}
_ = r.Body.Close()
ereqs, err := zr.parseAndConvertToTraceSpans(slurp, r.Header)

// Now deserialize and process the spans.
asZipkinv1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans")

var ereqs []*agenttracepb.ExportTraceServiceRequest

if asZipkinv1 {
ereqs, err = zr.v1ToTraceSpans(slurp)
} else {
ereqs, err = zr.v2ToTraceSpans(slurp, r.Header)
}

if err != nil {
span.SetStatus(trace.Status{
Code: trace.StatusCodeInvalidArgument,
Expand Down
4 changes: 2 additions & 2 deletions receiver/zipkin/trace_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestConvertSpansToTraceSpans_json(t *testing.T) {
t.Fatalf("Failed to read sample JSON file: %v", err)
}
zi := new(ZipkinReceiver)
reqs, err := zi.parseAndConvertToTraceSpans(blob, nil)
reqs, err := zi.v2ToTraceSpans(blob, nil)
if err != nil {
t.Fatalf("Failed to parse convert Zipkin spans in JSON to Trace spans: %v", err)
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestConversionRoundtrip(t *testing.T) {
}]`)

zi := &ZipkinReceiver{spanSink: new(noopSink)}
ereqs, err := zi.parseAndConvertToTraceSpans(receiverInputJSON, nil)
ereqs, err := zi.v2ToTraceSpans(receiverInputJSON, nil)
if err != nil {
t.Fatalf("Failed to parse and convert receiver JSON: %v", err)
}
Expand Down
130 changes: 130 additions & 0 deletions translator/trace/testdata/zipkin_v1_multiple_batches.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
[
[
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkAvailability",
"id": "0ed2e63cbe71f5a8",
"annotations": [
{
"timestamp": 1544805927448081,
"value": "sr",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
},
{
"timestamp": 1544805927460102,
"value": "ss",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
}
]
}
],
[
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkStock",
"id": "f9ebb6e64880612a",
"parentId": "0ed2e63cbe71f5a8",
"timestamp": 1544805927453923,
"duration": 3740,
"annotations": [
{
"timestamp": 1544805927453923,
"value": "cs",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
},
{
"timestamp": 1544805927457717,
"value": "cr",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
}
]
}
],
[
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkAvailability",
"id": "0ed2e63cbe71f5a8",
"timestamp": 1544805927446743,
"duration": 12956,
"annotations": [
{
"timestamp": 1544805927446743,
"value": "cs",
"endpoint": {
"ipv4": "172.31.0.2",
"port": 0,
"serviceName": "front-proxy"
}
},
{
"timestamp": 1544805927460510,
"value": "cr",
"endpoint": {
"ipv4": "172.31.0.2",
"port": 0,
"serviceName": "front-proxy"
}
}
]
}
],
[
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkStock",
"id": "f9ebb6e64880612a",
"parentId": "0ed2e63cbe71f5a8",
"annotations": [
{
"timestamp": 1544805927454487,
"value": "sr",
"endpoint": {
"ipv4": "172.31.0.7",
"port": 0,
"serviceName": "service2"
}
},
{
"timestamp": 1544805927457320,
"value": "ss",
"endpoint": {
"ipv4": "172.31.0.7",
"port": 0,
"serviceName": "service2"
}
}
],
"binaryAnnotations": [
{
"key": "http.url",
"value": "http://localhost:9000/trace/2"
},
{
"key": "http.status_code",
"value": "200"
},
{
"key": "success",
"value": "true"
}
]
}
]
]
122 changes: 122 additions & 0 deletions translator/trace/testdata/zipkin_v1_single_batch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
[
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkAvailability",
"id": "0ed2e63cbe71f5a8",
"annotations": [
{
"timestamp": 1544805927448081,
"value": "sr",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
},
{
"timestamp": 1544805927460102,
"value": "ss",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
}
]
},
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkStock",
"id": "f9ebb6e64880612a",
"parentId": "0ed2e63cbe71f5a8",
"timestamp": 1544805927453923,
"duration": 3740,
"annotations": [
{
"timestamp": 1544805927453923,
"value": "cs",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
},
{
"timestamp": 1544805927457717,
"value": "cr",
"endpoint": {
"ipv4": "172.31.0.4",
"port": 0,
"serviceName": "service1"
}
}
]
},
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkAvailability",
"id": "0ed2e63cbe71f5a8",
"timestamp": 1544805927446743,
"duration": 12956,
"annotations": [
{
"timestamp": 1544805927446743,
"value": "cs",
"endpoint": {
"ipv4": "172.31.0.2",
"port": 0,
"serviceName": "front-proxy"
}
},
{
"timestamp": 1544805927460510,
"value": "cr",
"endpoint": {
"ipv4": "172.31.0.2",
"port": 0,
"serviceName": "front-proxy"
}
}
]
},
{
"traceId": "0ed2e63cbe71f5a8",
"name": "checkStock",
"id": "f9ebb6e64880612a",
"parentId": "0ed2e63cbe71f5a8",
"annotations": [
{
"timestamp": 1544805927454487,
"value": "sr",
"endpoint": {
"ipv4": "172.31.0.7",
"port": 0,
"serviceName": "service2"
}
},
{
"timestamp": 1544805927457320,
"value": "ss",
"endpoint": {
"ipv4": "172.31.0.7",
"port": 0,
"serviceName": "service2"
}
}
],
"binaryAnnotations": [
{
"key": "http.url",
"value": "http://localhost:9000/trace/2"
},
{
"key": "http.status_code",
"value": "200"
},
{
"key": "success",
"value": "true"
}
]
}
]
Loading